"""
This module is used to create a HedSchema object from an OWL file or graph.
"""
from hed.errors.exceptions import HedFileError, HedExceptions
from hed.schema.hed_schema_constants import HedSectionKey, HedKey
from hed.schema import schema_validation_util
from .base2schema import SchemaLoader
import rdflib
from rdflib.exceptions import ParserError
from rdflib import Graph, RDF, RDFS, Literal, URIRef, OWL, XSD
from collections import defaultdict
from hed.schema.schema_io.owl_constants import HED, HEDT, HEDU, HEDUM
class SchemaLoaderOWL(SchemaLoader):
    """ Loads OWL/RDF schemas from filenames or strings.

        Expected usage is SchemaLoaderOWL.load(filename)

        SchemaLoaderOWL(filename) will load just the header_attributes
    """

    def __init__(self, filename, schema_as_string=None, schema=None, file_format=None, name=""):
        """ Validate the arguments and delegate to the base loader.

        Parameters:
            filename (str or None): Path of the file to parse; takes priority over schema_as_string.
            schema_as_string (str or None): Serialized RDF to parse when no filename is given.
            schema: Forwarded unchanged to SchemaLoader.__init__.
            file_format (str or None): Passed to rdflib as the ``format`` argument when parsing.
                Required when loading from a string.
            name (str): Name reported in any HedFileError that is raised.

        Raises:
            HedFileError: If schema_as_string is given without a file_format.
        """
        if schema_as_string and not file_format:
            raise HedFileError(HedExceptions.BAD_PARAMETERS,
                               "Must pass a file_format if loading owl schema as a string.",
                               name)
        super().__init__(filename, schema_as_string, schema, file_format, name)

        self._schema.source_format = ".owl"
        self.graph = None
        # When loading, this stores rooted tag name -> full root path pairs
        self._rooted_cache = {}

    def _open_file(self):
        """ Parse a Turtle/owl/etc file or string and return the RDF graph.

        Returns:
            rdflib.Graph: Graph parsed from self.filename if set, otherwise from self.schema_as_string.

        Raises:
            HedFileError: If the file cannot be found or the content cannot be parsed.
        """
        graph = rdflib.Graph()
        try:
            if self.filename:
                graph.parse(self.filename, format=self.file_format)
            else:
                graph.parse(data=self.schema_as_string, format=self.file_format)
        except FileNotFoundError as fnf_error:
            # Chain the original exception so the underlying traceback is preserved.
            raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(fnf_error), self.name) from fnf_error
        except ParserError as parse_error:
            raise HedFileError(HedExceptions.CANNOT_PARSE_RDF, str(parse_error), self.name) from parse_error

        return graph

    def _read_prologue(self):
        """ Read the Prologue section from the ontology.

        Returns:
            str: The HED.elementValue of HED.Prologue, or "" if absent or empty.
        """
        # any=False asks rdflib to complain if more than one value is present.
        prologue = self.graph.value(subject=HED.Prologue, predicate=HED.elementValue, any=False)
        return str(prologue) if prologue else ""

    def _read_epilogue(self):
        """ Read the Epilogue section from the ontology.

        Returns:
            str: The HED.elementValue of HED.Epilogue, or "" if absent or empty.
        """
        # any=False asks rdflib to complain if more than one value is present.
        epilogue = self.graph.value(subject=HED.Epilogue, predicate=HED.elementValue, any=False)
        return str(epilogue) if epilogue else ""

    def _get_header_attributes(self, graph):
        """ Parse header attributes from an RDF graph into a dictionary.

        Parameters:
            graph (rdflib.Graph): The graph to search for HED.HeaderMember instances.

        Returns:
            dict: Maps each member's rdfs:label to the string value of its HED.HeaderAttribute,
                  or to None when that value is missing. Members without a label are skipped.
        """
        header_attributes = {}
        for member in graph.subjects(RDF.type, HED.HeaderMember):
            label = graph.value(member, RDFS.label)
            if not label:
                continue
            header_attribute = graph.value(member, HED.HeaderAttribute)
            header_attributes[str(label)] = str(header_attribute) if header_attribute else None
        return header_attributes

    def _parse_data(self):
        """ Populate the schema from the parsed graph held in self.input_data.

        Binds the HED namespace prefixes on the graph (the local-name extraction used
        elsewhere in this class relies on prefixed n3 output), then reads the epilogue,
        prologue and each schema section in turn.
        """
        self.graph = self.input_data
        self.graph.bind("hed", HED)
        self.graph.bind("hedt", HEDT)
        self.graph.bind("hedu", HEDU)
        self.graph.bind("hedum", HEDUM)

        self._schema.epilogue = self._read_epilogue()
        self._schema.prologue = self._read_prologue()
        self._read_properties()
        self._read_attributes()
        self._read_units()
        self._read_section(HedSectionKey.ValueClasses, HED.HedValueClass)
        self._read_section(HedSectionKey.UnitModifiers, HED.HedUnitModifier)
        self._read_tags()

    @staticmethod
    def get_local_names_from_uris(parent_chain, tag_uri, namespace_manager=None):
        """ Extract local names from URIs using RDFlib's n3() method.

        Parameters:
            parent_chain (list): URIs of the ancestors of tag_uri.
            tag_uri (URIRef): The URI of the tag itself; its name is placed last.
            namespace_manager: Manager used to abbreviate the URIs. Defaults to
                HED.namespace_manager.

        Returns:
            list of str: The text after the last ':' of each URI's n3 form, ancestors first.
        """
        if namespace_manager is None:
            # NOTE(review): if HED is a plain rdflib Namespace, this attribute access yields a
            # URIRef rather than a NamespaceManager and n3() will fail - confirm, and prefer
            # passing graph.namespace_manager explicitly.
            namespace_manager = HED.namespace_manager
        # Serialize each URI into N3 format and keep only the local name.
        return [uri.n3(namespace_manager=namespace_manager).split(':')[-1]
                for uri in parent_chain + [tag_uri]]

    def sort_classes_by_hierarchy(self, classes):
        """ Sort all tags based on their assembled full name.

        The names used are URI local names (text after the last ':' of the n3 form),
        not rdfs:label values.

        Parameters:
            classes (iterable): The tag URIs to sort.

        Returns:
            tuple of tuples: One (tag URI, parent names) pair per tag, ordered by the
                "/"-joined full name. The parent names do not include the tag itself.
                Empty if classes is empty.
        """
        namespace_manager = self.graph.namespace_manager
        keyed_chains = []
        for tag_uri in classes:
            lineage = self._get_parent_chain(tag_uri) + [tag_uri]
            names = [uri.n3(namespace_manager=namespace_manager).split(':')[-1] for uri in lineage]
            keyed_chains.append(("/".join(names), (tag_uri, names[:-1])))

        # Nothing to sort; unpacking an empty zip would otherwise raise ValueError.
        if not keyed_chains:
            return ()

        keyed_chains.sort()
        return tuple(chain for _, chain in keyed_chains)

    def _get_parent_chain(self, cls):
        """ Build the chain of ancestors for a given class by following HED.hasHedParent.

        Parameters:
            cls (URIRef): The class whose ancestors are wanted.

        Returns:
            list: Ancestor URIs ordered from the topmost ancestor down to the direct parent.
                  Empty if cls has no parent.

        Raises:
            ValueError: If the hasHedParent links form a cycle.
        """
        chain = []
        seen = {cls}
        parent = self.graph.value(subject=cls, predicate=HED.hasHedParent)
        while parent is not None:
            if parent in seen:
                raise ValueError(f"Cycle found in hasHedParent chain in owl file for uri {cls}")
            seen.add(parent)
            chain.append(parent)
            parent = self.graph.value(subject=parent, predicate=HED.hasHedParent)
        # Collected nearest-first; callers expect the topmost ancestor first.
        chain.reverse()
        return chain

    def _parse_uri(self, uri, key_class, name=None):
        """ Create a schema entry for a URI, filling in its description and attributes.

        Parameters:
            uri (URIRef): The subject to read from the graph.
            key_class (HedSectionKey): The section the entry belongs to.
            name (str or None): Entry name to use; when empty the rdfs:label of uri is used.

        Returns:
            The entry created by self._schema._create_tag_entry. It is not added to the schema here.

        Raises:
            ValueError: If no name is given and uri has no (non-empty) rdfs:label.
        """
        if name:
            label = name
        else:
            label = self.graph.value(subject=uri, predicate=RDFS.label)
            if not label:
                raise ValueError(f"Empty label value found in owl file in uri {uri}")
            label = str(label)

        tag_entry = self._schema._create_tag_entry(label, key_class)

        description = self.graph.value(subject=uri, predicate=RDFS.comment)
        if description:
            tag_entry.description = str(description)

        valid_attributes = self._schema._sections[key_class].valid_attributes
        namespace_manager = self.graph.namespace_manager

        # A predicate may occur several times on one subject, so gather the values per attribute.
        new_values = defaultdict(list)
        for predicate, obj in self.graph.predicate_objects(subject=uri):
            # Convert predicate URI to a readable string, assuming it's in a known namespace
            attr_name = predicate.n3(namespace_manager).split(':')[1]
            if attr_name not in valid_attributes:
                continue
            if isinstance(obj, URIRef):
                attr_value = obj.n3(namespace_manager).split(':')[1]
            else:
                attr_value = str(obj)
            new_values[attr_name].append(attr_value)

        for attr_name, values in new_values.items():
            attr_value = ",".join(values)
            # A lone "true" is stored as a boolean rather than as text.
            if attr_value == "true":
                attr_value = True
            tag_entry._set_attribute_value(attr_name, attr_value)

        return tag_entry

    def _get_classes_with_subproperty(self, subproperty_uri, base_type):
        """ Find all subjects of a given type that have a specified rdfs:subPropertyOf.

        Parameters:
            subproperty_uri (URIRef): The required object of rdfs:subPropertyOf.
            base_type (URIRef): The required rdf:type.

        Returns:
            set: The matching subject URIs.
        """
        return {subject for subject in self.graph.subjects(RDF.type, base_type)
                if (subject, RDFS.subPropertyOf, subproperty_uri) in self.graph}

    def _get_all_subclasses(self, base_type):
        """ Find all direct and indirect subclasses of the given base_type.

        Parameters:
            base_type (URIRef): The class whose rdfs:subClassOf descendants are wanted.

        Returns:
            set: The subclass URIs. Each class is expanded only once, so cyclic
                 rdfs:subClassOf statements cannot cause unbounded traversal.
        """
        subclasses = set()
        pending = [base_type]
        while pending:
            current = pending.pop()
            for subclass in self.graph.subjects(RDFS.subClassOf, current):
                if subclass not in subclasses:
                    subclasses.add(subclass)
                    pending.append(subclass)
        return subclasses

    def _get_classes(self, base_type):
        """ Retrieve all instances of the given base_type, including instances of its subclasses.

        Parameters:
            base_type (URIRef): The rdf:type to look for.

        Returns:
            set: The instance URIs.
        """
        # Instances of the base type itself
        classes = set(self.graph.subjects(RDF.type, base_type))
        # Instances of all subclasses
        for subclass in self._get_all_subclasses(base_type):
            classes.update(self.graph.subjects(RDF.type, subclass))
        return classes

    def _read_properties(self):
        """ Read the Properties section: annotation properties under HED.schemaProperty. """
        key_class = HedSectionKey.Properties
        self._schema._initialize_attributes(key_class)
        prop_uris = self._get_classes_with_subproperty(HED.schemaProperty, OWL.AnnotationProperty)
        for uri in prop_uris:
            new_entry = self._parse_uri(uri, key_class)
            self._add_to_dict(new_entry, key_class)

    def _read_attributes(self):
        """ Read the Attributes section from both datatype and object properties. """
        key_class = HedSectionKey.Attributes
        self._schema._initialize_attributes(key_class)
        prop_uris = self._get_classes_with_subproperty(HED.schemaAttributeDatatypeProperty, OWL.DatatypeProperty)
        prop_uris.update(self._get_classes_with_subproperty(HED.schemaAttributeObjectProperty, OWL.ObjectProperty))

        for uri in prop_uris:
            new_entry = self._parse_uri(uri, key_class)
            self._add_to_dict(new_entry, key_class)

    def _read_section(self, key_class, node_uri):
        """ Read one generic section by adding an entry for every instance of node_uri.

        Parameters:
            key_class (HedSectionKey): The section to fill.
            node_uri (URIRef): The type whose instances (including those of subclasses) are read.
        """
        self._schema._initialize_attributes(key_class)
        for uri in self._get_classes(node_uri):
            new_entry = self._parse_uri(uri, key_class)
            self._add_to_dict(new_entry, key_class)

    def _read_units(self):
        """ Read the UnitClasses and Units sections and attach each unit to its unit class.

        Raises:
            ValueError: If a unit's HED.unitClass is missing or is not a unit class that was read.
        """
        self._schema._initialize_attributes(HedSectionKey.UnitClasses)
        self._schema._initialize_attributes(HedSectionKey.Units)

        key_class = HedSectionKey.UnitClasses
        unit_classes = {}
        for uri in self._get_classes(HED.HedUnitClass):
            new_entry = self._parse_uri(uri, key_class)
            self._add_to_dict(new_entry, key_class)
            # Remember the entry by URI so the units below can find their class.
            unit_classes[uri] = new_entry

        key_class = HedSectionKey.Units
        for uri in self._get_classes(HED.HedUnit):
            new_entry = self._parse_uri(uri, key_class)
            self._add_to_dict(new_entry, key_class)
            unit_class_uri = self.graph.value(subject=uri, predicate=HED.unitClass)
            class_entry = unit_classes.get(unit_class_uri)
            if class_entry is None:
                raise ValueError(f"Unit {uri} in owl file has missing or unknown unit class {unit_class_uri}")
            class_entry.add_unit(new_entry)

    def _add_tag_internal(self, uri, parent_tags):
        """ Create the entry for one tag and add it to the Tags section.

        Parameters:
            uri (URIRef): The tag to add.
            parent_tags (list of str): Names of the tag's ancestors, topmost first.

        Raises:
            ValueError: If the tag has no (non-empty) rdfs:label.
        """
        tag_name = self.graph.value(uri, RDFS.label)
        if not tag_name:
            raise ValueError(f"No label for uri {uri}")
        tag_name = str(tag_name)

        parents_and_child = parent_tags + [tag_name]
        # If the topmost ancestor was recorded as rooted earlier, substitute its full root path.
        if parent_tags and parents_and_child[0] in self._rooted_cache:
            full_tag = "/".join([self._rooted_cache[parents_and_child[0]]] + parents_and_child[1:])
        else:
            full_tag = "/".join(parents_and_child)

        tag_entry = self._parse_uri(uri, HedSectionKey.Tags, full_tag)

        rooted_entry = schema_validation_util.find_rooted_entry(tag_entry, self._schema, self._loading_merged)
        if rooted_entry:
            # Re-create the entry under the rooted path and remember it for this tag's descendants.
            loading_from_chain = rooted_entry.name + "/" + tag_entry.short_tag_name
            loading_from_chain_short = tag_entry.short_tag_name
            self._rooted_cache[tag_entry.short_tag_name] = loading_from_chain
            full_tag = full_tag.replace(loading_from_chain_short, loading_from_chain)
            tag_entry = self._parse_uri(uri, HedSectionKey.Tags, full_tag)

        self._add_to_dict(tag_entry, HedSectionKey.Tags)

    def _read_tags(self):
        """ Read the Tags section from all HED.HedTag and HED.HedPlaceholder instances.

        Tags are processed in full-name order, so a parent is added before its children.
        """
        classes = self._get_classes(HED.HedTag)
        classes.update(self._get_classes(HED.HedPlaceholder))
        sorted_classes = self.sort_classes_by_hierarchy(classes)
        self._schema._initialize_attributes(HedSectionKey.Tags)
        for uri, parents in sorted_classes:
            self._add_tag_internal(uri, parents)

    def _add_to_dict(self, entry, key_class):
        """ Add an entry to the schema, rejecting InLibrary entries in an unmerged schema.

        Parameters:
            entry: The schema entry to add.
            key_class (HedSectionKey): The section to add it to.

        Returns:
            Whatever self._add_to_dict_base returns.

        Raises:
            HedFileError: If the entry has the InLibrary attribute while neither loading a
                merged schema nor appending to an existing schema.
        """
        if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged and not self.appending_to_schema:
            raise HedFileError(HedExceptions.IN_LIBRARY_IN_UNMERGED,
                               "Library tag in unmerged schema has InLibrary attribute",
                               self.name)

        return self._add_to_dict_base(entry, key_class)