Source code for endgame.validation.splitters

from __future__ import annotations

"""Cross-validation splitters for competition-specific scenarios."""

from collections.abc import Generator
from itertools import combinations
from typing import Any

import numpy as np
from sklearn.model_selection import BaseCrossValidator, StratifiedKFold

try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

try:
    import pandas as pd
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False


class PurgedTimeSeriesSplit(BaseCrossValidator):
    """Walk-forward time series CV with a purge gap and an embargo.

    The samples are cut into ``n_splits + 1`` equally sized consecutive
    blocks. Fold ``k`` validates on block ``k + 1`` (the last fold also
    absorbs any remainder) and trains only on samples that come earlier,
    so the training window never extends past the start of validation
    (for a non-negative ``purge_gap``).

    Parameters
    ----------
    n_splits : int, default=5
        Number of folds requested. Folds whose training window would be
        empty are skipped, so fewer folds may actually be produced.
    purge_gap : int, default=0
        Number of samples dropped between the end of the training window
        and the start of the validation block.
    embargo_pct : float, default=0.01
        Fraction of the *total* number of samples. For every fold except
        the first, training indices closer than
        ``int(n_samples * embargo_pct)`` to the validation start are
        removed.
    max_train_size : int, optional
        Upper bound on the training window (rolling window). It is applied
        before the embargo, so the final training set may be smaller.

    Examples
    --------
    >>> cv = PurgedTimeSeriesSplit(n_splits=5, purge_gap=10, embargo_pct=0.01)
    >>> for train_idx, val_idx in cv.split(X):
    ...     # train_idx ends at least purge_gap samples before val_idx starts
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        purge_gap: int = 0,
        embargo_pct: float = 0.01,
        max_train_size: int | None = None,
    ):
        self.n_splits = n_splits
        self.purge_gap = purge_gap
        self.embargo_pct = embargo_pct
        self.max_train_size = max_train_size

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the configured number of folds (``n_splits``)."""
        return self.n_splits

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Yield purged and embargoed train/validation index pairs.

        Parameters
        ----------
        X : array-like
            Data to split; only ``len(X)`` is used.
        y : array-like, optional
            Ignored.
        groups : array-like, optional
            Ignored.

        Yields
        ------
        train_idx : ndarray
            Training indices for the fold.
        val_idx : ndarray
            Validation indices for the fold.
        """
        total = len(X)
        positions = np.arange(total)
        embargo = int(total * self.embargo_pct)
        block = total // (self.n_splits + 1)
        final_fold = self.n_splits - 1

        for k in range(self.n_splits):
            lo = (k + 1) * block
            # The last fold swallows whatever the integer division left over.
            hi = total if k == final_fold else lo + block

            cutoff = lo - self.purge_gap
            if cutoff <= 0:
                # Nothing left to train on once the purge gap is removed.
                continue

            train_idx = positions[:cutoff]

            window = self.max_train_size
            if window is not None and train_idx.size > window:
                train_idx = train_idx[-window:]

            # The first fold is exempt from the embargo trimming.
            if k > 0 and embargo > 0:
                train_idx = train_idx[train_idx < lo - embargo]

            val_idx = positions[lo:hi]

            if train_idx.size == 0 or val_idx.size == 0:
                continue
            yield train_idx, val_idx
class StratifiedGroupKFold(BaseCrossValidator):
    """Stratified K-Fold that keeps every group inside a single fold.

    Each group is summarised by its most frequent class, and the groups
    (not the samples) are then split with ``StratifiedKFold`` on those
    per-group classes. All samples of a group therefore land on the same
    side of every split, which prevents leakage between related samples
    (e.g. the same patient or user).

    Parameters
    ----------
    n_splits : int, default=5
        Number of folds.
    shuffle : bool, default=True
        Whether to shuffle groups before splitting.
    random_state : int, optional
        Random seed for reproducibility.

    Examples
    --------
    >>> cv = StratifiedGroupKFold(n_splits=5)
    >>> for train_idx, val_idx in cv.split(X, y, groups=patient_ids):
    ...     # No patient appears in both train and val
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        shuffle: bool = True,
        random_state: int | None = None,
    ):
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_state = random_state

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the number of splits."""
        return self.n_splits

    @staticmethod
    def _dominant_class(labels: np.ndarray) -> Any:
        """Return the most frequent value in ``labels`` (smallest on ties)."""
        values, counts = np.unique(labels, return_counts=True)
        return values[np.argmax(counts)]

    def split(
        self,
        X: Any,
        y: Any,
        groups: Any,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate stratified, group-aware train/validation indices.

        Parameters
        ----------
        X : array-like
            Training data (not inspected).
        y : array-like
            Target variable used for stratification.
        groups : array-like
            Group label of every sample (e.g. patient_id).

        Yields
        ------
        train_idx : ndarray
            Sorted training indices for this fold.
        val_idx : ndarray
            Sorted validation indices for this fold.

        Raises
        ------
        ValueError
            If ``groups`` is None or ``y`` and ``groups`` differ in length.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")

        y = np.asarray(y)
        groups = np.asarray(groups)
        if len(y) != len(groups):
            raise ValueError(
                f"y and groups must have the same length, "
                f"got {len(y)} and {len(groups)}"
            )

        # group_codes[i] is the position of sample i's group in unique_groups.
        unique_groups, group_codes = np.unique(groups, return_inverse=True)
        n_groups = len(unique_groups)

        if n_groups:
            # Sort samples by group once, then cut the sorted targets at the
            # group boundaries; this avoids one full-length mask per group.
            order = np.argsort(group_codes, kind="stable")
            boundaries = np.cumsum(np.bincount(group_codes, minlength=n_groups))[:-1]
            group_classes = np.array(
                [self._dominant_class(chunk) for chunk in np.split(y[order], boundaries)]
            )
        else:
            group_classes = np.array([])

        # Stratified split of the groups themselves.
        skf = StratifiedKFold(
            n_splits=self.n_splits,
            shuffle=self.shuffle,
            random_state=self.random_state,
        )

        for _, val_group_idx in skf.split(unique_groups, group_classes):
            # Every group is either a validation or a training group, so one
            # boolean lookup per sample determines both index sets.
            group_in_val = np.zeros(n_groups, dtype=bool)
            group_in_val[val_group_idx] = True
            sample_in_val = group_in_val[group_codes]

            train_idx = np.flatnonzero(~sample_in_val)
            val_idx = np.flatnonzero(sample_in_val)
            yield train_idx, val_idx
class RepeatedStratifiedGroupKFold(BaseCrossValidator):
    """Run ``StratifiedGroupKFold`` several times with different seeds.

    Every repeat builds a fresh shuffled ``StratifiedGroupKFold`` whose seed
    is drawn from a single ``RandomState`` initialised with
    ``random_state``; the folds of all repeats are yielded one after another.

    Parameters
    ----------
    n_splits : int, default=5
        Number of folds per repeat.
    n_repeats : int, default=3
        Number of times to repeat the splits.
    random_state : int, optional
        Random seed for reproducibility.
    """

    def __init__(
        self,
        n_splits: int = 5,
        n_repeats: int = 3,
        random_state: int | None = None,
    ):
        self.n_splits = n_splits
        self.n_repeats = n_repeats
        self.random_state = random_state

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the total number of splits (``n_splits * n_repeats``)."""
        total_splits = self.n_splits * self.n_repeats
        return total_splits

    def split(
        self,
        X: Any,
        y: Any,
        groups: Any,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Yield the folds of every repeat, one repeat after another.

        Parameters
        ----------
        X : array-like
            Training data.
        y : array-like
            Target variable for stratification.
        groups : array-like
            Group label of every sample.

        Yields
        ------
        train_idx : ndarray
            Training indices for this fold.
        val_idx : ndarray
            Validation indices for this fold.
        """
        seed_source = np.random.RandomState(self.random_state)

        for _ in range(self.n_repeats):
            # One seed per repeat, drawn in order so results are reproducible.
            repeat_seed = seed_source.randint(0, 2**31)
            splitter = StratifiedGroupKFold(
                n_splits=self.n_splits,
                shuffle=True,
                random_state=repeat_seed,
            )
            yield from splitter.split(X, y, groups)
class MultilabelStratifiedKFold(BaseCrossValidator):
    """Stratified K-Fold for multilabel classification.

    Samples are assigned to folds one at a time. Each sample goes to the
    fold that is currently furthest below its per-label target count for
    the labels the sample carries, with a small penalty on fold size so
    that ties favour the smaller fold.

    Parameters
    ----------
    n_splits : int, default=5
        Number of folds.
    shuffle : bool, default=True
        Whether to shuffle before splitting.
    random_state : int, optional
        Random seed for reproducibility.

    Examples
    --------
    >>> # y is shape (n_samples, n_labels) with binary labels
    >>> cv = MultilabelStratifiedKFold(n_splits=5)
    >>> for train_idx, val_idx in cv.split(X, y):
    ...     # Label proportions maintained across folds
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        shuffle: bool = True,
        random_state: int | None = None,
    ):
        self.n_splits = n_splits
        self.shuffle = shuffle
        self.random_state = random_state

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the number of splits."""
        return self.n_splits

    def split(
        self,
        X: Any,
        y: Any,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate multilabel-stratified train/validation indices.

        Parameters
        ----------
        X : array-like
            Training data (not inspected).
        y : array-like of shape (n_samples, n_labels)
            Multilabel target matrix; a 1-D ``y`` is treated as a single
            label column. Only entries equal to 1 count as a set label.
        groups : array-like, optional
            Ignored.

        Yields
        ------
        train_idx : ndarray
            Training indices for this fold (integer dtype).
        val_idx : ndarray
            Validation indices for this fold (integer dtype, possibly empty).
        """
        y = np.asarray(y)
        if y.ndim == 1:
            y = y.reshape(-1, 1)

        n_samples, n_labels = y.shape
        indices = np.arange(n_samples)

        rng = np.random.RandomState(self.random_state)
        if self.shuffle:
            rng.shuffle(indices)
            # Keep y aligned with the shuffled order: y[i] <-> indices[i].
            y = y[indices]

        folds: list[list[int]] = [[] for _ in range(self.n_splits)]
        fold_sizes = np.zeros(self.n_splits, dtype=int)
        fold_label_counts = np.zeros((self.n_splits, n_labels))

        # Target number of positives per label in every fold.
        desired_per_fold = y.sum(axis=0) / self.n_splits

        for position, sample_labels in enumerate(y):
            # Score every fold at once: sum of deficits over the sample's
            # active labels. Labels are added in ascending order so the
            # floating-point result matches a per-fold sequential sum.
            scores = np.zeros(self.n_splits)
            for label in np.flatnonzero(sample_labels == 1):
                scores += desired_per_fold[label] - fold_label_counts[:, label]

            # Slightly prefer smaller folds so ties do not pile up in fold 0.
            scores -= 0.001 * fold_sizes

            best_fold = int(np.argmax(scores))
            folds[best_fold].append(indices[position])
            fold_sizes[best_fold] += 1
            fold_label_counts[best_fold] += sample_labels

        # An explicit integer dtype keeps empty folds from turning the index
        # arrays into float64 (np.array([]) defaults to float).
        fold_arrays = [np.array(members, dtype=indices.dtype) for members in folds]

        for fold in range(self.n_splits):
            val_idx = fold_arrays[fold]
            train_idx = np.concatenate(
                [fold_arrays[f] for f in range(self.n_splits) if f != fold]
            )
            yield train_idx, val_idx
class AdversarialKFold(BaseCrossValidator):
    """K-Fold that balances "test-like" samples across folds.

    A test-similarity score is computed for every training sample (see
    ``fit``). Samples whose score is at or above
    ``test_similarity_threshold`` are considered test-like; the test-like
    and the remaining samples are shuffled and cut into ``n_splits`` parts
    independently, and fold ``k`` validates on the ``k``-th part of both.

    Parameters
    ----------
    n_splits : int, default=5
        Number of folds.
    test_similarity_threshold : float, default=0.5
        Threshold for considering a sample "test-like".
    random_state : int, optional
        Random seed for reproducibility.

    Examples
    --------
    >>> cv = AdversarialKFold(n_splits=5)
    >>> for train_idx, val_idx in cv.split(X_train, y, X_test=X_test):
    ...     # Each fold has similar proportion of test-like samples
    ...     pass
    """

    def __init__(
        self,
        n_splits: int = 5,
        test_similarity_threshold: float = 0.5,
        random_state: int | None = None,
    ):
        self.n_splits = n_splits
        self.test_similarity_threshold = test_similarity_threshold
        self.random_state = random_state
        # Per-training-sample scores, filled in by fit().
        self._test_similarity: np.ndarray | None = None

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the number of splits."""
        return self.n_splits

    def fit(self, X_train: Any, X_test: Any) -> AdversarialKFold:
        """Compute test similarity scores for training samples.

        Runs ``AdversarialValidator.check_drift`` on the two sets and stores
        the second ``predict_proba`` column of its estimator, evaluated on
        the training features, as the similarity score.

        Parameters
        ----------
        X_train : array-like
            Training features.
        X_test : array-like
            Test features.

        Returns
        -------
        self
        """
        from endgame.validation.adversarial import AdversarialValidator

        av = AdversarialValidator(
            cv=3,
            random_state=self.random_state,
        )
        av.check_drift(X_train, X_test)

        # Score the training samples with the fitted adversarial estimator.
        X_train_arr = np.asarray(X_train)
        if X_train_arr.ndim == 1:
            X_train_arr = X_train_arr.reshape(-1, 1)
        # NaN and infinities are replaced by 0 before prediction.
        X_train_arr = np.nan_to_num(X_train_arr, nan=0.0, posinf=0.0, neginf=0.0)

        self._test_similarity = av._estimator.predict_proba(X_train_arr)[:, 1]
        return self

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
        X_test: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate adversarial-aware train/validation indices.

        Parameters
        ----------
        X : array-like
            Training data.
        y : array-like, optional
            Target variable (not used).
        groups : array-like, optional
            Ignored.
        X_test : array-like, optional
            Test data for computing similarity. Only used when ``fit`` has
            not been called yet; existing scores are never recomputed here.

        Yields
        ------
        train_idx : ndarray
            Training indices for this fold.
        val_idx : ndarray
            Validation indices for this fold.

        Raises
        ------
        ValueError
            If no similarity scores exist and ``X_test`` is None, or if the
            stored scores do not match the number of samples in ``X``.
        """
        n_samples = len(X)

        # Compute similarity if not already done.
        if self._test_similarity is None:
            if X_test is None:
                raise ValueError(
                    "X_test must be provided or fit() must be called first"
                )
            self.fit(X, X_test)

        # Scores cached from a different dataset would yield indices that
        # do not correspond to X, so refuse instead of splitting silently.
        if len(self._test_similarity) != n_samples:
            raise ValueError(
                f"Stored test-similarity scores cover "
                f"{len(self._test_similarity)} samples but X has {n_samples}; "
                "call fit() again with the matching training data"
            )

        # Partition the samples into test-like and non-test-like.
        test_like_mask = self._test_similarity >= self.test_similarity_threshold
        test_like_idx = np.where(test_like_mask)[0]
        non_test_like_idx = np.where(~test_like_mask)[0]

        rng = np.random.RandomState(self.random_state)
        rng.shuffle(test_like_idx)
        rng.shuffle(non_test_like_idx)

        # Cut both partitions into n_splits parts of near-equal size.
        test_like_folds = np.array_split(test_like_idx, self.n_splits)
        non_test_like_folds = np.array_split(non_test_like_idx, self.n_splits)

        for fold in range(self.n_splits):
            others = [f for f in range(self.n_splits) if f != fold]

            val_idx = np.concatenate(
                [test_like_folds[fold], non_test_like_folds[fold]]
            )
            # Test-like parts first, then non-test-like parts.
            train_idx = np.concatenate(
                [test_like_folds[f] for f in others]
                + [non_test_like_folds[f] for f in others]
            )
            yield train_idx, val_idx
class CombinatorialPurgedKFold(BaseCrossValidator):
    """Combinatorial Purged Cross-Validation for time series/financial data.

    Implements the CPCV method from Marcos López de Prado's "Advances in
    Financial Machine Learning" (Chapter 12). This method:

    1. Divides data into N sequential groups (folds)
    2. Uses combinations of k groups as test sets (C(N,k) total splits)
    3. Applies purging to remove training samples adjacent to test folds
    4. Applies embargo to remove training samples right after test folds

    Parameters
    ----------
    n_folds : int, default=10
        Number of sequential groups to divide the data into. Must be >= 3.
    n_test_folds : int, default=2
        Number of folds to use as test set in each split. Must be >= 1 and
        < n_folds. Total number of splits = C(n_folds, n_test_folds).
    purge_gap : int, default=0
        Number of training samples removed on each side of every test fold
        (immediately before its start and immediately after its end).
    embargo_pct : float, default=0.0
        Fraction of the total number of samples. Training samples within
        ``int(n_samples * embargo_pct)`` positions after the end of a test
        fold are removed.

    Attributes
    ----------
    n_splits : int
        Total number of train/test splits = C(n_folds, n_test_folds).
    n_test_paths : int
        C(n_folds - 1, n_test_folds - 1).
    fold_bounds_ : List[Tuple[int, int]]
        Start and end indices for each fold (set by ``split`` or
        ``get_test_paths``).

    Notes
    -----
    Standard k-fold CV produces only one backtest path (the concatenation
    of all test folds). CPCV uses combinations of test folds instead. For
    example, with n_folds=6 and n_test_folds=2 there are C(6,2)=15 splits.

    References
    ----------
    López de Prado, M. (2018). "Advances in Financial Machine Learning".
    Chapter 12: Backtesting through Cross-Validation.

    Examples
    --------
    >>> from endgame.validation import CombinatorialPurgedKFold
    >>> import numpy as np
    >>>
    >>> # Financial time series with 1000 samples
    >>> X = np.random.randn(1000, 10)
    >>> y = np.random.randn(1000)
    >>>
    >>> # Use 6 folds, 2 test folds per split, with purging and embargo
    >>> cpcv = CombinatorialPurgedKFold(
    ...     n_folds=6,
    ...     n_test_folds=2,
    ...     purge_gap=10,
    ...     embargo_pct=0.01,
    ... )
    >>>
    >>> print(f"Number of splits: {cpcv.get_n_splits()}")  # 15 splits
    >>>
    >>> for train_idx, test_idx in cpcv.split(X):
    ...     # Train model on train_idx, evaluate on test_idx
    ...     pass
    >>>
    >>> # Test-fold index arrays for every combination
    >>> paths = cpcv.get_test_paths(X)
    >>> print(f"Number of backtest paths: {len(paths)}")
    """

    def __init__(
        self,
        n_folds: int = 10,
        n_test_folds: int = 2,
        purge_gap: int = 0,
        embargo_pct: float = 0.0,
    ):
        if n_folds < 3:
            raise ValueError("n_folds must be >= 3")
        if n_test_folds < 1:
            raise ValueError("n_test_folds must be >= 1")
        if n_test_folds >= n_folds:
            raise ValueError("n_test_folds must be < n_folds")

        self.n_folds = n_folds
        self.n_test_folds = n_test_folds
        self.purge_gap = purge_gap
        self.embargo_pct = embargo_pct

        # Cached fold boundaries and the sample count they were computed for.
        self.fold_bounds_: list[tuple[int, int]] | None = None
        self._n_samples: int | None = None

    @property
    def n_splits(self) -> int:
        """Total number of train/test splits, C(n_folds, n_test_folds)."""
        from math import comb

        return comb(self.n_folds, self.n_test_folds)

    @property
    def n_test_paths(self) -> int:
        """Number of backtest paths, C(n_folds - 1, n_test_folds - 1)."""
        from math import comb

        return comb(self.n_folds - 1, self.n_test_folds - 1)

    def get_n_splits(
        self,
        X: Any | None = None,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> int:
        """Return the number of splits."""
        return self.n_splits

    def _compute_fold_bounds(self, n_samples: int) -> list[tuple[int, int]]:
        """Compute the ``(start, end)`` index pair of every fold.

        All folds have ``n_samples // n_folds`` samples except the last one,
        which extends to ``n_samples`` and so absorbs the remainder.
        """
        fold_size = n_samples // self.n_folds
        last = self.n_folds - 1
        return [
            (i * fold_size, (i + 1) * fold_size if i < last else n_samples)
            for i in range(self.n_folds)
        ]

    def _get_embargo_size(self, n_samples: int) -> int:
        """Compute embargo size in number of samples."""
        return int(n_samples * self.embargo_pct)

    def _apply_purging_and_embargo(
        self,
        train_indices: np.ndarray,
        test_fold_indices: list[int],
        fold_bounds: list[tuple[int, int]],
        n_samples: int,
    ) -> np.ndarray:
        """Apply purging and embargo to training indices.

        For every test fold ``[test_start, test_end)`` the following
        training indices are dropped:

        * purging: ``[test_start - purge_gap, test_start)`` and
          ``[test_end, test_end + purge_gap)``
        * embargo: ``[test_end, test_end + embargo_size)``

        Parameters
        ----------
        train_indices : np.ndarray
            Original training indices.
        test_fold_indices : List[int]
            Indices of folds used for testing.
        fold_bounds : List[Tuple[int, int]]
            Start and end indices for each fold.
        n_samples : int
            Total number of samples (used to size the embargo).

        Returns
        -------
        np.ndarray
            Purged and embargoed training indices, in their original order.
        """
        if self.purge_gap == 0 and self.embargo_pct == 0.0:
            return train_indices

        embargo_size = self._get_embargo_size(n_samples)
        # Non-positive gaps remove nothing. After a test fold the purge and
        # embargo windows start at the same point, so the wider one wins.
        lead = max(self.purge_gap, 0)
        trail = max(self.purge_gap, embargo_size, 0)

        keep = np.ones(len(train_indices), dtype=bool)
        for fold_idx in test_fold_indices:
            test_start, test_end = fold_bounds[fold_idx]
            before_test = (train_indices >= test_start - lead) & (
                train_indices < test_start
            )
            after_test = (train_indices >= test_end) & (
                train_indices < test_end + trail
            )
            keep &= ~(before_test | after_test)

        return train_indices[keep]

    def split(
        self,
        X: Any,
        y: Any | None = None,
        groups: Any | None = None,
    ) -> Generator[tuple[np.ndarray, np.ndarray], None, None]:
        """Generate combinatorial purged train/test splits.

        Splits whose training or test set ends up empty are skipped, so
        fewer than ``n_splits`` pairs may be produced.

        Parameters
        ----------
        X : array-like
            Training data. Used only to determine the number of samples.
        y : array-like, optional
            Target variable (ignored, but accepted for sklearn compatibility).
        groups : array-like, optional
            Group labels (ignored).

        Yields
        ------
        train_idx : np.ndarray
            Training indices for this split (purged and embargoed).
        test_idx : np.ndarray
            Test indices for this split.
        """
        n_samples = len(X)
        indices = np.arange(n_samples)

        fold_bounds = self._compute_fold_bounds(n_samples)
        # Remember the boundaries together with the size they belong to.
        self._n_samples = n_samples
        self.fold_bounds_ = fold_bounds

        for test_fold_indices in combinations(range(self.n_folds), self.n_test_folds):
            test_folds = set(test_fold_indices)

            test_idx = np.concatenate(
                [
                    indices[start:end]
                    for start, end in (fold_bounds[f] for f in test_fold_indices)
                ]
            )
            train_idx = np.concatenate(
                [
                    indices[start:end]
                    for f, (start, end) in enumerate(fold_bounds)
                    if f not in test_folds
                ]
            )

            train_idx = self._apply_purging_and_embargo(
                train_idx,
                list(test_fold_indices),
                fold_bounds,
                n_samples,
            )

            if len(train_idx) > 0 and len(test_idx) > 0:
                yield train_idx, test_idx

    def get_test_paths(self, X: Any) -> list[list[np.ndarray]]:
        """Return the test-fold index arrays of every test-fold combination.

        This is a simplified path reconstruction: one entry is produced per
        combination of ``n_test_folds`` folds (``n_splits`` entries in
        total, which is not the same as ``n_test_paths``), and each entry
        holds only that combination's test folds in temporal order rather
        than a pass over the whole dataset.

        Parameters
        ----------
        X : array-like
            Training data (used only to determine size).

        Returns
        -------
        List[List[np.ndarray]]
            For every combination, the index arrays of its test folds.
        """
        n_samples = len(X)

        # Recompute when nothing is cached or the cache belongs to data of a
        # different length; stale bounds would slice the wrong ranges.
        if self.fold_bounds_ is None or self._n_samples != n_samples:
            self.fold_bounds_ = self._compute_fold_bounds(n_samples)
            self._n_samples = n_samples

        indices = np.arange(n_samples)
        bounds = self.fold_bounds_

        paths = []
        # combinations() emits fold indices in ascending (temporal) order.
        for test_fold_indices in combinations(range(self.n_folds), self.n_test_folds):
            path = []
            for fold_idx in test_fold_indices:
                start, end = bounds[fold_idx]
                path.append(indices[start:end])
            paths.append(path)

        return paths

    def get_fold_info(self, X: Any) -> dict[str, Any]:
        """Get detailed information about the fold structure.

        Parameters
        ----------
        X : array-like
            Training data (used only to determine size).

        Returns
        -------
        Dict[str, Any]
            Dictionary containing:
            - n_samples: Total number of samples
            - n_folds: Number of folds
            - n_test_folds: Number of test folds per split
            - n_splits: Total number of splits
            - n_test_paths: Number of backtest paths
            - fold_sizes: List of fold sizes
            - purge_gap: Purge gap setting
            - embargo_size: Embargo size in samples
        """
        n_samples = len(X)
        fold_bounds = self._compute_fold_bounds(n_samples)
        fold_sizes = [end - start for start, end in fold_bounds]

        return {
            "n_samples": n_samples,
            "n_folds": self.n_folds,
            "n_test_folds": self.n_test_folds,
            "n_splits": self.n_splits,
            "n_test_paths": self.n_test_paths,
            "fold_sizes": fold_sizes,
            "purge_gap": self.purge_gap,
            "embargo_size": self._get_embargo_size(n_samples),
        }

    def __repr__(self) -> str:
        return (
            f"CombinatorialPurgedKFold(n_folds={self.n_folds}, "
            f"n_test_folds={self.n_test_folds}, "
            f"purge_gap={self.purge_gap}, "
            f"embargo_pct={self.embargo_pct})"
        )