Source code for dupin.data.reduce

# Copyright (c) 2023-2026 The Regents of the University of Michigan.
# This file is from the dupin project, released under the BSD 3-Clause License.

"""Classes for transforming array quantities into scalar features.

Reduction in ``dupin`` takes an array and _reduces_ it to a set number of scalar
values. A computer science reduction goes from an array to a single value. Our
usage of the term is similar; we just allow for multiple reductions to happen
within the same reducer. Examples of common reducers in the ``dupin`` sense are
the max, min, mean, mode, and standard deviation functions.
"""

import warnings
from typing import Optional

import numpy as np
import numpy.typing as npt

from . import base


[docs] class Percentile(base.DataReducer): """Reduce a distribution into percentile values. The reducers sorts the input array to get the provided percentiles. The reducers then uses the key format f"{percentile}%" to identify it reductions. Parameters ---------- percentiles : `tuple` [ `int` ], optional The percentiles in integer form (i.e. 100% equals 100). By defualt, every 10% increment from 0% to 100% (inclusive) is taken. """
[docs] def __init__(self, percentiles: Optional[tuple[int]] = None) -> None: if percentiles is None: percentiles = (0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) if len(percentiles) == 0: msg = "Cannot have an empty percentiles sequence." raise ValueError(msg) self._percentiles = np.unique(list(percentiles)) self._quantiles = self._percentiles / 100.0 super().__init__()
[docs] def compute(self, distribution: np.ndarray) -> dict[str, float]: """Return the reduced distribution.""" if len(distribution) == 0: return {} data = {} log = {} non_nan = np.flatnonzero(~np.isnan(distribution)) if non_nan.size == 0: for p in self._percentiles: key = f"{p}%" log[key] = 0 data[key] = np.nan if self._logger is not None and len(log) > 0: self._logger["Percentile"] = log return data cleaned_dist = distribution[non_nan] indices = self._get_indices(len(cleaned_dist)) sorted_indices = np.argsort(cleaned_dist) for i, p in zip(indices, self._percentiles): key = f"{p}%" log[key] = non_nan[sorted_indices[i]] data[key] = cleaned_dist[sorted_indices[i]] if self._logger is not None and len(log) > 0: self._logger["Percentile"] = log return data
def _get_indices(self, distribution_size): """Map to percentiles to [0, len - 1].""" fractional_indices = self._quantiles * (distribution_size - 1) return np.rint(fractional_indices).astype(int)
[docs] class NthGreatest(base.DataReducer): """Reduce a distribution to the Nth greatest values. This reducer returns the greatest and least values from a distribution as specified by the provided indices. Greatest values are specified by positive integers and least by negative, e.g. -1 is the minimum value in the array. The features keys are modified with the index ordinal number and whether it is greatest or least. -1 becomes "1st_least" and 10 becomes "10th_greatest". Parameters ---------- indices : `list` [ `int` ], optional The values to query. 1 is the greatest value in the distribution; 10 the tenth, and so on. Negative number consitute the smallest values in the distribution. -1 is the least value in the distribution. 0 is treated as 1. """
[docs] def __init__(self, indices: tuple[int]) -> None: if len(indices) == 0: msg = "Cannot have an empty indices sequence." raise ValueError(msg) self._indices = self._fix_indices(np.asarray(list(indices))) self._names = [self._index_name(index) for index in self._indices] super().__init__()
[docs] def compute(self, distribution: np.ndarray) -> dict[str, float]: """Return the signals with modified keys.""" if len(distribution) == 0: warnings.warn("Received empty array.", stacklevel=2) return {} nan_mask = np.flatnonzero(~np.isnan(distribution)) filtered_distribution = distribution[nan_mask] sorted_indices = np.argsort(filtered_distribution) log = {} data = {} for i, name in zip(self._indices, self._names): set_to_nan = False if not self._fits(distribution, i): set_to_nan = True warnings.warn( "Not enough elements found for NthGreatest, setting to " "nan.", stacklevel=2, ) if not self._fits(filtered_distribution, i): set_to_nan = True warnings.warn( "Not enough non-nan elements found for NthGreatest, " "setting to nan.", stacklevel=2, ) if set_to_nan: data[name] = np.nan log[name] = np.nan continue data[name] = filtered_distribution[sorted_indices[i]] log[name] = nan_mask[sorted_indices[i]] if self._logger is not None and len(log) > 0: self._logger["NthGreatest"] = log return data
@staticmethod def _fits(dist: np.array, index: int): if index < 0: return -index <= len(dist) return index < len(dist) @staticmethod def _index_name(index: int) -> str: if index >= 0: index += 1 type_ = "least" if index > 0 else "greatest" abs_index = abs(index) unit_value = abs_index % 10 # add appropriate suffix if unit_value == 1: suffix = "st" elif unit_value == 2: # noqa: PLR2004 suffix = "nd" elif unit_value == 3: # noqa: PLR2004 suffix = "rd" else: suffix = "th" return f"{abs_index}{suffix}_{type_}" @staticmethod def _fix_indices(indices: list[int]) -> list[int]: neg_indices = -indices return np.unique(np.where(indices > 0, neg_indices, neg_indices - 1))
[docs] class Tee(base.DataReducer): """Enable mutliple reducers to act on the same generator like object. Each reducer is run on the original distribution and their reductions are concatenated. This reducer does not create its own reductions or corresponding keys. Parameters ---------- reducers: `list` [`dupin.data.base.DataReducer`] A sequence of a data reducers. """
[docs] def __init__( self, reducers: list[base.DataReducer], ): if len(reducers) == 0: msg = "Cannot have empty reducers sequence." raise ValueError(msg) self._reducers = reducers super().__init__()
[docs] def compute(self, distribution: npt.ArrayLike) -> dict[str, float]: """Run all composed reducer computes.""" processed_data = {} for reducer in self._reducers: processed_data.update(reducer.compute(distribution)) return processed_data
[docs] def attach_logger(self, logger): """Add a logger to this step in the data pipeline. Parameters ---------- logger: dupin.data.logging.Logger A logger object to store data from the data pipeline for individual elements of the composed maps. """ self._logger = logger for reducer in self._reducers: try: reducer.attach_logger(logger) # Do nothing if generator does not have attach_logger logger # function (e.g. custom map function). except AttributeError: pass
[docs] def remove_logger(self): """Remove a logger from this step in the pipeline if it exists.""" self._logger = None for reducer in self._reducers: try: reducer.remove_logger() # Do nothing if generator does not have remove_logger logger # function (e.g. custom reducer function). except AttributeError: pass
def _decorate(self, generator: base.GeneratorLike): self._generator = generator for reducer in self._reducers: reducer._decorate(generator)
CustomReducer = base.CustomReducer
[docs] def reduce_(func): """Add the reduce step to the current pipeline. Note: This is for the decorator syntax for creating pipelines. Note: This uses `CustomReducer`. Parameters ---------- func : ``callable`` The function to use for reducing. """ return CustomReducer(func)