# Source code for endgame.validation.adversarial

from __future__ import annotations

"""Adversarial Validation for detecting train/test distribution drift."""

from typing import Any

import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import StratifiedKFold, cross_val_predict

from endgame.core.base import EndgameEstimator
from endgame.core.types import AdversarialValidationResult

try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

try:
    import pandas as pd
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False


class AdversarialValidator(EndgameEstimator):
    """Detects train/test distribution drift using adversarial validation.

    Trains a classifier to distinguish train from test data. High AUC (>0.5)
    indicates distribution drift. Feature importances identify drifting
    features.

    This is a critical technique documented across winning solutions to
    prevent leaderboard overfitting when CV doesn't correlate with public LB.

    Parameters
    ----------
    estimator : sklearn-compatible classifier, optional
        The classifier used for adversarial validation.
        If None, uses LightGBM if available, else RandomForest.
    sample_frac : float, default=1.0
        Fraction of data to use (for large datasets). Must be positive;
        values >= 1.0 disable sampling.
    cv : int, default=5
        Number of cross-validation folds.
    threshold : float, default=0.7
        AUC threshold above which to flag significant drift.
        NOTE(review): the value is stored but not consulted by any method
        of this class; severity comes from
        ``AdversarialValidationResult.from_auc`` -- confirm intended use.
    random_state : int, optional
        Random seed for reproducibility.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    auc_score_ : float
        ROC-AUC score from adversarial validation.
    feature_importances_ : Dict[str, float]
        Feature importance in distinguishing train/test.
    drifted_features_ : List[str]
        Features contributing most to drift (sorted by importance).

    Examples
    --------
    >>> from endgame.validation import AdversarialValidator
    >>> av = AdversarialValidator(threshold=0.6)
    >>> result = av.check_drift(X_train, X_test)
    >>> print(f"Drift AUC: {result.auc_score:.3f}")
    >>> if result.drift_severity == 'severe':
    ...     # Remove drifted features
    ...     drop_cols = result.drifted_features[:5]
    """

    def __init__(
        self,
        estimator: BaseEstimator | None = None,
        sample_frac: float = 1.0,
        cv: int = 5,
        threshold: float = 0.7,
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(random_state=random_state, verbose=verbose)
        self.estimator = estimator
        self.sample_frac = sample_frac
        self.cv = cv
        self.threshold = threshold

        # Results populated by check_drift().
        self.auc_score_: float | None = None
        self.feature_importances_: dict[str, float] | None = None
        self.drifted_features_: list[str] | None = None
        # Working copy of the classifier actually fitted by check_drift().
        self._estimator: BaseEstimator | None = None

    def _get_default_estimator(self) -> BaseEstimator:
        """Get the default estimator for adversarial validation.

        Returns an ``LGBMClassifier`` when LightGBM can be imported and a
        ``RandomForestClassifier`` otherwise.
        """
        # Try LightGBM first (fastest and most accurate)
        try:
            from lightgbm import LGBMClassifier

            return LGBMClassifier(
                n_estimators=100,
                learning_rate=0.1,
                num_leaves=31,
                max_depth=5,
                verbosity=-1,
                n_jobs=-1,
                random_state=self.random_state,
            )
        except ImportError:
            pass

        # Fall back to RandomForest
        from sklearn.ensemble import RandomForestClassifier

        return RandomForestClassifier(
            n_estimators=100,
            max_depth=5,
            n_jobs=-1,
            random_state=self.random_state,
        )

    def _to_numpy_with_columns(
        self,
        X: Any,
    ) -> tuple[np.ndarray, list[str]]:
        """Convert input to numpy array and extract column names.

        pandas and polars frames keep their own column names (a polars
        ``LazyFrame`` is collected first). Anything else goes through
        ``np.asarray``; 1-D input becomes a single column and columns are
        named ``feature_0``, ``feature_1``, ...
        """
        if HAS_PANDAS and isinstance(X, pd.DataFrame):
            return X.values, list(X.columns)

        if HAS_POLARS and isinstance(X, (pl.DataFrame, pl.LazyFrame)):
            if isinstance(X, pl.LazyFrame):
                X = X.collect()
            return X.to_numpy(), list(X.columns)

        X_arr = np.asarray(X)
        if X_arr.ndim == 1:
            X_arr = X_arr.reshape(-1, 1)
        columns = [f"feature_{i}" for i in range(X_arr.shape[1])]
        return X_arr, columns

    def check_drift(
        self,
        X_train: Any,
        X_test: Any,
    ) -> AdversarialValidationResult:
        """Check for distribution drift between train and test data.

        Fits a train-vs-test classifier, scores it with out-of-fold
        ROC-AUC, then refits on all rows to obtain feature importances.
        Updates ``auc_score_``, ``feature_importances_``,
        ``drifted_features_`` and the internal fitted estimator.

        Parameters
        ----------
        X_train : array-like of shape (n_train_samples, n_features)
            Training features.
        X_test : array-like of shape (n_test_samples, n_features)
            Test features.

        Returns
        -------
        AdversarialValidationResult
            Result containing:
            - auc_score: float (>0.5 indicates drift)
            - drifted_features: List[str] (features with high importance)
            - feature_importances: Dict[str, float]
            - drift_severity: str ('none', 'mild', 'moderate', 'severe')

        Raises
        ------
        ValueError
            If ``sample_frac`` is not positive or the two inputs do not
            have the same number of features.
        """
        X_train_arr, columns = self._to_numpy_with_columns(X_train)
        X_test_arr, _ = self._to_numpy_with_columns(X_test)

        # Fail early with a readable message instead of an opaque vstack error.
        if X_train_arr.shape[1] != X_test_arr.shape[1]:
            raise ValueError(
                "X_train and X_test must have the same number of features, "
                f"got {X_train_arr.shape[1]} and {X_test_arr.shape[1]}"
            )

        if self.sample_frac <= 0:
            raise ValueError(
                f"sample_frac must be positive, got {self.sample_frac!r}"
            )

        # Sample if requested
        if self.sample_frac < 1.0:
            rng = np.random.RandomState(self.random_state)
            n_train_total = len(X_train_arr)
            n_test_total = len(X_test_arr)
            # Keep at least one row per side so a small fraction cannot
            # truncate one class away entirely.
            n_train = min(
                n_train_total, max(1, int(n_train_total * self.sample_frac))
            )
            n_test = min(
                n_test_total, max(1, int(n_test_total * self.sample_frac))
            )
            train_idx = rng.choice(n_train_total, n_train, replace=False)
            test_idx = rng.choice(n_test_total, n_test, replace=False)
            X_train_arr = X_train_arr[train_idx]
            X_test_arr = X_test_arr[test_idx]

        # Create adversarial labels: 0 = train, 1 = test
        X_combined = np.vstack([X_train_arr, X_test_arr])
        y_combined = np.concatenate([
            np.zeros(len(X_train_arr)),
            np.ones(len(X_test_arr)),
        ])

        # Handle NaN/inf values
        X_combined = np.nan_to_num(X_combined, nan=0.0, posinf=0.0, neginf=0.0)

        # Get estimator. Compare against None explicitly: truth-testing an
        # estimator invokes __len__ where defined, which raises on unfitted
        # sklearn ensembles. A user-supplied estimator is cloned so the
        # caller's object is never fitted in place.
        if self.estimator is None:
            self._estimator = self._get_default_estimator()
        else:
            self._estimator = clone(self.estimator)

        # Cross-validate to get OOF predictions
        cv = StratifiedKFold(
            n_splits=self.cv,
            shuffle=True,
            random_state=self.random_state,
        )

        self._log(f"Running adversarial validation with {self.cv}-fold CV...")

        try:
            oof_proba = cross_val_predict(
                self._estimator,
                X_combined,
                y_combined,
                cv=cv,
                method="predict_proba",
            )[:, 1]
        except Exception as e:
            # Deliberate best-effort: retry with hard labels. A failure in
            # the retry propagates to the caller.
            self._log(f"predict_proba failed: {e}, using predict", level="warn")
            oof_proba = cross_val_predict(
                self._estimator,
                X_combined,
                y_combined,
                cv=cv,
                method="predict",
            )

        # Compute AUC
        from sklearn.metrics import roc_auc_score

        self.auc_score_ = roc_auc_score(y_combined, oof_proba)
        self._log(f"Adversarial validation AUC: {self.auc_score_:.4f}")

        # Fit on full data to get feature importances
        self._estimator.fit(X_combined, y_combined)

        # Extract feature importances
        if hasattr(self._estimator, "feature_importances_"):
            importances = self._estimator.feature_importances_
        elif hasattr(self._estimator, "coef_"):
            importances = np.abs(self._estimator.coef_).ravel()
        else:
            # Fall back to permutation importance
            from sklearn.inspection import permutation_importance

            perm_result = permutation_importance(
                self._estimator,
                X_combined,
                y_combined,
                n_repeats=5,
                random_state=self.random_state,
            )
            importances = perm_result.importances_mean

        # Normalize importances. Cast first so integer importances and
        # plain sequences are handled uniformly.
        importances = np.asarray(importances, dtype=float)
        total = importances.sum()
        if total > 0:
            importances = importances / total

        self.feature_importances_ = dict(zip(columns, importances))
        self.drifted_features_ = sorted(
            columns,
            key=lambda x: self.feature_importances_[x],
            reverse=True,
        )

        result = AdversarialValidationResult.from_auc(
            auc=self.auc_score_,
            importances=self.feature_importances_,
        )

        self._log(f"Drift severity: {result.drift_severity}")
        if result.drift_severity != "none":
            top_features = result.drifted_features[:5]
            self._log(f"Top drifting features: {top_features}")

        self._is_fitted = True
        return result

    def get_test_like_samples(
        self,
        X_train: Any,
        y_train: Any,
        X_test: Any,
        top_pct: float = 0.2,
    ) -> tuple[Any, Any]:
        """Get training samples most similar to test distribution.

        Uses adversarial validation predictions to identify training
        samples that the classifier thinks look like test samples. Runs
        ``check_drift`` first if no fitted estimator is available.

        Parameters
        ----------
        X_train : array-like
            Training features.
        y_train : array-like
            Training labels.
        X_test : array-like
            Test features.
        top_pct : float, default=0.2
            Top percentage of test-like samples to return. Values that
            round down to zero rows give an empty selection; values above
            1.0 select every row.

        Returns
        -------
        X_selected : array-like
            Selected training features, ordered by increasing test-likeness.
            pandas and polars DataFrames are returned in their own type,
            everything else as a numpy array.
        y_selected : array-like
            Selected training labels as a numpy array.

        Raises
        ------
        ValueError
            If ``top_pct`` is negative.
        """
        if top_pct < 0:
            raise ValueError(f"top_pct must be non-negative, got {top_pct!r}")

        # Run adversarial validation if not already done
        if not self._is_fitted or self._estimator is None:
            self.check_drift(X_train, X_test)

        X_train_arr, _ = self._to_numpy_with_columns(X_train)
        y_train_arr = np.asarray(y_train)

        # Handle NaN/inf values
        X_train_arr_clean = np.nan_to_num(
            X_train_arr, nan=0.0, posinf=0.0, neginf=0.0
        )

        # Predict probability of being "test-like"
        proba = self._estimator.predict_proba(X_train_arr_clean)[:, 1]

        # Select top samples. Slice from an explicit start offset: the
        # shorthand ``[-n_select:]`` returns every row when n_select is 0.
        n_total = len(X_train_arr)
        n_select = min(n_total, int(n_total * top_pct))
        top_indices = np.argsort(proba)[n_total - n_select:]

        # Return in original format if possible
        if HAS_PANDAS and isinstance(X_train, pd.DataFrame):
            return X_train.iloc[top_indices], y_train_arr[top_indices]
        if HAS_POLARS and isinstance(X_train, pl.DataFrame):
            return X_train[top_indices], y_train_arr[top_indices]
        return X_train_arr[top_indices], y_train_arr[top_indices]

    def suggest_features_to_drop(
        self,
        X_train: Any,
        X_test: Any,
        max_features: int = 10,
        min_importance: float = 0.05,
    ) -> list[str]:
        """Suggest features to drop to reduce drift.

        Walks ``drifted_features_`` in order and keeps those whose
        importance is at least ``min_importance``, stopping once
        ``max_features`` have been collected. Runs ``check_drift`` first
        if the validator has not been fitted.

        Parameters
        ----------
        X_train : array-like
            Training features.
        X_test : array-like
            Test features.
        max_features : int, default=10
            Maximum number of features to suggest.
        min_importance : float, default=0.05
            Minimum importance threshold.

        Returns
        -------
        List[str]
            Features suggested for removal.
        """
        if not self._is_fitted:
            self.check_drift(X_train, X_test)

        suggestions: list[str] = []
        for feature in self.drifted_features_:
            if len(suggestions) >= max_features:
                break
            if self.feature_importances_[feature] >= min_importance:
                suggestions.append(feature)

        return suggestions