Source code for dupin.data.aggregate

# Copyright (c) 2023-2026 The Regents of the University of Michigan.
# This file is from the dupin project, released under the BSD 3-Clause License.

"""Helper module for generating/storing features across an entire trajectory.

This class provides the `SignalAggregator` class which takes a pipeline and
provides methods for storing the output across a trajectory.
"""

from collections import deque
from collections.abc import Iterator
from typing import Any, Optional

import numpy as np

from .. import errors
from . import base, logging

try:
    import pandas as pd
except ImportError:
    pd = errors._RaiseModuleError("pandas")
try:
    import xarray as xa
except ImportError:
    xa = errors._RaiseModuleError("xarray")


[docs] class SignalAggregator: """Using generators computes signals across a trajectory. This class can be used to create appropriate data structures for use in analyzing a whole trajectory with offline methods or iteratively analyzing for online use. See the `compute` and `accumulate` methods for usage. The signal is stored in a deque, which maintains a signal of constant length by forgetting earlier values. This is useful for online detection. Full signal length can be achieved by setting `max_deque_length` to `None`. Parameters ---------- generator : :obj:`dupin.data.base.GeneratorLike` A sequence of signal generators to use for generating the multivariate signal of a trajectory. logger : dupin.data.logging.Logger A logger object to store information about the data processing of max_deque_length : int, optional The maximum length of the deque. If None, the deque will be of infinite size (like a list) and whole signal will be stored. Attributes ---------- generator : :obj:`dupin.data.base.GeneratorLike` The generator which generates data given a trajectory frame. signals : list[dict] The current list of analyzed frames. """
[docs] def __init__( self, generator: base.GeneratorLike, logger: Optional[logging.Logger] = None, max_deque_length=None, ): self.generator = generator self._max_deque_length = max_deque_length if max_deque_length is not None: self.signals = deque(maxlen=max_deque_length) else: self.signals = [] self.logger = logger
[docs] def compute( self, iterator: Iterator[tuple[tuple[Any, ...], dict[str, Any]]] ): """Compute signals from generator across the iterator. These signals are stored internally in ``signals`` until asked for by `to_dataframe` or `to_xarray`. This can be called multiple times, and the stored signals values will be appended. Parameters ---------- iterator: Iterator[Tuple[Tuple[Any,...], Dict[str, Any]]] An object when iterated over, yields args and kwargs compatible with the ``generator_like`` object's call signature. Note ---- Use the `from_base_iterator` staticmethod to convert a standard iterator into one compatible with this method. """ for args, kwargs in iterator: self.accumulate(*args, **kwargs)
[docs] def accumulate(self, *args: Any, **kwargs: Any): r"""Add features from simulation snapshot to object. Allows the addition of individual snapshots to aggregator. This can be useful for online detection or any case where computing the entire trajectory is not possible or not desired or for use in a for loop. Parameters ---------- \*args: Positional arguments to feed to the generator like object. \*\*kwargs: Keyword arguments to feed to the generator like object. """ self.signals.append(self.generator(*args, **kwargs)) if self._logger is not None: self._logger.end_frame()
[docs] def to_dataframe(self) -> "pd.DataFrame": """Return the aggregated signals as a pandas DataFrame. Note: This method requires pandas to be available. Returns ------- signals: pandas.DataFrame The aggregated signals. The columns are features, and the indices correspond to system frames in the order passed to `accumulate` or `compute`. """ if len(self.signals) == 0: return pd.DataFrame() def is_array(v): if hasattr(v, "__len__"): return len(v) > 1 return False # We assume that all frames "look" alike in their structure. If this is # broken than this check will not catch it. However, this breaks one our # assumptions so is outside what we should check for. if any(is_array(value) for value in self.signals[0].values()): msg = ( "Signal is 3 dimensional. ~.to_dataframe requires a 2 " "dimensional signal. Use ~.to_xarray instead." ) raise ValueError(msg) return pd.DataFrame( { col: [frame[col] for frame in self.signals] for col in self.signals[0] } )
[docs] def to_xarray(self, third_dim_name="third_dim") -> "xa.DataArray": """Return the aggregated signal as a `xarray.DataArray`. This method is designed to be used primarily with non-reduced data (e.g. per-particle features). This enables with `XarrayGenerator` to do the mapping/reduction later, attempt multiple reductions, or use the data for purposes outside detection such as visualization or plotting. Note: This method requires `xarray` to be available. Warning: This method only works when all arrays have the same first dimension size. Returns ------- signal: xarray.DataArray The aggregated signal. The first dimension is frames, the second is features, and the third the first dimension of the aggregated features (eg. number of particles). """ feature_order = self.signals[0].keys() data = np.stack( [ np.stack([frame[k].squeeze() for k in feature_order]) for frame in self.signals ] ) return xa.DataArray( data=data, dims=("frame", "feature", third_dim_name), coords={"feature": list(feature_order)}, )
[docs] @staticmethod def from_base_iterator( iterator: Iterator[Any], is_args: bool = False, is_kwargs: bool = False ): """Convert a base iterator into one that works with `compute`. The default behavior is to treat the items of the iterator as a single positional argument. Read the argument list for alternative options. Parameters ---------- iterator : Iterator[Any] The iterator to convert. is_args : :obj:`bool`, optional Whether to treat the iterator objects as positional arguments (i.e. yields tuples). Defaults to False. is_kwargs : :obj:`bool`, optional Whether to treat the iterator objects as keyword arguments (i.e. yields dicts). Defaults to False. Returns ------- new_iterator: Iterator[Tuple[Tuple[Any,...], Dict[str, Any]]] The modified iterator. """ if is_args: return ((arg, {}) for arg in iterator) if is_kwargs: return (((), arg) for arg in iterator) return (((arg,), {}) for arg in iterator)
@property def logger(self): """dupin.data.logging.Logger: Logger for the aggregator.""" return self._logger @property def max_deque_length(self): """int: The maximum length of the deque.""" return self._max_deque_length @logger.setter def logger(self, new_logger): self._logger = new_logger if new_logger is None: try: self.generator.remove_logger() except AttributeError: pass try: self.generator.attach_logger(new_logger) except AttributeError: pass
[docs] class XarrayGenerator(base.Generator): """Generator that converts a frame from xarray to a dupin compatible form. This class is useful to use with `SignalAggregator.to_xarray` to separate the data generation and optionally the mapping step from the reduction step. Parameters ---------- feature_dim : :obj:`str`, optional The name of the feature dimension in the xarray frames, defaults to "feature" (the default of `SignalAggregator.to_xarray`). """
[docs] def __init__(self, feature_dim: str = "feature"): self._feature_dim = feature_dim
[docs] def __call__(self, xarray_frame: "xa.DataArray"): """Convert the xarray object to a dupin pipeline representation. Parameters ---------- xarray_frame : xarray.DataArray The data array for the current frame of the signal. Returns ------- frame : dict [str, numpy.ndarray] The data represented as a dictionary of arrays. """ return { feature: xarray_frame.sel({self._feature_dim: feature}).to_numpy() for feature in map( str, xarray_frame.coords[self._feature_dim].to_numpy() ) }
__all__ = ["SignalAggregator", "XarrayGenerator"]