from __future__ import annotations
"""ROCKET, MiniROCKET, and HYDRA time series transformers and classifiers.
This module provides sklearn-compatible wrappers for the ROCKET family of
time series classification methods:
- ROCKET: Random Convolutional Kernel Transform (Dempster et al., 2020)
- MiniROCKET: Faster, almost deterministic variant (Dempster et al., 2021)
- MultiROCKET: Multivariate extension with additional pooling
- HYDRA: Hybrid Dictionary-ROCKET approach (Dempster et al., 2023)
These methods transform time series into features using random/fixed
convolutional kernels, then use a linear classifier (typically Ridge).
Installation
------------
For native implementations:
pip install rocket-learn # Original ROCKET/MiniROCKET
pip install hydra-ts # HYDRA
For sktime wrappers (recommended):
pip install sktime
Examples
--------
>>> from endgame.timeseries import RocketClassifier, MiniRocketClassifier
>>> model = MiniRocketClassifier()
>>> model.fit(X_train, y_train)
>>> predictions = model.predict(X_test)
References
----------
- Dempster, A., et al. (2020). "ROCKET: Exceptionally fast and accurate time
series classification using random convolutional kernels." Data Mining and
Knowledge Discovery, 34(5), 1454-1495.
- Dempster, A., et al. (2021). "MINIROCKET: A very fast (almost) deterministic
transform for time series classification." KDD 2021.
- Dempster, A., et al. (2023). "HYDRA: Competing convolutional kernels for fast
and accurate time series classification." Data Mining and Knowledge Discovery.
"""
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.linear_model import RidgeClassifierCV
from sklearn.preprocessing import StandardScaler
# Check for different backends
try:
from sktime.transformations.panel.rocket import MiniRocket as SktimeMiniRocket
from sktime.transformations.panel.rocket import MiniRocketMultivariate as SktimeMiniRocketMV
from sktime.transformations.panel.rocket import Rocket as SktimeRocket
HAS_SKTIME_ROCKET = True
except ImportError:
HAS_SKTIME_ROCKET = False
try:
from sktime.transformations.panel.rocket import MultiRocket as SktimeMultiRocket
HAS_SKTIME_MULTIROCKET = True
except ImportError:
HAS_SKTIME_MULTIROCKET = False
try:
from sktime.transformations.panel.hydra import Hydra as SktimeHydra
HAS_SKTIME_HYDRA = True
except ImportError:
HAS_SKTIME_HYDRA = False
# Check for native implementations
try:
from rocket_functions import apply_kernels, generate_kernels
HAS_NATIVE_ROCKET = True
except ImportError:
HAS_NATIVE_ROCKET = False
try:
from minirocket import fit as minirocket_fit
from minirocket import transform as minirocket_transform
HAS_NATIVE_MINIROCKET = True
except ImportError:
HAS_NATIVE_MINIROCKET = False
try:
from hydra import Hydra as NativeHydra
HAS_NATIVE_HYDRA = True
except ImportError:
HAS_NATIVE_HYDRA = False
def _check_rocket_available():
"""Check if any ROCKET implementation is available."""
if not (HAS_SKTIME_ROCKET or HAS_NATIVE_ROCKET):
raise ImportError(
"ROCKET requires either sktime or the native rocket implementation. "
"Install with: pip install sktime OR pip install rocket-learn"
)
def _check_minirocket_available():
"""Check if any MiniROCKET implementation is available."""
if not (HAS_SKTIME_ROCKET or HAS_NATIVE_MINIROCKET):
raise ImportError(
"MiniROCKET requires either sktime or the native minirocket implementation. "
"Install with: pip install sktime OR pip install rocket-learn"
)
def _check_hydra_available():
"""Check if any HYDRA implementation is available."""
if not (HAS_SKTIME_HYDRA or HAS_NATIVE_HYDRA):
raise ImportError(
"HYDRA requires either sktime or the native hydra implementation. "
"Install with: pip install sktime OR pip install hydra-ts"
)
def _to_sktime_format(X: np.ndarray) -> np.ndarray:
"""Convert array to sktime's expected 3D format.
sktime expects shape (n_instances, n_channels, n_timepoints) or
(n_instances, n_timepoints) for univariate.
"""
X = np.asarray(X)
if X.ndim == 1:
# Single time series
X = X.reshape(1, 1, -1)
elif X.ndim == 2:
# (n_instances, n_timepoints) -> (n_instances, 1, n_timepoints)
X = X.reshape(X.shape[0], 1, X.shape[1])
elif X.ndim == 3:
# Already in correct format
pass
else:
raise ValueError(f"Expected 1D, 2D, or 3D array, got shape {X.shape}")
return X.astype(np.float32)
[docs]
class RocketClassifier(BaseEstimator, ClassifierMixin):
"""ROCKET classifier for time series classification.
Combines ROCKET transform with RidgeClassifierCV for fast and accurate
time series classification.
Parameters
----------
n_kernels : int, default=10000
Number of random kernels.
normalize : bool, default=True
Whether to normalize input.
alphas : array-like, optional
Regularization values to try in RidgeClassifierCV.
Default: np.logspace(-3, 3, 10).
random_state : int, optional
Random seed.
Attributes
----------
classes_ : ndarray
Unique class labels.
transformer_ : RocketTransformer
Fitted ROCKET transformer.
classifier_ : RidgeClassifierCV
Fitted classifier.
Examples
--------
>>> clf = RocketClassifier(n_kernels=10000)
>>> clf.fit(X_train, y_train)
>>> predictions = clf.predict(X_test)
>>> accuracy = clf.score(X_test, y_test)
Notes
-----
This achieves state-of-the-art accuracy on many UCR datasets while
being orders of magnitude faster than deep learning approaches.
"""
def __init__(
self,
n_kernels: int = 10000,
normalize: bool = True,
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.n_kernels = n_kernels
self.normalize = normalize
self.alphas = alphas
self.random_state = random_state
self.classes_ = None
self.transformer_ = None
self.classifier_ = None
self._scaler = None
[docs]
def fit(self, X, y):
"""Fit the ROCKET classifier.
Parameters
----------
X : array-like of shape (n_samples, n_timepoints)
Training time series.
y : array-like of shape (n_samples,)
Target labels.
Returns
-------
self
"""
y = np.asarray(y)
self.classes_ = np.unique(y)
# Create and fit transformer
self.transformer_ = RocketTransformer(
n_kernels=self.n_kernels,
normalize=self.normalize,
random_state=self.random_state,
)
X_transformed = self.transformer_.fit_transform(X)
# Scale features
self._scaler = StandardScaler(with_mean=False)
X_scaled = self._scaler.fit_transform(X_transformed)
# Train classifier
alphas = self.alphas if self.alphas is not None else np.logspace(-3, 3, 10)
self.classifier_ = RidgeClassifierCV(alphas=alphas)
self.classifier_.fit(X_scaled, y)
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels.
Parameters
----------
X : array-like
Time series to classify.
Returns
-------
np.ndarray
Predicted labels.
"""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
return self.classifier_.predict(X_scaled)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities.
Note: RidgeClassifier doesn't have native predict_proba.
This uses decision function with softmax normalization.
Parameters
----------
X : array-like
Time series to classify.
Returns
-------
np.ndarray of shape (n_samples, n_classes)
Class probabilities.
"""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
# Use decision function
decision = self.classifier_.decision_function(X_scaled)
# Convert to probabilities via softmax
if decision.ndim == 1:
# Binary case
proba = 1 / (1 + np.exp(-decision))
return np.column_stack([1 - proba, proba])
else:
# Multiclass case
exp_decision = np.exp(decision - decision.max(axis=1, keepdims=True))
return exp_decision / exp_decision.sum(axis=1, keepdims=True)
[docs]
def score(self, X, y) -> float:
"""Return accuracy score."""
return np.mean(self.predict(X) == y)
[docs]
class MiniRocketClassifier(BaseEstimator, ClassifierMixin):
"""MiniROCKET classifier for time series classification.
Faster variant of ROCKET using fixed kernels. Up to 75x faster
on large datasets while maintaining similar accuracy.
Parameters
----------
n_kernels : int, default=10000
Number of kernels.
alphas : array-like, optional
Regularization values for RidgeClassifierCV.
random_state : int, optional
Random seed.
Examples
--------
>>> clf = MiniRocketClassifier()
>>> clf.fit(X_train, y_train)
>>> accuracy = clf.score(X_test, y_test)
Notes
-----
Recommended as the default ROCKET variant due to speed and
near-deterministic behavior.
"""
def __init__(
self,
n_kernels: int = 10000,
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.n_kernels = n_kernels
self.alphas = alphas
self.random_state = random_state
self.classes_ = None
self.transformer_ = None
self.classifier_ = None
self._scaler = None
[docs]
def fit(self, X, y):
"""Fit the MiniROCKET classifier."""
y = np.asarray(y)
self.classes_ = np.unique(y)
self.transformer_ = MiniRocketTransformer(
n_kernels=self.n_kernels,
random_state=self.random_state,
)
X_transformed = self.transformer_.fit_transform(X)
self._scaler = StandardScaler(with_mean=False)
X_scaled = self._scaler.fit_transform(X_transformed)
alphas = self.alphas if self.alphas is not None else np.logspace(-3, 3, 10)
self.classifier_ = RidgeClassifierCV(alphas=alphas)
self.classifier_.fit(X_scaled, y)
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
return self.classifier_.predict(X_scaled)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
decision = self.classifier_.decision_function(X_scaled)
if decision.ndim == 1:
proba = 1 / (1 + np.exp(-decision))
return np.column_stack([1 - proba, proba])
else:
exp_decision = np.exp(decision - decision.max(axis=1, keepdims=True))
return exp_decision / exp_decision.sum(axis=1, keepdims=True)
[docs]
def score(self, X, y) -> float:
"""Return accuracy score."""
return np.mean(self.predict(X) == y)
[docs]
class MultiRocketClassifier(BaseEstimator, ClassifierMixin):
"""MultiROCKET classifier for multivariate time series.
Extension of MiniROCKET with additional pooling operations.
Parameters
----------
n_kernels : int, default=10000
Number of kernels.
n_features_per_kernel : int, default=4
Features per kernel.
alphas : array-like, optional
Regularization values.
random_state : int, optional
Random seed.
"""
def __init__(
self,
n_kernels: int = 10000,
n_features_per_kernel: int = 4,
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.n_kernels = n_kernels
self.n_features_per_kernel = n_features_per_kernel
self.alphas = alphas
self.random_state = random_state
self.classes_ = None
self.transformer_ = None
self.classifier_ = None
self._scaler = None
[docs]
def fit(self, X, y):
"""Fit the MultiROCKET classifier."""
y = np.asarray(y)
self.classes_ = np.unique(y)
self.transformer_ = MultiRocketTransformer(
n_kernels=self.n_kernels,
n_features_per_kernel=self.n_features_per_kernel,
random_state=self.random_state,
)
X_transformed = self.transformer_.fit_transform(X)
self._scaler = StandardScaler(with_mean=False)
X_scaled = self._scaler.fit_transform(X_transformed)
alphas = self.alphas if self.alphas is not None else np.logspace(-3, 3, 10)
self.classifier_ = RidgeClassifierCV(alphas=alphas)
self.classifier_.fit(X_scaled, y)
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
return self.classifier_.predict(X_scaled)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
decision = self.classifier_.decision_function(X_scaled)
if decision.ndim == 1:
proba = 1 / (1 + np.exp(-decision))
return np.column_stack([1 - proba, proba])
else:
exp_decision = np.exp(decision - decision.max(axis=1, keepdims=True))
return exp_decision / exp_decision.sum(axis=1, keepdims=True)
[docs]
def score(self, X, y) -> float:
"""Return accuracy score."""
return np.mean(self.predict(X) == y)
[docs]
class HydraClassifier(BaseEstimator, ClassifierMixin):
"""HYDRA classifier for time series classification.
Combines dictionary methods with ROCKET using competing kernels.
Often more accurate than ROCKET alone.
Parameters
----------
n_kernels : int, default=8
Number of kernels per group.
n_groups : int, default=64
Number of kernel groups.
alphas : array-like, optional
Regularization values.
random_state : int, optional
Random seed.
Examples
--------
>>> clf = HydraClassifier()
>>> clf.fit(X_train, y_train)
>>> accuracy = clf.score(X_test, y_test)
"""
def __init__(
self,
n_kernels: int = 8,
n_groups: int = 64,
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.n_kernels = n_kernels
self.n_groups = n_groups
self.alphas = alphas
self.random_state = random_state
self.classes_ = None
self.transformer_ = None
self.classifier_ = None
self._scaler = None
[docs]
def fit(self, X, y):
"""Fit the HYDRA classifier."""
y = np.asarray(y)
self.classes_ = np.unique(y)
self.transformer_ = HydraTransformer(
n_kernels=self.n_kernels,
n_groups=self.n_groups,
random_state=self.random_state,
)
X_transformed = self.transformer_.fit_transform(X)
self._scaler = StandardScaler(with_mean=False)
X_scaled = self._scaler.fit_transform(X_transformed)
alphas = self.alphas if self.alphas is not None else np.logspace(-3, 3, 10)
self.classifier_ = RidgeClassifierCV(alphas=alphas)
self.classifier_.fit(X_scaled, y)
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
return self.classifier_.predict(X_scaled)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities."""
X_transformed = self.transformer_.transform(X)
X_scaled = self._scaler.transform(X_transformed)
decision = self.classifier_.decision_function(X_scaled)
if decision.ndim == 1:
proba = 1 / (1 + np.exp(-decision))
return np.column_stack([1 - proba, proba])
else:
exp_decision = np.exp(decision - decision.max(axis=1, keepdims=True))
return exp_decision / exp_decision.sum(axis=1, keepdims=True)
[docs]
def score(self, X, y) -> float:
"""Return accuracy score."""
return np.mean(self.predict(X) == y)
[docs]
class HydraMiniRocketClassifier(BaseEstimator, ClassifierMixin):
"""Combined HYDRA + MiniROCKET classifier.
Concatenates features from both HYDRA and MiniROCKET for improved
accuracy. This combination often achieves the best results.
Parameters
----------
hydra_kernels : int, default=8
HYDRA kernels per group.
hydra_groups : int, default=64
HYDRA kernel groups.
minirocket_kernels : int, default=10000
MiniROCKET kernels.
alphas : array-like, optional
Regularization values.
random_state : int, optional
Random seed.
Examples
--------
>>> clf = HydraMiniRocketClassifier()
>>> clf.fit(X_train, y_train)
>>> accuracy = clf.score(X_test, y_test)
Notes
-----
This combination was shown to significantly improve accuracy over
either method alone in the HYDRA paper.
"""
def __init__(
self,
hydra_kernels: int = 8,
hydra_groups: int = 64,
minirocket_kernels: int = 10000,
alphas: np.ndarray | None = None,
random_state: int | None = None,
):
self.hydra_kernels = hydra_kernels
self.hydra_groups = hydra_groups
self.minirocket_kernels = minirocket_kernels
self.alphas = alphas
self.random_state = random_state
self.classes_ = None
self.hydra_transformer_ = None
self.minirocket_transformer_ = None
self.classifier_ = None
self._scaler = None
[docs]
def fit(self, X, y):
"""Fit the combined classifier."""
y = np.asarray(y)
self.classes_ = np.unique(y)
# Fit HYDRA
self.hydra_transformer_ = HydraTransformer(
n_kernels=self.hydra_kernels,
n_groups=self.hydra_groups,
random_state=self.random_state,
)
X_hydra = self.hydra_transformer_.fit_transform(X)
# Fit MiniROCKET
self.minirocket_transformer_ = MiniRocketTransformer(
n_kernels=self.minirocket_kernels,
random_state=self.random_state,
)
X_minirocket = self.minirocket_transformer_.fit_transform(X)
# Concatenate features
X_combined = np.hstack([X_hydra, X_minirocket])
self._scaler = StandardScaler(with_mean=False)
X_scaled = self._scaler.fit_transform(X_combined)
alphas = self.alphas if self.alphas is not None else np.logspace(-3, 3, 10)
self.classifier_ = RidgeClassifierCV(alphas=alphas)
self.classifier_.fit(X_scaled, y)
return self
[docs]
def predict(self, X) -> np.ndarray:
"""Predict class labels."""
X_hydra = self.hydra_transformer_.transform(X)
X_minirocket = self.minirocket_transformer_.transform(X)
X_combined = np.hstack([X_hydra, X_minirocket])
X_scaled = self._scaler.transform(X_combined)
return self.classifier_.predict(X_scaled)
[docs]
def predict_proba(self, X) -> np.ndarray:
"""Predict class probabilities."""
X_hydra = self.hydra_transformer_.transform(X)
X_minirocket = self.minirocket_transformer_.transform(X)
X_combined = np.hstack([X_hydra, X_minirocket])
X_scaled = self._scaler.transform(X_combined)
decision = self.classifier_.decision_function(X_scaled)
if decision.ndim == 1:
proba = 1 / (1 + np.exp(-decision))
return np.column_stack([1 - proba, proba])
else:
exp_decision = np.exp(decision - decision.max(axis=1, keepdims=True))
return exp_decision / exp_decision.sum(axis=1, keepdims=True)
[docs]
def score(self, X, y) -> float:
"""Return accuracy score."""
return np.mean(self.predict(X) == y)