""" Manager of events of temporal extent. """
import pandas as pd
import bisect
from hed.errors.exceptions import HedFileError
from hed.models.hed_string import HedString
from hed.models.model_constants import DefTagNames
from hed.models import df_util
from hed.models import string_util
from hed.tools.analysis.temporal_event import TemporalEvent
from hed.tools.analysis.hed_type_defs import HedTypeDefs
[docs]class EventManager:
""" Manager of events of temporal extent. """
[docs] def __init__(self, input_data, hed_schema, extra_defs=None):
""" Create an event manager for an events file. Manages events of temporal extent.
Parameters:
input_data (TabularInput): Represents an events file with its sidecar.
hed_schema (HedSchema): HED schema used.
extra_defs (DefinitionDict): Extra definitions not included in the input_data information.
:raises HedFileError:
- if there are any unmatched offsets.
Notes: Keeps the events of temporal extend by their starting index in events file. These events
are separated from the rest of the annotations, which are contained in self.hed_strings.
"""
self.hed_schema = hed_schema
self.input_data = input_data
self.def_dict = input_data.get_def_dict(hed_schema, extra_def_dicts=extra_defs)
if self.input_data.needs_sorting:
raise HedFileError("OnsetsNotOrdered", "The onset values must be non-decreasing", "")
self.onsets = None
self.hed_strings = None
self.event_list = None
self._create_event_list(input_data)
def _create_event_list(self, input_data):
""" Populate the event_list with the events with temporal extent indexed by event number.
Parameters:
input_data (TabularInput): A tabular input that includes its relevant sidecar.
:raises HedFileError:
- If the hed_strings contain unmatched offsets.
Notes:
"""
hed_strings = input_data.series_a
df_util.shrink_defs(hed_strings, self.hed_schema)
delay_df = df_util.split_delay_tags(hed_strings, self.hed_schema, input_data.onsets)
hed_strings = [HedString(hed_string, self.hed_schema) for hed_string in delay_df.HED]
self.onsets = pd.to_numeric(delay_df.onset, errors='coerce')
self.event_list = [[] for _ in range(len(hed_strings))]
onset_dict = {} # Temporary dictionary keeping track of temporal events that haven't ended yet.
for event_index, hed in enumerate(hed_strings):
self._extract_temporal_events(hed, event_index, onset_dict)
self._extract_duration_events(hed, event_index)
# Now handle the events that extend to end of list
for item in onset_dict.values():
item.set_end(len(self.onsets), None)
self.hed_strings = hed_strings
def _extract_duration_events(self, hed, event_index):
groups = hed.find_top_level_tags(anchor_tags={DefTagNames.DURATION_KEY})
to_remove = []
for duration_tag, group in groups:
start_time = self.onsets[event_index]
new_event = TemporalEvent(group, event_index, start_time)
end_time = new_event.end_time
# Todo: This may need updating. end_index==len(self.onsets) in the edge
end_index = bisect.bisect_left(self.onsets, end_time)
new_event.set_end(end_index, end_time)
self.event_list[event_index].append(new_event)
to_remove.append(group)
hed.remove(to_remove)
def _extract_temporal_events(self, hed, event_index, onset_dict):
""" Extract the temporal events and remove them from the other HED strings.
Parameters:
hed (HedString): The assembled HedString at position event_index in the data.
event_index (int): The position of this string in the data.
onset_dict (dict): Running dict that keeps track of temporal events that haven't yet ended.
Note:
This removes the events of temporal extent from HED.
"""
if not hed:
return
group_tuples = hed.find_top_level_tags(anchor_tags={DefTagNames.ONSET_KEY, DefTagNames.OFFSET_KEY},
include_groups=2)
to_remove = []
for def_tag, group in group_tuples:
anchor_tag = group.find_def_tags(recursive=False, include_groups=0)[0]
anchor = anchor_tag.extension.casefold()
if anchor in onset_dict or def_tag == DefTagNames.OFFSET_KEY:
temporal_event = onset_dict.pop(anchor)
temporal_event.set_end(event_index, self.onsets[event_index])
if def_tag == DefTagNames.ONSET_KEY:
new_event = TemporalEvent(group, event_index, self.onsets[event_index])
self.event_list[event_index].append(new_event)
onset_dict[anchor] = new_event
to_remove.append(group)
hed.remove(to_remove)
[docs] def unfold_context(self, remove_types=[]):
""" Unfold the event information into a tuple based on context.
Parameters:
remove_types (list): List of types to remove.
Returns:
list of str or HedString representing the information without the events of temporal extent.
list of str or HedString representing the onsets of the events of temporal extent.
list of str or HedString representing the ongoing context information.
"""
placeholder = ""
remove_defs = self.get_type_defs(remove_types) # definitions corresponding to remove types to be filtered out
new_hed = [placeholder for _ in range(len(self.hed_strings))]
new_base = [placeholder for _ in range(len(self.hed_strings))]
new_contexts = [placeholder for _ in range(len(self.hed_strings))]
base, contexts = self._expand_context()
for index, item in enumerate(self.hed_strings):
new_hed[index] = self._filter_hed(item, remove_types=remove_types,
remove_defs=remove_defs, remove_group=False)
new_base[index] = self._filter_hed(base[index], remove_types=remove_types,
remove_defs=remove_defs, remove_group=True)
new_contexts[index] = self._filter_hed(contexts[index], remove_types=remove_types,
remove_defs=remove_defs, remove_group=True)
return new_hed, new_base, new_contexts # these are each a list of strings
def _expand_context(self):
""" Expand the onset and the ongoing context for additional processing.
Returns:
tuple of lists: (base list of str, context list of str).
Notes: For each event, the Onset goes in the base list and the remainder of the times go in the contexts list.
"""
base = [[] for _ in range(len(self.hed_strings))]
contexts = [[] for _ in range(len(self.hed_strings))]
for events in self.event_list:
for event in events:
this_str = str(event.contents)
base[event.start_index].append(this_str)
for i in range(event.start_index + 1, event.end_index):
contexts[i].append(this_str)
return self.compress_strings(base), self.compress_strings(contexts)
def _filter_hed(self, hed, remove_types=[], remove_defs=[], remove_group=False):
""" Remove types and definitions from a HED string.
Parameters:
hed (string or HedString): The HED string to be filtered.
remove_types (list): List of HED tags to filter as types (usually Task and Condition-variable).
remove_defs (list): List of definition names to filter out.
remove_group (bool): (Default False) Whether to remove the groups included when removing.
Returns:
str: The resulting filtered HED string.
"""
if not hed:
return ""
# Reconvert even if HED is already a HedString to make sure a copy and expandable.
hed_obj = HedString(str(hed), hed_schema=self.hed_schema, def_dict=self.def_dict)
hed_obj, temp1 = string_util.split_base_tags(hed_obj, remove_types, remove_group=remove_group)
if remove_defs:
hed_obj, temp2 = string_util.split_def_tags(hed_obj, remove_defs, remove_group=remove_group)
return str(hed_obj)
[docs] def str_list_to_hed(self, str_list):
""" Create a HedString object from a list of strings.
Parameters:
str_list (list): A list of strings to be concatenated with commas and then converted.
Returns:
HedString or None: The converted list.
"""
filtered_list = [item for item in str_list if item != ''] # list of strings
if not filtered_list: # empty lists don't contribute
return None
return HedString(",".join(filtered_list), self.hed_schema, def_dict=self.def_dict)
[docs] def get_type_defs(self, types):
""" Return a list of definition names (lower case) that correspond to any of the specified types.
Parameters:
types (list or None): List of tags that are treated as types such as 'Condition-variable'
Returns:
list: List of definition names (lower-case) that correspond to the specified types
"""
def_list = []
if not types:
return def_list
for this_type in types:
type_defs = HedTypeDefs(self.def_dict, type_tag=this_type)
def_list = def_list + list(type_defs.def_map.keys())
return def_list
[docs] @staticmethod
def compress_strings(list_to_compress):
""" Compress a list of lists of strings into a single str with comma-separated elements.
Parameters:
list_to_compress (list): List of lists of HED str to turn into a list of single HED strings.
Returns:
list: List of same length as list_to_compress with each entry being a str.
"""
result_list = ["" for _ in range(len(list_to_compress))]
for index, item in enumerate(list_to_compress):
if item:
result_list[index] = ",".join(item)
return result_list