# Copyright (c) 2023-2026 The Regents of the University of Michigan.
# This file is from the dupin project, released under the BSD 3-Clause License.
"""Classes for use in utilizing supervised learning for event detection."""
from collections.abc import Iterator, Sequence
from typing import Callable, Optional

import numpy as np
import pandas as pd

from dupin import errors

try:
    import sklearn as sk
except ImportError:
    sk = errors._RaiseModuleError("sklearn")
def _str_isinstance(obj, classes):
    """Check an object's class by dotted name rather than by type object.

    The qualified name of ``obj``'s class is built as
    ``<module>.<class name>`` and compared against every entry of
    ``classes``. An entry matches when the qualified name *ends with* it, so
    both ``"DataFrame"`` and ``"frame.DataFrame"`` style entries can match.
    Only the object's own class is inspected; base classes are not searched.

    Parameters
    ----------
    obj : object
        The object whose class name is tested.
    classes : iterable of str
        Name suffixes to test the qualified class name against.

    Returns
    -------
    bool
        ``True`` if any entry of ``classes`` is a suffix of the qualified
        class name, ``False`` otherwise (including when ``classes`` is empty).
    """
    klass = obj.__class__
    qualified_name = ".".join((klass.__module__, klass.__name__))
    return any(qualified_name.endswith(suffix) for suffix in classes)
def window_iter(seq: Sequence, window_size: int) -> Iterator:
    """Iterate over a sequence in slices of length window_size.

    Consecutive windows are offset by one element, so a sequence of length
    ``L`` produces ``L - window_size + 1`` windows. Nothing is yielded when
    ``window_size`` is larger than ``len(seq)``.

    Parameters
    ----------
    seq: list [``any``]
        The sequence to yield windows of. It must support ``len`` and slicing.
    window_size: int
        The number of elements in each window.

    Yields
    ------
    window: list [ ``any`` ]
        The current window of the original data, obtained by slicing ``seq``
        (so its type is whatever slicing ``seq`` returns).
    """
    # The window starting at ``start`` always ends at ``start + window_size``,
    # so one range over the start indices is sufficient.
    for start in range(len(seq) - window_size + 1):
        yield seq[start : start + window_size]
class Window:
    """Computes the error of a classifier discerning between halves of a window.

    The class implements a generic way of discerning the similarity between
    nearby sections in a sequence through the use of a rolling window and
    machine learning classifiers. The class then outputs this similarity as a
    single dimension regardless of input size.

    The procedure is to take a sliding window of a set size across the
    trajectory. For each window, the left half is labeled as class 0 and the
    right as class 1. The class then trains one or more weak classifiers for
    each window on a subset of points. The test loss on the remaining points is
    then aggregated across the classifiers and recorded. This testing loss is
    the single dimension representation of local signal similarity. Since a
    low loss means the classifier can tell the two halves apart, lower values
    indicate greater dissimilarity between the halves.

    Note:
        The returned signal will be smaller by ``window_size - 1`` than the
        original signal.

    Warning:
        For this to be useful, a *weak* classifier must be chosen. A weak
        classifier is one that has low discrimination ability. This prevents
        the training on noise between window halves. For small and
        intermediate window sizes, most classifiers will find noise that can
        (nearly) perfectly discriminate the halves of the window.

    Parameters
    ----------
    classifier : sklearn.base.ClassifierMixin
        A sklearn compatible classifier that is ready to fit to data.
    window_size : int
        The size of windows to learn on, should be an even number for best
        results. Must be at least 2.
    test_size : float
        Fraction of samples to use for computing the error through the loss
        function. This fraction is not fitted on. Must lie strictly between 0
        and 1.
    loss_function : ``callable`` [[`sklearn.base.ClassifierMixin`, \
                                   `numpy.ndarray`, `numpy.ndarray`], \
                                   `float`], optional
        A callable that takes in the fitted classifier, the test x and test y
        values and returns a loss (lower is better). By default this computes
        the zero-one loss if sklearn is available, otherwise this errors.
    store_intermediate_classifiers : `bool`, optional
        Whether to store the fitted classifier for each window in the sequence
        passed to `compute`. Defaults to False. **Warning**: If the classifier
        stores some or all of the sequence in fitting as is the case for
        kernelized classifiers, this option will lead to a significant
        increase in use of memory.
    n_classifiers : `int`, optional
        The number of classifiers and test train splits to use per window,
        defaults to 1. Higher numbers naturally smooth the error across a
        trajectory.
    combine_errors : `str`, optional
        What function to reduce the errors of ``n_classifiers`` with, defaults
        to "mean". Available values are "mean" and "median".
    """

    def __init__(
        self,
        classifier: "sk.base.ClassifierMixin",
        window_size: int,
        test_size: float,
        loss_function: Optional[
            Callable[["sk.base.ClassifierMixin", np.ndarray, np.ndarray], float]
        ] = None,
        store_intermediate_classifiers: bool = False,
        n_classifiers: int = 1,
        combine_errors: str = "mean",
    ) -> None:
        """Store and validate the configuration; no fitting happens here."""
        self.classifier = classifier
        # The assignments below go through the validating property setters.
        self.window_size = window_size
        self.test_size = test_size
        if loss_function is None:
            loss_function = self._default_loss
        self.loss_function = loss_function
        self.store_intermediate_classifiers = store_intermediate_classifiers
        self.n_classifiers = n_classifiers
        self.combine_errors = combine_errors

    @property
    def window_size(self):
        """int: The size of windows to learn on."""
        return self._window_size

    @window_size.setter
    def window_size(self, value):
        if value < 2:  # noqa: PLR2004
            msg = "window_size must be greater than 1."
            raise ValueError(msg)
        self._window_size = value

    @property
    def store_intermediate_classifiers(self):
        """bool: Whether to store the classifiers for each window.

        If ``True`` the classifiers are stored in ``classifiers_`` after
        calling ``compute``.
        """
        return self._store_intermediate_classifiers

    @store_intermediate_classifiers.setter
    def store_intermediate_classifiers(self, value):
        if not isinstance(value, bool):
            msg = "Expected bool for store_intermediate_classifiers."
            raise TypeError(msg)
        self._store_intermediate_classifiers = value

    @property
    def loss_function(self):
        """``callable`` [[ `sklearn.base.ClassifierMixin`, `numpy.ndarray`, \
            `numpy.ndarray` ], `float` ]: Returns the loss for a fitted \
            classifier given the test x and y.
        """  # noqa: D205
        return self._loss_function

    @loss_function.setter
    def loss_function(self, value):
        if not callable(value):
            msg = "loss_function must be callable."
            raise TypeError(msg)
        self._loss_function = value

    @property
    def test_size(self):
        """float: Fraction of samples to use for computing the error."""
        return self._test_size

    @test_size.setter
    def test_size(self, value):
        if value <= 0.0 or value >= 1.0:
            msg = "test_size must be between 0 and 1."
            raise ValueError(msg)
        self._test_size = value

    @property
    def n_classifiers(self):
        """int: Number of classifiers and test-train splits per window.

        Higher numbers naturally smooth the error across a trajectory.
        """
        return self._n_classifiers

    @n_classifiers.setter
    def n_classifiers(self, value):
        if value < 1:
            msg = "n_classifiers must be greater than 0."
            raise ValueError(msg)
        self._n_classifiers = value

    @property
    def combine_errors(self):
        """str: What function to reduce the errors of ``n_classifiers`` with.

        Available values are "mean" and "median".
        """
        return self._combine_errors

    @combine_errors.setter
    def combine_errors(self, value):
        if value not in ("mean", "median"):
            msg = "combine_errors must be in ('mean', 'median')."
            raise ValueError(msg)
        self._combine_errors = value

    @property
    def _reduce(self):
        # The setter restricts combine_errors to "mean" or "median".
        return np.mean if self.combine_errors == "mean" else np.median

    def compute(self, X: np.ndarray) -> np.ndarray:
        """Compute the loss for classifiers trained on discerning window halves.

        For every window of ``window_size`` consecutive rows of ``X``,
        ``n_classifiers`` stratified train/test splits are drawn. The
        classifier is fitted on each training split and scored with
        ``loss_function`` on the matching test split; the per-split losses are
        then combined according to ``combine_errors``.

        Parameters
        ----------
        X : (:math:`T`, :math:`N_f`) np.ndarray
            A NumPy array where the first dimension is time or sequence
            progression and the second is features. A `pandas.DataFrame` is
            also accepted and converted with ``to_numpy``.

        Returns
        -------
        errors : np.ndarray
            The combined loss function value for each window in ``X``, one
            entry per window. The same array is stored on ``self.errors``.
        """
        if isinstance(X, pd.DataFrame):
            return self.compute(X.to_numpy())
        # Named ``window_errors`` so the imported ``errors`` module is not
        # shadowed inside this method.
        window_errors = []
        if self.store_intermediate_classifiers:
            self.classifiers_ = []
        # Label the first ceil(window_size / 2) points 0 and the rest 1. The
        # count is computed with integer arithmetic so np.repeat receives an
        # integer rather than the float np.ceil would return.
        half_size = (self.window_size + 1) // 2
        y = np.repeat([0, 1], half_size)[: self.window_size]
        shuffle_splits = sk.model_selection.StratifiedShuffleSplit(
            n_splits=self.n_classifiers, test_size=self.test_size
        )
        for x in window_iter(X, self.window_size):
            if self.store_intermediate_classifiers:
                # One inner list of fitted classifiers per window.
                self.classifiers_.append([])
            slice_errors = []
            for train_indices, test_indices in shuffle_splits.split(x, y):
                self.classifier.fit(x[train_indices], y[train_indices])
                slice_errors.append(
                    self._loss_function(
                        self.classifier,
                        x[test_indices],
                        y[test_indices],
                    )
                )
                # If storing intermediate classifiers clone the classifier to
                # ensure we train/fit on a new identical model.
                if self.store_intermediate_classifiers:
                    self.classifiers_[-1].append(self.classifier)
                    self.classifier = sk.base.clone(self.classifier)
            window_errors.append(self._reduce(slice_errors))
        self.errors = np.array(window_errors)
        return self.errors

    @staticmethod
    def _default_loss(classifier, x, y):
        # Zero-one loss of the classifier's predictions on the test samples.
        return sk.metrics.zero_one_loss(y, classifier.predict(x))