Source code for dupin.detect.costs

"""Implements cost functions for use in event detection."""

from abc import ABC, abstractmethod

import numpy as np
import ruptures as rpt
from sklearn import preprocessing


class BaseLinearCost(rpt.base.BaseCost, ABC):
    """Base class for costs using linear fits of features across signal."""

    _metrics = frozenset(("l1", "l2"))
    min_size = 3

    def __init__(self, metric="l1"):
        """Create a CostLinearFit object."""
        if metric not in self._metrics:
            msg = f"Available metrics are {self._metrics}."
            raise ValueError(msg)
        self._metric = getattr(self, "_" + metric)

    def fit(self, signal: np.ndarray):
        """Store signal and compute base errors for later cost checking."""
        if len(signal.shape) == 1:
            signal = signal.reshape((-1, 1))
        self._y = preprocessing.MinMaxScaler().fit_transform(signal).T
        self._x = np.linspace(0, 1, self._y.shape[1], dtype=float)

    def error(self, start: int, end: int):
        """Return the cost for signal[start:end]."""
        m, b = self._get_regression(start, end)
        predicted_y = self._get_predicted(m, b, start, end)
        return self._metric(self._y[:, start:end], predicted_y)

    def _get_predicted(
        self, m: np.ndarray, b: np.ndarray, start: int, end: int
    ):
        return m[:, None] * self._x[None, start:end] + b[:, None]

    @abstractmethod
    def _get_regression(self, start: int, end: int):
        pass

    @staticmethod
    def _l1(y: np.ndarray, predicted_y: np.ndarray):
        return np.sum(np.abs(predicted_y - y))

    @staticmethod
    def _l2(y: np.ndarray, predicted_y: np.ndarray):
        return np.sqrt(np.sum(np.square(predicted_y - y)))

    @property
    def signal(self) -> np.ndarray:
        """:math:`(N_{samples}, N_{dim})` numpy.ndarray of float: signal \
           fitted on.
        """  # noqa: D205
        return self._y.T


[docs] class CostLinearFit(BaseLinearCost): r"""Compute the L1 cumulative error of piecewise linear fits in time. Works with `ruptures`_. Used to compute the relative cumulative L1 deviation from a linear piecewise fit of a signal. .. math:: C(s, e) = \min\limits_{m, b} \sum_i |y_i - (m x_i + b)| :math:`m` and :math:`b` can be vectors in the case of a multidimensional signal (the summation also goes across dimensions. Parameters ---------- metric : `str`, optional What metric to use in computing the error. Defaults to ``"l1"``. Options are ``"l1"`` and ``"l2"``. Note: For use in `ruptures`_ search algorithms. To use properly `fit` must be called first with the signal. """ model = "linear_regression" def _compute_cumsum(self, arr: np.ndarray): """Compute a cumulative sum for use in computing partial sums. Only works for 1 or 2D arrays. """ if arr.ndim > 1: shape = arr.shape[:-1] + (arr.shape[-1] + 1,) else: shape = len(arr) + 1 out = np.empty(shape) out[..., 0] = 0 np.cumsum(arr, out=out[..., 1:], axis=arr.ndim - 1) return out
[docs] def fit(self, signal: np.ndarray): """Store signal and compute base errors for later cost checking.""" super().fit(signal) self._x_cumsum = self._compute_cumsum(self._x) self._x_sq_cumsum = self._compute_cumsum(self._x**2) self._xy_cumsum = self._compute_cumsum(self._x[None, :] * self._y) self._y_cumsum = self._compute_cumsum(self._y)
def _get_regression(self, start: int, end: int): """Compute a least squared regression on each dimension. Though never explicitly done the computations follow an X martix with a 1 vector prepended to allow for a non-zero intercept. """ # Given their off by one nature no offset for computing the partial sum # in needed. sum_x = self._x_cumsum[end] - self._x_cumsum[start] sum_x_sq = self._x_sq_cumsum[end] - self._x_sq_cumsum[start] sum_xy = self._xy_cumsum[:, end] - self._xy_cumsum[:, start] sum_y = self._y_cumsum[:, end] - self._y_cumsum[:, start] N = end - start m = (N * sum_xy - (sum_x * sum_y)) / (N * sum_x_sq - (sum_x * sum_x)) b = (sum_y - (m * sum_x)) / N return m, b
# TODO: Fill out documentation.
[docs] class CostLinearBiasedFit(CostLinearFit): """Compute a start to end linear fit and pentalize error and bias. Works with `ruptures`_. """ model = "biased_linear_regression" def _get_regression( self, start: int, end: int ) -> tuple[np.ndarray, np.ndarray]: m = (self._y[:, end - 1] - self._y[:, start]) / ( self._x[None, end - 1] - self._x[None, start] ) b = self._y[:, start] - (m * self._x[None, start]) return m, b