# Source code for endgame.timeseries.features
"""Time series feature extraction.

This module provides feature extraction utilities for time series data,
designed to feed into GBDT models for Kaggle competitions.

Integrates with tsfresh for comprehensive feature extraction and provides
custom extractors optimized for competition use.

Installation
------------
pip install tsfresh

Examples
--------
>>> from endgame.timeseries import TSFreshFeatureExtractor
>>> extractor = TSFreshFeatureExtractor(
...     preset="efficient", column_id="id", column_sort="time"
... )
>>> features = extractor.fit_transform(time_series_df)
"""
# The docstring must be the first statement of the module to be picked up as
# ``__doc__``; a ``from __future__`` import is allowed to follow it.
from __future__ import annotations

from typing import Literal

import numpy as np

# polars and pandas are optional: the HAS_* flags record what is importable so
# that input conversion can test for each frame type safely.
try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

try:
    import pandas as pd
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False

# Check for tsfresh availability
try:
    import tsfresh
    from tsfresh import extract_features, select_features
    from tsfresh.feature_extraction import (
        ComprehensiveFCParameters,
        EfficientFCParameters,
        MinimalFCParameters,
    )
    from tsfresh.utilities.dataframe_functions import impute
    HAS_TSFRESH = True
except ImportError:
    HAS_TSFRESH = False

from sklearn.base import BaseEstimator, TransformerMixin
def _check_tsfresh():
    """Fail with an ImportError when the optional tsfresh package is absent.

    Returns silently when the module-level ``HAS_TSFRESH`` flag is true.

    Raises
    ------
    ImportError
        If tsfresh could not be imported when this module was loaded.
    """
    if HAS_TSFRESH:
        return
    message = (
        "tsfresh is required for TSFreshFeatureExtractor. "
        "Install with: pip install tsfresh"
    )
    raise ImportError(message)
# [docs]  (Sphinx viewcode link artifact, not code)
class TSFreshFeatureExtractor(BaseEstimator, TransformerMixin):
    """Sklearn-compatible tsfresh feature extractor.

    Wraps tsfresh's feature extraction with imputation and optional
    relevance filtering against a target.

    Parameters
    ----------
    preset : str, default="efficient"
        Feature calculation preset:
        - "minimal": ~10 features per column (fast)
        - "efficient": ~100 features per column (balanced)
        - "comprehensive": ~800 features per column (slow but thorough)
    column_id : str, default="id"
        Name of the column containing series identifiers.
    column_sort : str, optional
        Name of the column to sort by (e.g., timestamp).
    column_value : str, optional
        Name of the column containing values (if single-column series).
    select_relevant : bool, default=True
        Whether to filter features by relevance to target. Filtering is
        only applied when ``y`` is passed to ``fit``; without ``y`` every
        extracted feature is kept.
    fdr_level : float, default=0.05
        False discovery rate for feature selection.
    n_jobs : int, default=-1
        Number of parallel jobs. Any value <= 0 uses all available cores.
    show_warnings : bool, default=False
        Whether to show tsfresh warnings. When False the tsfresh progress
        bar is disabled as well.

    Attributes
    ----------
    selected_features_ : List[str]
        Names of selected features after fitting.
    feature_settings_ : dict
        Feature calculation settings used.

    Examples
    --------
    >>> # Basic usage
    >>> extractor = TSFreshFeatureExtractor(preset="efficient")
    >>> features = extractor.fit_transform(df, y=target)
    >>> # Without relevance filtering
    >>> extractor = TSFreshFeatureExtractor(select_relevant=False)
    >>> features = extractor.fit_transform(df)
    """

    def __init__(
        self,
        preset: Literal["minimal", "efficient", "comprehensive"] = "efficient",
        column_id: str = "id",
        column_sort: str | None = None,
        column_value: str | None = None,
        select_relevant: bool = True,
        fdr_level: float = 0.05,
        n_jobs: int = -1,
        show_warnings: bool = False,
    ):
        self.preset = preset
        self.column_id = column_id
        self.column_sort = column_sort
        self.column_value = column_value
        self.select_relevant = select_relevant
        self.fdr_level = fdr_level
        self.n_jobs = n_jobs
        self.show_warnings = show_warnings
        self.selected_features_: list[str] | None = None
        self.feature_settings_: dict | None = None
        self._is_fitted = False

    def _get_feature_settings(self):
        """Return a fresh tsfresh parameter object for ``self.preset``.

        Raises
        ------
        ImportError
            If tsfresh is not installed.
        ValueError
            If ``self.preset`` is not one of the known presets.
        """
        _check_tsfresh()
        # Built after the availability check so the tsfresh names exist.
        presets = {
            "minimal": MinimalFCParameters,
            "efficient": EfficientFCParameters,
            "comprehensive": ComprehensiveFCParameters,
        }
        try:
            factory = presets[self.preset]
        except (KeyError, TypeError):
            raise ValueError(f"Unknown preset: {self.preset}") from None
        return factory()

    def _to_pandas(self, X):
        """Convert input to pandas DataFrame.

        polars frames (lazy frames are collected first) are converted;
        pandas frames are returned unchanged.

        Raises
        ------
        TypeError
            If ``X`` is neither a polars nor a pandas frame.
        """
        if HAS_POLARS and isinstance(X, (pl.DataFrame, pl.LazyFrame)):
            if isinstance(X, pl.LazyFrame):
                X = X.collect()
            return X.to_pandas()
        if HAS_PANDAS and isinstance(X, pd.DataFrame):
            return X
        raise TypeError(
            f"Expected DataFrame, got {type(X)}. "
            "TSFreshFeatureExtractor requires tabular data."
        )

    def _resolve_n_jobs(self) -> int:
        """Translate ``self.n_jobs`` into an explicit worker count."""
        import os  # local import: only needed to resolve "all cores"

        if self.n_jobs > 0:
            return self.n_jobs
        # Hand tsfresh a concrete integer instead of None so the worker
        # count never depends on how downstream code interprets None.
        return os.cpu_count() or 1

    def _extract_features(self, df):
        """Run tsfresh extraction on a pandas frame and impute the result."""
        extracted = extract_features(
            df,
            column_id=self.column_id,
            column_sort=self.column_sort,
            column_value=self.column_value,
            default_fc_parameters=self.feature_settings_,
            n_jobs=self._resolve_n_jobs(),
            show_warnings=self.show_warnings,
            disable_progressbar=not self.show_warnings,
        )
        return impute(extracted)

    def _fit_features(self, X, y):
        """Fit on ``X`` and return the (optionally filtered) feature frame.

        Shared by ``fit`` and ``fit_transform`` so the extraction runs once.
        """
        _check_tsfresh()
        df = self._to_pandas(X)
        self.feature_settings_ = self._get_feature_settings()
        features = self._extract_features(df)
        if self.select_relevant and y is not None:
            # NOTE(review): np.asarray drops any index carried by y, so the
            # target is matched to the feature rows by position -- confirm
            # callers pass y in the same order as the extracted rows.
            features = select_features(
                features,
                np.asarray(y),
                fdr_level=self.fdr_level,
            )
        self.selected_features_ = list(features.columns)
        self._is_fitted = True
        return features

    def fit(
        self,
        X,
        y=None,
        **fit_params,
    ) -> TSFreshFeatureExtractor:
        """Fit the feature extractor.

        Parameters
        ----------
        X : DataFrame
            Input data with columns for id, sort, and values.
        y : array-like, optional
            Target variable for relevance filtering. When omitted, no
            filtering is performed even if ``select_relevant`` is True.
        **fit_params
            Accepted for API compatibility; not used.

        Returns
        -------
        self
            Fitted extractor.
        """
        self._fit_features(X, y)
        return self

    def transform(self, X) -> np.ndarray:
        """Transform time series to features.

        Parameters
        ----------
        X : DataFrame
            Input data.

        Returns
        -------
        np.ndarray
            Extracted features, with columns in the order of
            ``selected_features_``. Fitted features that are absent from
            the new extraction are filled with zeros.

        Raises
        ------
        RuntimeError
            If the extractor has not been fitted.
        """
        if not self._is_fitted:
            raise RuntimeError("TSFreshFeatureExtractor has not been fitted")
        _check_tsfresh()
        features = self._extract_features(self._to_pandas(X))
        # Compare against None rather than truthiness: an empty selection
        # must yield zero columns, not silently fall back to every feature.
        if self.selected_features_ is not None:
            features = features.reindex(
                columns=self.selected_features_, fill_value=0.0
            )
        return features.values

    def fit_transform(self, X, y=None, **fit_params) -> np.ndarray:
        """Fit and transform in one step.

        The features computed while fitting are returned directly, so the
        extraction is not repeated.
        """
        return self._fit_features(X, y).values

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Get output feature names.

        Raises
        ------
        RuntimeError
            If the extractor has not been fitted.
        """
        if not self._is_fitted:
            raise RuntimeError("TSFreshFeatureExtractor has not been fitted")
        return list(self.selected_features_ or [])
# [docs]  (Sphinx viewcode link artifact, not code)
class TimeSeriesFeatureExtractor(BaseEstimator, TransformerMixin):
    """Fast time series feature extractor without tsfresh dependency.

    Extracts common time series features that are useful for GBDT models.
    Faster than tsfresh but with fewer features. The set of output
    features depends only on the constructor arguments, never on the
    length or content of a series.

    Parameters
    ----------
    features : List[str], optional
        Feature groups to extract. If None, extracts all implemented
        groups: "statistics", "trend", "autocorr", "entropy", "peaks",
        "crossings". Names with no matching extractor (including
        "seasonality" and "spectral") are ignored.
    window_sizes : List[int], optional
        Window sizes for rolling features; defaults to [7, 14, 30].
        Stored on the instance; none of the extractors in this class
        reads it.
    lag_range : tuple, default=(1, 10)
        Inclusive range of lags for autocorrelation features.

    Examples
    --------
    >>> extractor = TimeSeriesFeatureExtractor(features=["statistics", "trend"])
    >>> features = extractor.fit_transform(time_series_array)
    """

    def __init__(
        self,
        features: list[str] | None = None,
        window_sizes: list[int] | None = None,
        lag_range: tuple = (1, 10),
    ):
        self.features = features or [
            "statistics", "trend", "autocorr", "entropy", "peaks", "crossings"
        ]
        # None sentinel avoids sharing one mutable default list across
        # every instance.
        self.window_sizes = [7, 14, 30] if window_sizes is None else window_sizes
        self.lag_range = lag_range
        self.feature_names_: list[str] | None = None
        self._is_fitted = False

    def _extract_statistics(self, y: np.ndarray) -> dict[str, float]:
        """Extract basic statistical features."""
        mean = np.mean(y)
        std = np.std(y)
        low = np.min(y)
        high = np.max(y)
        q25 = np.percentile(y, 25)
        q75 = np.percentile(y, 75)
        return {
            "mean": mean,
            "std": std,
            "min": low,
            "max": high,
            "median": np.median(y),
            "q25": q25,
            "q75": q75,
            "iqr": q75 - q25,
            "skewness": self._skewness(y),
            "kurtosis": self._kurtosis(y),
            "range": high - low,
            # Coefficient of variation is undefined for a zero mean.
            "variation_coef": std / mean if mean != 0 else 0,
        }

    def _skewness(self, y: np.ndarray) -> float:
        """Compute skewness (0.0 for fewer than 3 points or zero spread)."""
        if len(y) < 3:
            return 0.0
        std = np.std(y)
        if std == 0:
            return 0.0
        return np.mean(((y - np.mean(y)) / std) ** 3)

    def _kurtosis(self, y: np.ndarray) -> float:
        """Compute excess kurtosis (0.0 for fewer than 4 points or zero spread)."""
        if len(y) < 4:
            return 0.0
        std = np.std(y)
        if std == 0:
            return 0.0
        return np.mean(((y - np.mean(y)) / std) ** 4) - 3

    def _extract_trend(self, y: np.ndarray) -> dict[str, float]:
        """Extract trend-related features from a least-squares line fit."""
        n = len(y)
        x = np.arange(n)
        if n >= 2:
            slope, intercept = np.polyfit(x, y, 1)
        else:
            # A single point cannot determine a line; use a flat trend.
            slope, intercept = 0.0, y[0]
        ss_res = np.sum((y - (slope * x + intercept)) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        # A constant series has no variance to explain; report 0 instead
        # of evaluating 0 / 0.
        r_squared = 1 - ss_res / ss_tot if ss_tot > 0 else 0.0
        return {
            "trend_slope": slope,
            "trend_intercept": intercept,
            "trend_r_squared": max(0, r_squared),
            "trend_strength": abs(slope) / (np.std(y) + 1e-10),
            "first_value": y[0],
            "last_value": y[-1],
            "change": y[-1] - y[0],
            "pct_change": (y[-1] - y[0]) / (abs(y[0]) + 1e-10),
        }

    def _extract_autocorr(self, y: np.ndarray) -> dict[str, float]:
        """Extract autocorrelation features.

        One feature is emitted for every lag in ``lag_range`` (inclusive).
        Lags that are not shorter than the series, and all lags of a
        constant series, are reported as 0.0, so the feature set does not
        depend on the data.
        """
        n = len(y)
        first_lag, last_lag = self.lag_range[0], self.lag_range[1]
        acf = None
        if np.var(y) != 0:
            centered = y - np.mean(y)
            # Computed once; every lag reads from the same array.
            acf = np.correlate(centered, centered, mode="full")[n - 1:]
        features = {}
        for lag in range(first_lag, last_lag + 1):
            if acf is None or lag >= n:
                features[f"autocorr_lag_{lag}"] = 0.0
            else:
                features[f"autocorr_lag_{lag}"] = acf[lag] / (acf[0] + 1e-10)
        # Partial autocorrelation (simplified): at lag 1 it is taken to be
        # the plain autocorrelation.
        features["partial_autocorr_1"] = features.get("autocorr_lag_1", 0.0)
        return features

    def _extract_entropy(self, y: np.ndarray) -> dict[str, float]:
        """Extract entropy-based features.

        Series with fewer than 10 points get 0.0 for both entropies.
        """
        n = len(y)
        if n < 10:
            return {"binned_entropy": 0.0, "permutation_entropy": 0.0}
        # Binned entropy over at most 10 histogram bins.
        bins = min(10, n // 5)
        counts, _ = np.histogram(y, bins=bins)
        probs = counts / counts.sum()
        probs = probs[probs > 0]
        binned_entropy = -np.sum(probs * np.log(probs))
        # Permutation entropy (simplified): distribution of ordinal
        # patterns over sliding windows of length m.
        m = 3  # embedding dimension
        patterns = [tuple(np.argsort(y[i:i + m])) for i in range(n - m + 1)]
        _, pattern_counts = np.unique(patterns, axis=0, return_counts=True)
        pattern_probs = pattern_counts / len(patterns)
        perm_entropy = -np.sum(pattern_probs * np.log(pattern_probs + 1e-10))
        return {
            "binned_entropy": binned_entropy,
            "permutation_entropy": perm_entropy,
        }

    def _extract_peaks(self, y: np.ndarray) -> dict[str, float]:
        """Extract peak-related features.

        A peak (trough) is an interior point strictly greater (smaller)
        than both neighbours. Series shorter than 3 points report zeros.
        """
        y = np.asarray(y)
        n = len(y)
        if n < 3:
            return {
                "n_peaks": 0,
                "n_troughs": 0,
                "peaks_per_sample": 0.0,
                "troughs_per_sample": 0.0,
            }
        inner, left, right = y[1:-1], y[:-2], y[2:]
        peaks = int(np.sum((inner > left) & (inner > right)))
        troughs = int(np.sum((inner < left) & (inner < right)))
        return {
            "n_peaks": peaks,
            "n_troughs": troughs,
            "peaks_per_sample": peaks / n,
            "troughs_per_sample": troughs / n,
        }

    def _extract_crossings(self, y: np.ndarray) -> dict[str, float]:
        """Extract zero/mean crossing features.

        A crossing is counted wherever the sign of consecutive values
        differs (sign taken relative to the mean, or to zero).
        """
        n = len(y)
        mean_crossings = np.sum(np.diff(np.sign(y - np.mean(y))) != 0)
        zero_crossings = np.sum(np.diff(np.sign(y)) != 0)
        return {
            "mean_crossings": mean_crossings,
            "zero_crossings": zero_crossings,
            # A single sample has no consecutive pair, hence no rate.
            "mean_crossing_rate": mean_crossings / (n - 1) if n > 1 else 0.0,
        }

    def _extract_for_series(self, y: np.ndarray) -> tuple[np.ndarray, list[str]]:
        """Extract all requested features for a single series.

        Returns
        -------
        tuple of (np.ndarray, list of str)
            Feature values and their names, both sorted by name.

        Raises
        ------
        ValueError
            If the series is empty.
        """
        y = np.asarray(y)
        if y.size == 0:
            raise ValueError("Cannot extract features from an empty series")
        groups = (
            ("statistics", self._extract_statistics),
            ("trend", self._extract_trend),
            ("autocorr", self._extract_autocorr),
            ("entropy", self._extract_entropy),
            ("peaks", self._extract_peaks),
            ("crossings", self._extract_crossings),
        )
        collected = {}
        for group, extractor in groups:
            if group in self.features:
                collected.update(extractor(y))
        # Sort by key for consistent ordering
        names = sorted(collected)
        return np.array([collected[name] for name in names]), names

    def _as_series_matrix(self, X) -> np.ndarray:
        """Return ``X`` as a 2D array with one series per row.

        A 1D input is one series. A 2D input with more rows than columns
        is assumed to be laid out as (n_samples, n_series) and is
        transposed; otherwise rows are taken as series.

        Raises
        ------
        ValueError
            If ``X`` is not 1D or 2D.
        """
        X = np.asarray(X)
        if X.ndim == 1:
            return X.reshape(1, -1)
        if X.ndim != 2:
            raise ValueError(
                f"Expected a 1D or 2D array of time series, got {X.ndim} dimensions"
            )
        if X.shape[0] > X.shape[1]:
            # Likely (n_samples, n_series) - transpose
            return X.T
        return X

    def fit(self, X, y=None, **fit_params) -> TimeSeriesFeatureExtractor:
        """Fit the extractor (learn feature names).

        Parameters
        ----------
        X : array-like of shape (n_samples,) or 2D
            Time series data, interpreted with the same layout rule as
            ``transform``.
        y : ignored
        **fit_params
            Accepted for API compatibility; not used.

        Returns
        -------
        self
            Fitted extractor.
        """
        series = self._as_series_matrix(X)
        # Extract from first series to get feature names
        _, self.feature_names_ = self._extract_for_series(series[0])
        self._is_fitted = True
        return self

    def transform(self, X) -> np.ndarray:
        """Extract features from time series.

        Parameters
        ----------
        X : array-like of shape (n_samples,) or (n_series, n_samples)
            Time series data. If 2D, each row is a separate series,
            unless there are more rows than columns, in which case the
            array is transposed first.

        Returns
        -------
        np.ndarray
            Extracted features of shape (n_series, n_features).

        Raises
        ------
        RuntimeError
            If the extractor has not been fitted.
        """
        if not self._is_fitted:
            raise RuntimeError("TimeSeriesFeatureExtractor has not been fitted")
        series = self._as_series_matrix(X)
        return np.array([self._extract_for_series(row)[0] for row in series])

    def fit_transform(self, X, y=None, **fit_params) -> np.ndarray:
        """Fit and transform in one step."""
        return self.fit(X, y, **fit_params).transform(X)

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Get output feature names.

        Raises
        ------
        RuntimeError
            If the extractor has not been fitted.
        """
        if not self._is_fitted:
            raise RuntimeError("TimeSeriesFeatureExtractor has not been fitted")
        return self.feature_names_ or []