"""Functions and classes for allowing logging extra data from pipelines.Logging serves as the only directly supported means of introspecting into thestate of a pipeline. This allows pipeline components to report to a loggerinformation that otherwise would be discarded as it is not part of the featuregeneration. An example would be the index for the specified least and greatestvalues in `dupin.data.reduce.NthGreatest`. This particular example allows a userafter feature generation to see which particles in a trajectory are chosen eachstep for a feature.Similar to the pipeline proper, the logging infrastructure expects pipelinecomponents to return dictionaries. The data itself is stored in a list whereeach entry is data from one frame of the trajectory parsed. The list elementsare nested dictionaries where the top level is feature names (at the variousstages of modification) and the second level is the class/modifier specificidentifier, and the third and final level is the logging data for that objectfor that frame.Note---- Logging data is not accessible to other parts of the pipeline."""importnumpyasnpfrom..importerrorstry:importpandasaspdexceptImportError:pd=errors._RaiseModuleError("pandas")
class Logger:
    """Class for logging extra information from data pipeline.

    Stores available metadata from pipeline components. Not all components
    offer metadata, and those that do document them.

    Data is accumulated per frame: a context (feature name) is selected with
    `_set_context`, values are stored under it with item assignment, and
    `end_frame` closes the frame and appends it to `frames`.
    """

    def __init__(self):
        """Construct a Logger instance."""
        self._data = []
        self._reset()

    def _set_context(self, key):
        """Set the current distribution to store information on.

        This sets the name for the current feature being logged. Any
        non-empty context that was active before is stored in the current
        frame first.

        Parameters
        ----------
        key
            Identifier of the feature subsequent item assignments belong to.
        """
        self._flush_context()
        # Don't duplicate the same key: reuse the dictionary already stored
        # in this frame for ``key`` if there is one.
        self._current_context = self._current_frame.get(key, {})
        self._context_key = key

    def __setitem__(self, key, value):
        """Internally store information from data pipeline.

        This is used to store pipeline component feature specific metadata.
        A context must have been selected with `_set_context` since the last
        frame ended; otherwise there is no dictionary to assign into and a
        `TypeError` is raised.
        """
        self._current_context[key] = value

    def end_frame(self):
        """End the current frame of data. Allows separate by time of data."""
        self._flush_context()
        self._data.append(self._current_frame)
        self._reset()

    def _flush_context(self):
        """Store the active context in the current frame if it holds data."""
        # Skips both ``None`` (no context selected) and empty contexts.
        if self._current_context:
            self._current_frame.setdefault(
                self._context_key, self._current_context
            )

    def _reset(self):
        """Start a fresh frame with no active context."""
        self._current_frame = {}
        self._current_context = None
        # The rest of the class reads and writes ``_context_key``, so that is
        # the attribute which must be cleared between frames.
        self._context_key = None

    @staticmethod
    def _first_non_empty(frames):
        """Return the first non-empty frame in ``frames`` or `None`."""
        return next((frame for frame in frames if frame), None)

    @property
    def frames(self):
        r"""`list` [`dict`]: Access the logged frames of data.

        The data is a `list` of `dict` where keys are features and values are
        `dict`\ s with the metadata gathered from the pipeline components.
        """
        return self._data

    def to_dataframe(self):
        """Return a `pandas.DataFrame` object consisting of stored data.

        This uses `pandas.MultiIndex` to map the nested dictionaries to a
        dataframe. The columns are taken from the first non-empty frame; an
        empty dataframe is returned when no frame contains data. Rows of
        frames that logged nothing are filled with NaN.

        Warning
        -------
        This assumes the pipeline produces homogenous data along a trajectory.

        Warning
        -------
        This only works for floating point logged values.
        """
        frame_data = self._first_non_empty(self._data)
        if frame_data is None:
            return pd.DataFrame()
        column_index = pd.MultiIndex.from_tuples(
            _create_column_index(frame_data)
        )
        # TODO: Extend to other dtypes?
        # Pre-fill with NaN so rows that receive no values are well defined
        # instead of exposing uninitialised memory.
        data_arr = _log_data_to_array(
            self._data,
            np.full((len(self._data), len(column_index)), np.nan, dtype=float),
        )
        return pd.DataFrame(data_arr, columns=column_index)
def_create_column_index(log_data):"""Yield tuples of keys for creating a multi-index from a nested dict."""forkey,valueinlog_data.items():ifisinstance(value,dict):forinner_indexin_create_column_index(value):yield(key,*inner_index)else:yield(key,)def_log_data_to_array(data,out):"""Take the output of `Logger.frames` and fills a NumPy array with the data. This is designed to be used with _create_column_index to help the `Logger` create a `pandas.Dataframe`. """defwrite_frame(frame_data,out,index):"""Recursive function which fills a row of an array. Parameters ---------- frame_data: dict | float The data to store. out: numpy.ndarray The array row to fill. index: int An index to write the data out to. This is used with recursion to ensure that each entry is written once and in a non-overlapping location. """forvalueinframe_data.values():ifisinstance(value,dict):index=write_frame(value,out,index)else:out[index]=valueindex+=1returnindexfori,frameinenumerate(data):write_frame(frame,out[i],0)returnout