Source code for hed.schema.schema_io.df2schema

"""
This module is used to create a HedSchema object from a set of .tsv files.
"""
import io

from hed.schema.schema_io import df_util, load_dataframes
from hed.schema.hed_schema_constants import HedSectionKey, HedKey
from hed.errors.exceptions import HedFileError, HedExceptions
from hed.schema.schema_io.base2schema import SchemaLoader
import pandas as pd
import hed.schema.hed_schema_df_constants as constants
from hed.errors import error_reporter
from hed.schema.schema_io import text_util


[docs]class SchemaLoaderDF(SchemaLoader): """ Load dataframe schemas from filenames Expected usage is SchemaLoaderDF.load(filenames) Note: due to supporting multiple files, this one differs from the other schema loaders """
[docs] def __init__(self, filenames, schema_as_strings_or_df, name=""): self.filenames = df_util.convert_filenames_to_dict(filenames) self.schema_as_strings_or_df = schema_as_strings_or_df if self.filenames: reported_filename = self.filenames.get(constants.STRUCT_KEY) else: reported_filename = "from_strings" super().__init__(reported_filename, None, None, None, name) self._schema.source_format = "spreadsheet"
[docs] @classmethod def load_spreadsheet(cls, filenames=None, schema_as_strings_or_df=None, name=""): """ Loads and returns the schema, including partnered schema if applicable. Parameters: filenames(str or None or dict of str): A valid set of schema spreadsheet filenames If a single filename string, assumes the standard filename suffixes. schema_as_strings_or_df(None or dict of str): A valid set of schema spreadsheet files(tsv as strings) name (str): what to identify this schema as Returns: schema(HedSchema): The new schema """ loader = cls(filenames, schema_as_strings_or_df=schema_as_strings_or_df, name=name) return loader._load()
def _open_file(self): if self.filenames: dataframes = load_dataframes(self.filenames) else: dataframes = load_dataframes_from_strings(self.schema_as_strings_or_df) return dataframes def _get_header_attributes(self, file_data): header_attributes = {} for row_number, row in file_data[constants.STRUCT_KEY].iterrows(): cls = row[constants.subclass_of] attributes = row[constants.attributes] if cls == "HedHeader" and attributes: header_attributes, _ = text_util._parse_header_attributes_line(attributes) continue return header_attributes def _parse_data(self): self._schema.prologue, self._schema.epilogue = self._get_prologue_epilogue(self.input_data) self._schema._initialize_attributes(HedSectionKey.Properties) self._read_attribute_section(self.input_data[constants.ATTRIBUTE_PROPERTY_KEY], section_key=HedSectionKey.Properties) self._read_attributes() self._read_section(self.input_data[constants.UNIT_MODIFIER_KEY], HedSectionKey.UnitModifiers) self._read_section(self.input_data[constants.VALUE_CLASS_KEY], HedSectionKey.ValueClasses) self._read_section(self.input_data[constants.UNIT_CLASS_KEY], HedSectionKey.UnitClasses) self._read_units(self.input_data[constants.UNIT_KEY]) # This one is a special case self._read_schema(self.input_data) if self.fatal_errors: self.fatal_errors = error_reporter.sort_issues(self.fatal_errors) raise HedFileError(self.fatal_errors[0]['code'], f"{len(self.fatal_errors)} issues found when parsing schema. See the .issues " f"parameter on this exception for more details.", self.name, issues=self.fatal_errors) def _get_prologue_epilogue(self, file_data): prologue, epilogue = "", "" for row_number, row in file_data[constants.STRUCT_KEY].iterrows(): cls = row[constants.subclass_of] description = row[constants.description] if cls == "HedPrologue" and description: prologue = description.replace("\\n", "\n") continue elif cls == "HedEpilogue" and description: epilogue = description.replace("\\n", "\n") return prologue, epilogue def _read_schema(self, dataframe): """Add the main schema section Parameters: dataframe (pd.DataFrame): The dataframe for the main tags section """ self._schema._initialize_attributes(HedSectionKey.Tags) known_parent_tags = {"HedTag": []} iterations = 0 # Handle this over multiple iterations in case tags have parent tags listed later in the file. # A properly formatted .tsv file will never have parents after the child. current_rows = list(dataframe[constants.TAG_KEY].iterrows()) while current_rows: iterations += 1 next_round_rows = [] for row_number, row in current_rows: # skip blank rows, though there shouldn't be any if not any(row): continue parent_tag = row[constants.subclass_of] org_parent_tags = known_parent_tags.get(parent_tag) tag_entry = self._create_tag_entry(org_parent_tags, row_number, row) if not tag_entry: # This will have already raised an error continue # If this is NOT a rooted tag and we have no parent, try it in another round. if org_parent_tags is None and not tag_entry.has_attribute(HedKey.Rooted): next_round_rows.append((row_number, row)) continue tag_entry = self._add_tag_entry(tag_entry, row_number, row) if tag_entry: known_parent_tags[tag_entry.short_tag_name] = tag_entry.name.split("/") if len(next_round_rows) == len(current_rows): for row_number, row in current_rows: tag_name = self._get_tag_name(row) msg = (f"Cannot resolve parent tag. " f"There is probably an issue with circular parent tags of {tag_name} on row {row_number}.") self._add_fatal_error(row_number, row, msg, HedExceptions.SCHEMA_TAG_TSV_BAD_PARENT) break current_rows = next_round_rows def _add_tag_entry(self, tag_entry, row_number, row): try: rooted_entry = self.find_rooted_entry(tag_entry, self._schema, self._loading_merged) if rooted_entry: parent_tags = rooted_entry.long_tag_name.split("/") # Create the entry again for rooted tags, to get the full name. tag_entry = self._create_tag_entry(parent_tags, row_number, row) except HedFileError as e: self._add_fatal_error(row_number, row, e.message, e.code) return None tag_entry = self._add_to_dict(row_number, row, tag_entry, HedSectionKey.Tags) return tag_entry def _create_tag_entry(self, parent_tags, row_number, row): """ Create a tag entry(does not add to dict) Parameters: parent_tags (list): A list of parent tags in order. row_number (int): The row number to report errors as row (str or pd.Series): A tag row or pandas series(depends on format) Returns: HedSchemaEntry: The entry for the added tag. Notes: Includes attributes and description. """ tag_name = self._get_tag_name(row) if tag_name: if parent_tags: long_tag_name = "/".join(parent_tags) + "/" + tag_name else: long_tag_name = tag_name return self._create_entry(row_number, row, HedSectionKey.Tags, long_tag_name) self._add_fatal_error(row_number, row, f"No tag name found in row.", error_code=HedExceptions.GENERIC_ERROR) def _read_section(self, df, section_key): self._schema._initialize_attributes(section_key) for row_number, row in df.iterrows(): new_entry = self._create_entry(row_number, row, section_key) self._add_to_dict(row_number, row, new_entry, section_key) def _read_units(self, df): self._schema._initialize_attributes(HedSectionKey.Units) for row_number, row in df.iterrows(): new_entry = self._create_entry(row_number, row, HedSectionKey.Units) unit_class_name = row[constants.has_unit_class] unit_class_entry = self._schema.get_tag_entry(unit_class_name, HedSectionKey.UnitClasses) unit_class_entry.add_unit(new_entry) self._add_to_dict(row_number, row, new_entry, HedSectionKey.Units) def _read_attributes(self): self._schema._initialize_attributes(HedSectionKey.Attributes) self._read_attribute_section(self.input_data[constants.ANNOTATION_KEY], True) self._read_attribute_section(self.input_data[constants.OBJECT_KEY]) self._read_attribute_section(self.input_data[constants.DATA_KEY]) def _read_attribute_section(self, df, annotation_property=False, section_key=HedSectionKey.Attributes): # todo: this needs to ALSO check range/domain(and verify they match) for row_number, row in df.iterrows(): new_entry = self._create_entry(row_number, row, section_key) if annotation_property: new_entry._set_attribute_value(HedKey.AnnotationProperty, True) self._add_to_dict(row_number, row, new_entry, section_key) def _get_tag_name(self, row): base_tag_name = row[constants.name] if base_tag_name.endswith("-#"): return "#" return base_tag_name def _create_entry(self, row_number, row, key_class, full_tag_name=None): element_name = self._get_tag_name(row) if full_tag_name: element_name = full_tag_name node_attributes = self._get_tag_attributes(row_number, row) hed_id = row[constants.hed_id] if hed_id: node_attributes[HedKey.HedID] = hed_id description = row[constants.description] tag_entry = self._schema._create_tag_entry(element_name, key_class) if description: tag_entry.description = description.strip() for attribute_name, attribute_value in node_attributes.items(): tag_entry._set_attribute_value(attribute_name, attribute_value) return tag_entry def _get_tag_attributes(self, row_number, row): """ Get the tag attributes from a row. Parameters: row_number (int): The row number to report errors as. row (pd.Series): A tag row. Returns: dict: Dictionary of attributes. """ try: return df_util.get_attributes_from_row(row) except ValueError as e: self._add_fatal_error(row_number, str(row), str(e)) def _add_to_dict(self, row_number, row, entry, key_class): if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged and not self.appending_to_schema: self._add_fatal_error(row_number, row, "Library tag in unmerged schema has InLibrary attribute", HedExceptions.IN_LIBRARY_IN_UNMERGED) return self._add_to_dict_base(entry, key_class)
[docs]def load_dataframes_from_strings(schema_data): """ Load the given strings/dataframes as dataframes. Parameters: schema_data(dict): The dict of files(strings or pd.DataFrames) key being constants like TAG_KEY Returns: schema_data(dict): A dict with the same keys as schema_data, but values are dataframes if not before """ return {key: value if isinstance(value, pd.DataFrame) else pd.read_csv(io.StringIO(value), sep="\t", dtype=str, na_filter=False) for key, value in schema_data.items()}