Source code for hed.schema.schema_io.schema2df

"""Allows output of HedSchema objects as .tsv format"""

from hed.schema.hed_schema_constants import HedSectionKey, HedKey
from hed.schema.schema_io.df_util import create_empty_dataframes, get_library_name_and_id, remove_prefix, \
    calculate_attribute_type
from hed.schema.schema_io.schema2base import Schema2Base
from hed.schema.schema_io import text_util
import pandas as pd
import hed.schema.hed_schema_df_constants as constants
from hed.schema.hed_schema_entry import HedTagEntry

# Maps each schema section key to the key of the output dataframe its entries are written to.
# Attributes and Properties map to themselves: Schema2DF._write_entry compares the looked-up
# value against these section keys and dispatches to the dedicated attribute/property writers
# instead of indexing the output with it.
section_key_to_df = {
    HedSectionKey.Tags: constants.TAG_KEY,
    HedSectionKey.Units: constants.UNIT_KEY,
    HedSectionKey.UnitClasses: constants.UNIT_CLASS_KEY,
    HedSectionKey.UnitModifiers: constants.UNIT_MODIFIER_KEY,
    HedSectionKey.ValueClasses: constants.VALUE_CLASS_KEY,
    HedSectionKey.Attributes: HedSectionKey.Attributes,
    HedSectionKey.Properties: HedSectionKey.Properties
}


class Schema2DF(Schema2Base):
    """Schema2Base implementation that writes a schema into a set of dataframes.

    Rows are collected in ``self.output``, which is created by ``create_empty_dataframes()``
    and indexed by the dataframe keys from ``hed_schema_df_constants``.
    """

    # Keyword placed between the attribute and its value in an equivalent-to clause,
    # keyed by the attribute type reported by calculate_attribute_type.
    _ATTRIBUTE_TYPE_KEYWORDS = {
        "object": "some",
        "data": "value"
    }

    # Range attribute -> schema section whose entries are valid values for it.
    # NumericRange maps to itself as a marker: it is a range, but has no section to look up.
    _RANGE_SECTIONS = {
        HedKey.TagRange: HedSectionKey.Tags,
        HedKey.UnitRange: HedSectionKey.Units,
        HedKey.UnitClassRange: HedSectionKey.UnitClasses,
        HedKey.ValueClassRange: HedSectionKey.ValueClasses,
        HedKey.NumericRange: HedKey.NumericRange
    }

    # Domain attribute -> name written to the property domain column.
    _DOMAIN_NAMES = {
        HedKey.TagDomain: "HedTag",
        HedKey.UnitDomain: "HedUnit",
        HedKey.UnitClassDomain: "HedUnitClass",
        HedKey.UnitModifierDomain: "HedUnitModifier",
        HedKey.ValueClassDomain: "HedValueClass",
        HedKey.ElementDomain: "HedElement"
    }

    # Range attribute -> name written to the property range column.
    _RANGE_NAMES = {
        HedKey.StringRange: "string",
        HedKey.TagRange: "HedTag",
        HedKey.NumericRange: "float",
        HedKey.BoolRange: "boolean",
        HedKey.UnitRange: "HedUnit",
        HedKey.UnitClassRange: "HedUnitClass",
        HedKey.ValueClassRange: "HedValueClass"
    }

    # Identifiers used for the primitive range names when IDs are requested.
    _XSD_TYPES = {
        "string": "xsd:string",
        "boolean": "xsd:boolean",
        "float": "xsd:float"
    }

    # Section key -> base object name used as the subclass of non-tag entries.
    _BASE_OBJECT_NAMES = {
        HedSectionKey.Units: "HedUnit",
        HedSectionKey.UnitClasses: "HedUnitClass",
        HedSectionKey.UnitModifiers: "HedUnitModifier",
        HedSectionKey.ValueClasses: "HedValueClass"
    }

    def __init__(self, get_as_ids=False):
        """ Constructor for schema to dataframe converter

        Parameters:
            get_as_ids(bool): If true, return the hedId rather than name in most places
                              This is mostly relevant for creating an ontology.
        """
        super().__init__()
        self._get_as_ids = get_as_ids
        self._tag_rows = []

    def _get_object_name_and_id(self, object_name, include_prefix=False):
        """ Get the adjusted name and ID for the given object type.

        Parameters:
            object_name(str): The name of the base hed object, e.g. HedHeader, HedUnit
            include_prefix(bool): If True, include the "hed:"

        Returns:
            object_name(str): The inherited object name, e.g. StandardHeader
            hed_id(str): The full formatted hed_id
        """
        prefix, obj_id = get_library_name_and_id(self._schema)
        name = f"{prefix}{remove_prefix(object_name, 'Hed')}"
        full_hed_id = self._get_object_id(object_name, obj_id, include_prefix)
        return name, full_hed_id

    def _get_object_id(self, object_name, base_id=0, include_prefix=False):
        """ Format the hed ID of a structural object.

        Parameters:
            object_name(str): Key into constants.struct_base_ids, e.g. HedHeader
            base_id(int): Offset added to the object's base ID
            include_prefix(bool): If True, include the "hed:"

        Returns:
            str: "HED_" followed by the zero padded 7 digit ID, optionally prefixed with "hed:"
        """
        prefix = "hed:" if include_prefix else ""
        return f"{prefix}HED_{base_id + constants.struct_base_ids[object_name]:07d}"

    # =========================================
    # Required baseclass function
    # =========================================
    def _initialize_output(self):
        """ Reset the output dataframes and the pending tag rows."""
        self.output = create_empty_dataframes()
        self._tag_rows = []

    def _create_and_add_object_row(self, base_object, attributes="", description=""):
        """ Append a row for a structural object (header, prologue, epilogue) to the struct dataframe.

        Parameters:
            base_object(str): The name of the base hed object, e.g. HedHeader
            attributes(str): The already formatted attribute string for the row
            description(str or None): The row description. Newlines are written as a literal "\\n".
                                      None is treated as an empty description.
        """
        name, full_hed_id = self._get_object_name_and_id(base_object)
        new_row = {
            constants.hed_id: full_hed_id,
            constants.name: name,
            constants.attributes: attributes,
            constants.subclass_of: base_object,
            # Guard against a missing prologue/epilogue rather than failing on None.replace
            constants.description: (description or "").replace("\n", "\\n"),
            constants.equivalent_to: self._get_header_equivalent_to(attributes, base_object)
        }
        struct_df = self.output[constants.STRUCT_KEY]
        struct_df.loc[len(struct_df)] = new_row

    def _output_header(self, attributes, prologue):
        """ Write the HedHeader row (with the header attributes) and the HedPrologue row."""
        attributes_string = self._get_attribs_string_from_schema(attributes, sep=", ")
        self._create_and_add_object_row("HedHeader", attributes_string)
        self._create_and_add_object_row("HedPrologue", description=prologue)

    def _output_footer(self, epilogue):
        """ Write the HedEpilogue row."""
        self._create_and_add_object_row("HedEpilogue", description=epilogue)

    def _start_section(self, key_class):
        """ No per-section setup is needed for dataframe output."""
        pass

    def _end_tag_section(self):
        """ Build the tag dataframe in one step from the rows gathered by _write_tag_entry."""
        self.output[constants.TAG_KEY] = pd.DataFrame(self._tag_rows, columns=constants.tag_columns, dtype=str)

    def _write_tag_entry(self, tag_entry, parent_node=None, level=0):
        """ Queue the row for a single tag entry.

        Parameters:
            tag_entry(HedTagEntry): The tag to write
            parent_node: Unused by this writer
            level(int): The level of the tag, written to the level column
        """
        tag_id = tag_entry.attributes.get(HedKey.HedID, "")
        tag_name = tag_entry.short_tag_name
        if tag_entry.name.endswith("#"):
            # Placeholder (takes value) tags get a "-#" suffix on their short name
            tag_name = tag_name + "-#"
        new_row = {
            constants.hed_id: f"{tag_id}",
            constants.level: f"{level}",
            constants.name: tag_name,
            constants.subclass_of: self._get_subclass_of(tag_entry),
            constants.attributes: self._format_tag_attributes(tag_entry.attributes),
            constants.description: tag_entry.description,
            constants.equivalent_to: self._get_tag_equivalent_to(tag_entry),
        }
        # Todo: do other sections like this as well for efficiency
        self._tag_rows.append(new_row)

    def _write_entry(self, entry, parent_node, include_props=True):
        """ Append the row for a non-tag entry to the dataframe of its section.

        Entries whose section has no dataframe mapping are skipped. Property and attribute
        entries are handed to their dedicated writers.

        Parameters:
            entry: The schema entry to write
            parent_node: Unused by this writer
            include_props(bool): Passed on to _write_attribute_entry for attribute entries
        """
        df_key = section_key_to_df.get(entry.section_key)
        if not df_key:
            return

        # Special case
        if df_key == HedSectionKey.Properties:
            return self._write_property_entry(entry)
        elif df_key == HedSectionKey.Attributes:
            return self._write_attribute_entry(entry, include_props=include_props)

        df = self.output[df_key]
        tag_id = entry.attributes.get(HedKey.HedID, "")
        new_row = {
            constants.hed_id: f"{tag_id}",
            constants.name: entry.name,
            constants.subclass_of: self._get_subclass_of(entry),
            constants.attributes: self._format_tag_attributes(entry.attributes),
            constants.description: entry.description,
            constants.equivalent_to: self._get_tag_equivalent_to(entry),
        }
        # Handle the special case of units, which have the extra unit class
        if hasattr(entry, "unit_class_entry"):
            class_entry_name = entry.unit_class_entry.name
            if self._get_as_ids:
                class_entry_name = f"{entry.unit_class_entry.attributes.get(constants.hed_id)}"
            new_row[constants.has_unit_class] = class_entry_name
        df.loc[len(df)] = new_row

    def _get_type_id(self, type_name):
        """ Return the identifier for a domain/range name: an xsd type or the prefixed hed ID."""
        xsd_type = self._XSD_TYPES.get(type_name)
        if xsd_type is not None:
            return xsd_type
        return self._get_object_id(type_name, include_prefix=True)

    def _write_attribute_entry(self, entry, include_props):
        """ Append the row for a schema attribute to the annotation, data or object property dataframe.

        Parameters:
            entry: The attribute entry to write
            include_props(bool): If False, the properties column is left empty
        """
        df_key = constants.OBJECT_KEY
        property_type = "ObjectProperty"
        if HedKey.AnnotationProperty in entry.attributes:
            df_key = constants.ANNOTATION_KEY
            property_type = "AnnotationProperty"
        elif (HedKey.NumericRange in entry.attributes
              or HedKey.StringRange in entry.attributes
              or HedKey.BoolRange in entry.attributes):
            df_key = constants.DATA_KEY
            property_type = "DataProperty"

        # Ordered by the entry's own attribute order, not by the lookup tables
        domain_names = [self._DOMAIN_NAMES[key] for key in entry.attributes if key in self._DOMAIN_NAMES]
        range_names = [self._RANGE_NAMES[key] for key in entry.attributes if key in self._RANGE_NAMES]
        if self._get_as_ids:
            domain_names = [self._get_type_id(name) for name in domain_names]
            range_names = [self._get_type_id(name) for name in range_names]
        domain_string = " or ".join(domain_names)
        range_string = " or ".join(range_names)

        df = self.output[df_key]
        tag_id = entry.attributes.get(HedKey.HedID, "")
        new_row = {
            constants.hed_id: f"{tag_id}",
            constants.name: entry.name,
            constants.property_type: property_type,
            constants.property_domain: domain_string,
            constants.property_range: range_string,
            constants.properties: self._format_tag_attributes(entry.attributes) if include_props else "",
            constants.description: entry.description,
        }
        df.loc[len(df)] = new_row

    def _write_property_entry(self, entry):
        """ Append the row for a schema property to the attribute property dataframe."""
        df = self.output[constants.ATTRIBUTE_PROPERTY_KEY]
        tag_id = entry.attributes.get(HedKey.HedID, "")
        new_row = {
            constants.hed_id: f"{tag_id}",
            constants.name: entry.name,
            constants.property_type: "AnnotationProperty",
            constants.description: entry.description,
        }
        df.loc[len(df)] = new_row

    def _attribute_disallowed(self, attribute):
        """ Return True if the attribute should not be written to the attribute strings."""
        if super()._attribute_disallowed(attribute):
            return True
        # strip out hedID in dataframe format
        return attribute in [HedKey.HedID, HedKey.AnnotationProperty]

    def _get_in_schema_clause(self):
        """ Return the clause tying an object to this schema, by ID or by name."""
        schema_name, schema_id = self._get_object_name_and_id("HedSchema", include_prefix=True)
        if self._get_as_ids:
            return f"(hed:HED_0000102 some {schema_id})"
        return f"(inHedSchema some {schema_name})"

    @staticmethod
    def _join_equivalent_to(subclass, clauses):
        """ Join the subclass and clauses with " and ", returning "" if nothing was added."""
        final_out = " and ".join([subclass, *clauses])
        # If they match, we want to leave equivalent_to blank
        if final_out == subclass:
            return ""
        return final_out

    def _get_header_equivalent_to(self, attributes_string, subclass_of):
        """ Build the equivalent-to string for a structural (header style) row.

        Parameters:
            attributes_string(str): The formatted header attributes
            subclass_of(str): The name of the base hed object, e.g. HedHeader

        Returns:
            str: The subclass followed by the schema clause and one clause per attribute
                 found in constants.valid_omn_attributes.
        """
        attributes, _ = text_util._parse_header_attributes_line(attributes_string)
        attribute_strings = [self._get_in_schema_clause()]

        for attribute, value in attributes.items():
            if attribute not in constants.valid_omn_attributes:
                continue

            if self._get_as_ids:
                attribute = f"hed:{constants.valid_omn_attributes[attribute]}"
            attribute_strings.append(f'({attribute} value "{value}")')

        if self._get_as_ids:
            # we just want the ID for normal hed objects, not schema specific
            subclass_of = self._get_object_id(subclass_of, base_id=0, include_prefix=True)

        return self._join_equivalent_to(subclass_of, attribute_strings)

    def _get_tag_equivalent_to(self, tag_entry):
        """ Build the equivalent-to string for a schema entry.

        Returns:
            str: The subclass followed by the attribute, unit class and schema clauses,
                 or "" if the entry has no clauses.
        """
        subclass = self._get_subclass_of(tag_entry)

        attribute_strings = []
        attribute_strings.extend(self._process_attributes(tag_entry))
        attribute_strings.extend(self._process_unit_class_entry(tag_entry))
        attribute_strings.extend(self._process_schema_parent(tag_entry))

        return self._join_equivalent_to(subclass, attribute_strings)

    def _process_attributes(self, tag_entry):
        """ Return one clause per value of every allowed, non-annotation attribute of the entry."""
        attribute_strings = []
        for attribute, value in tag_entry.attributes.items():
            attribute_entry = self._schema.attributes.get(attribute)
            attribute_type = calculate_attribute_type(attribute_entry)

            if self._attribute_disallowed(attribute) or attribute_type == "annotation":
                continue

            values = self._prepare_values(attribute_entry, value, self._RANGE_SECTIONS)

            # Neither the label nor the keyword depend on the individual value
            label = attribute
            if self._get_as_ids:
                label = f"hed:{attribute_entry.attributes[HedKey.HedID]}"
            keyword = self._ATTRIBUTE_TYPE_KEYWORDS[attribute_type]
            attribute_strings.extend(f"({label} {keyword} {v})" for v in values)

        return attribute_strings

    def _prepare_values(self, attribute_entry, value, range_types):
        """ Convert an attribute value into the list of strings written to the clauses.

        Parameters:
            attribute_entry: The schema entry of the attribute the value belongs to
            value(str or bool): The attribute value. Strings are split on commas.
            range_types(dict): Range attribute -> section (see _find_range)

        Returns:
            list: For string values, the stripped parts: replaced with prefixed hed IDs when IDs are
                  requested and the range names a schema section, quoted when the attribute has
                  no known range, and unchanged otherwise. Non-string values are returned as a
                  single element list, with True written as 'true'.

        Raises:
            ValueError: If IDs are requested and a value has no schema entry or the entry has no hedId.
        """
        if isinstance(value, str):
            values = [v.strip() for v in value.split(",")]
            found_range = self._find_range(attribute_entry, range_types)
            if self._get_as_ids and found_range and found_range != HedKey.NumericRange:
                section = self._schema[found_range]
                entries = [section.get(v) for v in values]
                if any(found_entry is None for found_entry in entries):
                    raise ValueError(f"Cannot find schema entry for {values}")
                hed_ids = []
                for v, found_entry in zip(values, entries):
                    test_id = found_entry.attributes.get(HedKey.HedID)
                    if not test_id:
                        raise ValueError(f"Schema entry {v} has no hedId.")
                    hed_ids.append(f"hed:{test_id}")
                values = hed_ids
            elif not found_range:
                values = [f'"{v}"' for v in values]
        else:
            if value is True:
                value = 'true'
            values = [value]

        return values

    def _find_range(self, attribute_entry, range_types):
        """ Return the range_types value of the first range attribute set on the entry, else None."""
        for range_type in range_types:
            if range_type in attribute_entry.attributes:
                return range_types[range_type]
        return None

    def _process_unit_class_entry(self, tag_entry):
        """ Return the unit class clause for entries that have a unit_class_entry, else an empty list."""
        if not hasattr(tag_entry, "unit_class_entry"):
            return []

        unit_class_entry = tag_entry.unit_class_entry
        if self._get_as_ids:
            return [f"(hed:HED_0000103 some hed:{unit_class_entry.attributes.get(constants.hed_id)})"]
        return [f"({constants.has_unit_class} some {unit_class_entry.name})"]

    def _process_schema_parent(self, tag_entry):
        """ Return the schema clause for entries that have a parent attribute which is empty."""
        if hasattr(tag_entry, "parent") and not tag_entry.parent:
            return [self._get_in_schema_clause()]
        return []

    def _get_subclass_of(self, tag_entry):
        """ Return what the entry is a subclass of, as a name or as an ID.

        Tags use their parent tag, or the HedTag base object when they have no parent.
        Other entries use the schema specific base object of their section.
        """
        # Special case for HedTag
        if isinstance(tag_entry, HedTagEntry):
            parent_entry = tag_entry.parent
            if self._get_as_ids:
                if parent_entry:
                    return f"hed:{parent_entry.attributes[HedKey.HedID]}"

                # HedTag always returns as base object
                return "hed:HED_0000005"
            return parent_entry.short_tag_name if parent_entry else "HedTag"

        name, obj_id = self._get_object_name_and_id(self._BASE_OBJECT_NAMES[tag_entry.section_key],
                                                    include_prefix=True)
        if self._get_as_ids:
            return obj_id
        return name