Source code for endgame.timeseries.validation

from __future__ import annotations

"""Time series cross-validation and metrics.

This module extends the validation strategies in endgame.validation
with time series specific implementations.

Provides:
- Expanding window CV
- Sliding window CV
- Blocked time series split
- Forecasting metrics (MASE, SMAPE, etc.)
"""

from collections.abc import Generator
from typing import Any

import numpy as np
from sklearn.model_selection import BaseCrossValidator


class ExpandingWindowCV(BaseCrossValidator):
    """Expanding window cross-validation for time series.

    Training window expands with each fold while validation window
    remains fixed size. Training always starts at index 0.

    Parameters
    ----------
    n_splits : int, default=5
        Maximum number of folds.
    initial_train_size : int, optional
        Initial training set size. If None, computed as
        ``n_samples - n_splits * (val_size + gap)``.
    val_size : int, optional
        Validation set size. If None, computed as
        ``n_samples // (n_splits + 1)``.
    gap : int, default=0
        Number of samples to skip between train and validation.
    step_size : int, optional
        How many samples to add per fold. If None, uses val_size.

    Examples
    --------
    >>> cv = ExpandingWindowCV(n_splits=5, initial_train_size=100, val_size=20)
    >>> for train_idx, val_idx in cv.split(X):
    ...     # train_idx grows with each fold
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        initial_train_size: int | None = None,
        val_size: int | None = None,
        gap: int = 0,
        step_size: int | None = None,
    ):
        self.n_splits = n_splits
        self.initial_train_size = initial_train_size
        self.val_size = val_size
        self.gap = gap
        self.step_size = step_size

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the configured number of splits.

        ``split`` stops early once a validation window would run past the
        end of the data, so it may yield fewer folds than this value.
        """
        return self.n_splits

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate train/validation indices.

        Parameters
        ----------
        X : array-like
            Input data. Only ``len(X)`` is used.
        y : ignored
        groups : ignored

        Yields
        ------
        train_idx, val_idx : ndarray
            Training and validation indices for this fold.

        Raises
        ------
        ValueError
            If ``gap`` is negative, or if the (explicit or derived)
            validation size, step size or initial training size is not
            strictly positive.
        """
        n_samples = len(X)
        indices = np.arange(n_samples)

        # A negative gap would make the validation window start inside the
        # training window, i.e. train on the samples being validated.
        if self.gap < 0:
            raise ValueError(f"gap ({self.gap}) must be >= 0.")

        # Compute sizes if not provided
        val_size = self.val_size or (n_samples // (self.n_splits + 1))
        step_size = self.step_size or val_size

        # Non-positive sizes would become negative slice bounds below, which
        # NumPy interprets as "count from the end" and silently produces
        # windows that overlap the future.
        if val_size < 1:
            raise ValueError(
                f"val_size ({val_size}) must be > 0. "
                "Try reducing n_splits or providing explicit sizes."
            )
        if step_size < 1:
            raise ValueError(f"step_size ({step_size}) must be > 0.")

        # Initial train size
        if self.initial_train_size is not None:
            initial_size = self.initial_train_size
        else:
            initial_size = n_samples - (self.n_splits * (val_size + self.gap))

        # Checked for explicit values too, not only for the derived default.
        if initial_size < 1:
            raise ValueError(
                f"initial_train_size ({initial_size}) must be > 0. "
                "Try reducing n_splits or providing explicit sizes."
            )

        for fold in range(self.n_splits):
            train_end = initial_size + fold * step_size
            val_start = train_end + self.gap
            val_end = val_start + val_size

            # Later folds only move further right, so stop at the first
            # validation window that does not fit.
            if val_end > n_samples:
                break

            # With the checks above both slices are guaranteed non-empty.
            yield indices[:train_end], indices[val_start:val_end]
class SlidingWindowCV(BaseCrossValidator):
    """Sliding window cross-validation for time series.

    Both training and validation windows have fixed sizes and slide
    through the data.

    Parameters
    ----------
    n_splits : int, default=5
        Maximum number of folds.
    train_size : int, optional
        Training window size. If None, computed as
        ``n_samples // (n_splits + 1) - val_size - gap``.
    val_size : int, optional
        Validation window size. If None, computed as one third of
        ``n_samples // (n_splits + 1)``.
    gap : int, default=0
        Gap between train and validation.
    step_size : int, optional
        How many samples to slide per fold. If None, uses val_size.

    Examples
    --------
    >>> cv = SlidingWindowCV(n_splits=5, train_size=100, val_size=20)
    >>> for train_idx, val_idx in cv.split(X):
    ...     # Both windows slide forward
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        train_size: int | None = None,
        val_size: int | None = None,
        gap: int = 0,
        step_size: int | None = None,
    ):
        self.n_splits = n_splits
        self.train_size = train_size
        self.val_size = val_size
        self.gap = gap
        self.step_size = step_size

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the configured number of splits.

        ``split`` stops early once a validation window would run past the
        end of the data, so it may yield fewer folds than this value.
        """
        return self.n_splits

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate train/validation indices.

        Parameters
        ----------
        X : array-like
            Input data. Only ``len(X)`` is used.
        y : ignored
        groups : ignored

        Yields
        ------
        train_idx, val_idx : ndarray
            Training and validation indices for this fold.

        Raises
        ------
        ValueError
            If ``gap`` is negative, or if the (explicit or derived)
            training size, validation size or step size is not strictly
            positive.
        """
        n_samples = len(X)
        indices = np.arange(n_samples)

        # A negative gap would place the validation window inside the
        # training window.
        if self.gap < 0:
            raise ValueError(f"gap ({self.gap}) must be >= 0.")

        # Compute sizes
        total_per_fold = n_samples // (self.n_splits + 1)
        val_size = self.val_size or (total_per_fold // 3)
        train_size = self.train_size or (total_per_fold - val_size - self.gap)
        step_size = self.step_size or val_size

        # The derived train_size goes negative when gap is large relative to
        # the data. A negative slice end counts from the end of the array,
        # so the "training" slice would swallow the validation window and
        # everything after it. Fail loudly instead.
        for name, value in (
            ("train_size", train_size),
            ("val_size", val_size),
            ("step_size", step_size),
        ):
            if value < 1:
                raise ValueError(
                    f"{name} ({value}) must be > 0. "
                    "Try reducing n_splits or gap, or providing explicit sizes."
                )

        for fold in range(self.n_splits):
            train_start = fold * step_size
            train_end = train_start + train_size
            val_start = train_end + self.gap
            val_end = val_start + val_size

            # Later folds only move further right, so stop at the first
            # validation window that does not fit.
            if val_end > n_samples:
                break

            # With the checks above both slices are guaranteed non-empty.
            yield indices[train_start:train_end], indices[val_start:val_end]
class BlockedTimeSeriesSplit(BaseCrossValidator):
    """Blocked time series split for reducing temporal leakage.

    Splits data into ``n_splits + 1`` equally sized blocks. Fold ``k``
    validates on block ``k + 1`` and trains on everything before it, minus
    a gap. The last validation block also absorbs the remainder samples.

    Parameters
    ----------
    n_splits : int, default=5
        Maximum number of folds.
    gap_before : int, default=0
        Gap samples before validation set.
    gap_after : int, default=0
        Gap samples after validation set (embargo). Stored on the
        instance but not read by ``split``: training data always ends
        before the validation block, so there is nothing after it to
        embargo.

    Examples
    --------
    >>> cv = BlockedTimeSeriesSplit(n_splits=5, gap_before=10, gap_after=5)
    >>> for train_idx, val_idx in cv.split(X):
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        gap_before: int = 0,
        gap_after: int = 0,
    ):
        self.n_splits = n_splits
        self.gap_before = gap_before
        self.gap_after = gap_after

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the configured number of splits.

        ``split`` skips folds whose training set is emptied by
        ``gap_before``, so it may yield fewer folds than this value.
        """
        return self.n_splits

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate train/validation indices with gaps.

        Parameters
        ----------
        X : array-like
            Input data. Only ``len(X)`` is used.
        y : ignored
        groups : ignored

        Yields
        ------
        train_idx, val_idx : ndarray
            Training and validation indices for this fold.

        Raises
        ------
        ValueError
            If ``gap_before`` is negative, or if there are fewer than
            ``n_splits + 1`` samples.
        """
        n_samples = len(X)
        indices = np.arange(n_samples)

        # A negative gap would extend training into the validation block.
        if self.gap_before < 0:
            raise ValueError(f"gap_before ({self.gap_before}) must be >= 0.")

        # Divide into n_splits + 1 blocks (training gets multiple blocks)
        fold_size = n_samples // (self.n_splits + 1)
        if fold_size < 1:
            raise ValueError(
                f"Cannot create {self.n_splits} splits from {n_samples} "
                f"samples; at least {self.n_splits + 1} are required."
            )

        last_fold = self.n_splits - 1
        for fold in range(self.n_splits):
            val_start = (fold + 1) * fold_size
            # The final block runs to the end so remainder samples are used.
            val_end = val_start + fold_size if fold < last_fold else n_samples

            # Training: all blocks before validation, minus gap
            train_end = val_start - self.gap_before
            train_idx = indices[:max(0, train_end)]

            # Validation
            val_idx = indices[val_start:val_end]

            # Early folds can lose their whole training set to gap_before;
            # those are skipped rather than yielded empty.
            if len(train_idx) > 0 and len(val_idx) > 0:
                yield train_idx, val_idx
# Forecasting metrics
def mase(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_train: np.ndarray,
    seasonal_period: int = 1,
) -> float:
    """Mean Absolute Scaled Error.

    Scales MAE by the in-sample MAE of a naive seasonal forecast.
    MASE < 1 means the model outperforms seasonal naive.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : array-like
        Predicted values.
    y_train : array-like
        Training data for computing scaling factor.
    seasonal_period : int, default=1
        Seasonal period for naive forecast (1 = random walk). Must be >= 1.

    Returns
    -------
    float
        MASE score. ``inf`` (or ``0.0`` for a perfect forecast) when the
        naive in-sample error is exactly zero; ``nan`` when ``y_train``
        has fewer than two samples and no scale can be computed.

    Raises
    ------
    ValueError
        If ``seasonal_period`` is less than 1.
    """
    # A zero or negative lag turns the slices below into mismatched or
    # wrapped-around windows instead of a lagged difference.
    if seasonal_period < 1:
        raise ValueError(
            f"seasonal_period ({seasonal_period}) must be >= 1."
        )

    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    y_train = np.asarray(y_train)

    # Compute MAE of forecast
    forecast_mae = np.mean(np.abs(y_true - y_pred))

    # No pair of training points means no naive error to scale by. Return
    # NaN explicitly rather than via the mean of an empty array.
    if len(y_train) < 2:
        return float('nan')

    # Compute in-sample MAE of seasonal naive
    if len(y_train) > seasonal_period:
        naive_errors = y_train[seasonal_period:] - y_train[:-seasonal_period]
        scale = np.mean(np.abs(naive_errors))
    else:
        # Series shorter than one season: fall back to the lag-1 naive.
        scale = np.mean(np.abs(np.diff(y_train)))

    if scale == 0:
        return float('inf') if forecast_mae > 0 else 0.0

    return forecast_mae / scale
def smape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Symmetric Mean Absolute Percentage Error.

    Positions where both the true and the predicted value are zero are
    excluded from the average.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : array-like
        Predicted values.

    Returns
    -------
    float
        SMAPE in percentage (0-200%). ``0.0`` when every position has a
        zero denominator.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)

    magnitude = np.abs(actual) + np.abs(forecast)
    usable = magnitude != 0

    if np.any(usable):
        abs_error = np.abs(actual[usable] - forecast[usable])
        return np.mean(2 * abs_error / magnitude[usable]) * 100

    # Nothing left to average: all pairs are (0, 0).
    return 0.0
def mape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean Absolute Percentage Error.

    Positions where the true value is zero are excluded from the average.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : array-like
        Predicted values.

    Returns
    -------
    float
        MAPE in percentage. ``inf`` when every true value is zero.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)

    nonzero = actual != 0
    if np.any(nonzero):
        relative_error = (actual[nonzero] - forecast[nonzero]) / actual[nonzero]
        return np.mean(np.abs(relative_error)) * 100

    # No true value to divide by anywhere.
    return float('inf')
def rmsse(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_train: np.ndarray,
) -> float:
    """Root Mean Squared Scaled Error.

    Used in M5 forecasting competition. The forecast MSE is divided by the
    in-sample MSE of a one-step naive (random walk) forecast.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : array-like
        Predicted values.
    y_train : array-like
        Training data for scaling.

    Returns
    -------
    float
        RMSSE score. ``inf`` (or ``0.0`` for a perfect forecast) when the
        scale is exactly zero.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)
    history = np.asarray(y_train)

    # Numerator: mean squared forecast error.
    mse = np.mean((actual - forecast) ** 2)

    # Denominator: mean squared one-step change of the training series,
    # falling back to its variance when there is no pair to difference.
    if len(history) > 1:
        step_changes = history[1:] - history[:-1]
        denom = np.mean(step_changes ** 2)
    else:
        denom = np.var(history)

    if denom == 0:
        if mse > 0:
            return float('inf')
        return 0.0

    return np.sqrt(mse / denom)
def wape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Weighted Absolute Percentage Error.

    Total absolute error divided by the total absolute true value.

    Parameters
    ----------
    y_true : array-like
        True values.
    y_pred : array-like
        Predicted values.

    Returns
    -------
    float
        WAPE in percentage. When the true values sum to zero in absolute
        terms, ``0.0`` if the predictions do as well, otherwise ``inf``.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)

    actual_mass = np.sum(np.abs(actual))
    if actual_mass != 0:
        return np.sum(np.abs(actual - forecast)) / actual_mass * 100

    # Zero denominator: only an all-zero forecast counts as perfect.
    if np.sum(np.abs(forecast)) == 0:
        return 0.0
    return float('inf')
def coverage(
    y_true: np.ndarray,
    lower: np.ndarray,
    upper: np.ndarray,
) -> float:
    """Compute prediction interval coverage.

    Both interval bounds are inclusive.

    Parameters
    ----------
    y_true : array-like
        True values.
    lower : array-like
        Lower bounds of prediction intervals.
    upper : array-like
        Upper bounds of prediction intervals.

    Returns
    -------
    float
        Coverage probability (0-1).
    """
    actual = np.asarray(y_true)
    lo = np.asarray(lower)
    hi = np.asarray(upper)

    at_or_above_lower = actual >= lo
    at_or_below_upper = actual <= hi

    # Mean of a boolean array is the fraction of True entries.
    return np.mean(at_or_above_lower & at_or_below_upper)
def interval_width(lower: np.ndarray, upper: np.ndarray) -> float:
    """Compute average prediction interval width.

    Parameters
    ----------
    lower : array-like
        Lower bounds.
    upper : array-like
        Upper bounds.

    Returns
    -------
    float
        Mean interval width.
    """
    hi = np.asarray(upper)
    lo = np.asarray(lower)
    widths = hi - lo
    return np.mean(widths)
def winkler_score(
    y_true: np.ndarray,
    lower: np.ndarray,
    upper: np.ndarray,
    alpha: float = 0.05,
) -> float:
    """Compute Winkler score for prediction intervals.

    Combines coverage and sharpness. Lower is better. Each observation
    scores its interval width, plus ``2 / alpha`` times the distance by
    which the true value falls outside the interval.

    Parameters
    ----------
    y_true : array-like
        True values.
    lower : array-like
        Lower bounds.
    upper : array-like
        Upper bounds.
    alpha : float, default=0.05
        Significance level (1 - coverage).

    Returns
    -------
    float
        Winkler score.
    """
    actual = np.asarray(y_true)
    lo = np.asarray(lower)
    hi = np.asarray(upper)

    penalty_rate = 2 / alpha

    # Distance outside the interval on each side; zero when inside.
    undershoot = np.maximum(0, lo - actual)
    overshoot = np.maximum(0, actual - hi)

    per_point = (hi - lo) + penalty_rate * undershoot + penalty_rate * overshoot
    return np.mean(per_point)