Source code for dupin.detect.costs

# Copyright (c) 2023-2026 The Regents of the University of Michigan.
# This file is from the dupin project, released under the BSD 3-Clause License.

"""Implements cost functions for use in event detection."""

from abc import ABC, abstractmethod

import numpy as np
import ruptures as rpt
from sklearn import preprocessing


class BaseLinearCost(rpt.base.BaseCost, ABC):
    """Base class for costs using linear fits of features across signal."""

    _metrics = frozenset(("l1", "l2"))
    min_size = 3

    def __init__(self, metric="l1"):
        """Create a CostLinearFit object."""
        if metric not in self._metrics:
            msg = f"Available metrics are {self._metrics}."
            raise ValueError(msg)
        self._metric = getattr(self, "_" + metric)

    def fit(self, signal: np.ndarray):
        """Store signal and compute base errors for later cost checking."""
        if len(signal.shape) == 1:
            signal = signal.reshape((-1, 1))
        self._y = preprocessing.MinMaxScaler().fit_transform(signal).T
        self._x = np.linspace(0, 1, self._y.shape[1], dtype=float)

    def error(self, start: int, end: int):
        """Return the cost for signal[start:end]."""
        m, b = self._get_regression(start, end)
        predicted_y = self._get_predicted(m, b, start, end)
        return self._metric(self._y[:, start:end], predicted_y)

    def _get_predicted(
        self, m: np.ndarray, b: np.ndarray, start: int, end: int
    ):
        return m[:, None] * self._x[None, start:end] + b[:, None]

    @abstractmethod
    def _get_regression(self, start: int, end: int):
        pass

    @staticmethod
    def _l1(y: np.ndarray, predicted_y: np.ndarray):
        return np.sum(np.abs(predicted_y - y))

    @staticmethod
    def _l2(y: np.ndarray, predicted_y: np.ndarray):
        return np.sqrt(np.sum(np.square(predicted_y - y)))

    @property
    def signal(self) -> np.ndarray:
        """:math:`(N_{samples}, N_{dim})` numpy.ndarray of float: signal \
           fitted on.
        """  # noqa: D205
        return self._y.T


[docs] class CostLinearFit(BaseLinearCost): r"""Compute the L1 cumulative error of piecewise linear fits in time. Works with `ruptures`_. Used to compute the relative cumulative L1 deviation from a linear piecewise fit of a signal. .. math:: C(s, e) = \min\limits_{m, b} \sum_i |y_i - (m x_i + b)| :math:`m` and :math:`b` can be vectors in the case of a multidimensional signal (the summation also goes across dimensions. Parameters ---------- metric : `str`, optional What metric to use in computing the error. Defaults to ``"l1"``. Options are ``"l1"`` and ``"l2"``. Note: For use in `ruptures`_ search algorithms. To use properly `fit` must be called first with the signal. """ model = "linear_regression" def _compute_cumsum(self, arr: np.ndarray): """Compute a cumulative sum for use in computing partial sums. Only works for 1 or 2D arrays. """ if arr.ndim > 1: shape = (*arr.shape[:-1], arr.shape[-1] + 1) else: shape = len(arr) + 1 out = np.empty(shape) out[..., 0] = 0 np.cumsum(arr, out=out[..., 1:], axis=arr.ndim - 1) return out
[docs] def fit(self, signal: np.ndarray): """Store signal and compute base errors for later cost checking.""" super().fit(signal) self._x_cumsum = self._compute_cumsum(self._x) self._x_sq_cumsum = self._compute_cumsum(self._x**2) self._xy_cumsum = self._compute_cumsum(self._x[None, :] * self._y) self._y_cumsum = self._compute_cumsum(self._y)
def _get_regression(self, start: int, end: int): """Compute a least squared regression on each dimension. Though never explicitly done the computations follow an X martix with a 1 vector prepended to allow for a non-zero intercept. """ # Given their off by one nature no offset for computing the partial sum # in needed. sum_x = self._x_cumsum[end] - self._x_cumsum[start] sum_x_sq = self._x_sq_cumsum[end] - self._x_sq_cumsum[start] sum_xy = self._xy_cumsum[:, end] - self._xy_cumsum[:, start] sum_y = self._y_cumsum[:, end] - self._y_cumsum[:, start] N = end - start m = (N * sum_xy - (sum_x * sum_y)) / (N * sum_x_sq - (sum_x * sum_x)) b = (sum_y - (m * sum_x)) / N return m, b
# TODO: Fill out documentation.
[docs] class CostLinearBiasedFit(CostLinearFit): """Compute a start to end linear fit and pentalize error and bias. Works with `ruptures`_. """ model = "biased_linear_regression" def _get_regression( self, start: int, end: int ) -> tuple[np.ndarray, np.ndarray]: m = (self._y[:, end - 1] - self._y[:, start]) / ( self._x[None, end - 1] - self._x[None, start] ) b = self._y[:, start] - (m * self._x[None, start]) return m, b