# Source code for endgame.benchmark.learning_curve

"""Learning Curve Experiments following LCDB protocol.

Generates learning curves at standard anchor points for systematic
comparison of sample efficiency across models.

References
----------
- Mohr et al. "LCDB 1.0: An Extensive Learning Curves Database" (2022)
"""

# The docstring above must be the first statement of the module to be picked
# up as ``__doc__``; ``from __future__`` imports are allowed to follow it.
from __future__ import annotations

import gc
import time
import warnings
from dataclasses import dataclass, field

# Suppress harmless sklearn/LightGBM feature name warnings
warnings.filterwarnings(
    "ignore",
    message="X does not have valid feature names",
    category=UserWarning,
)

import numpy as np
from sklearn.base import BaseEstimator, clone, is_classifier
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    f1_score,
    log_loss,
    mean_squared_error,
    r2_score,
    roc_auc_score,
)
from sklearn.model_selection import ShuffleSplit, StratifiedShuffleSplit
from sklearn.preprocessing import LabelEncoder

try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

try:
    import pandas as pd
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False

from endgame.benchmark.loader import DatasetInfo, SuiteLoader, TaskType

# Default LCDB anchor points (fractions of training set).
# LearningCurveConfig copies this list for its default ``anchors`` value, so
# instances do not share (or mutate) this module-level list.
LCDB_ANCHORS = [0.025, 0.05, 0.10, 0.20, 0.40, 0.80, 1.0]

# Reduced anchors for faster experiments.
# quick_learning_curve() falls back to these when called with ``anchors=None``.
FAST_ANCHORS = [0.1, 0.25, 0.5, 1.0]


@dataclass
class LearningCurveConfig:
    """Settings that control a learning curve experiment.

    Parameters
    ----------
    anchors : List[float]
        Training set fractions to evaluate. Defaults to a fresh copy of the
        LCDB protocol anchors (``LCDB_ANCHORS``).
    n_seeds : int
        Number of random seeds evaluated at every anchor point.
    cv_folds : int
        Cross-validation folds per seed (0 = holdout only).
    test_fraction : float
        Fraction of the data held out as the test set.
    metrics_classification : List[str]
        Metric names for classification tasks.
    metrics_regression : List[str]
        Metric names for regression tasks.
    timeout_per_fit : int
        Timeout per model fit in seconds.
    random_state : int
        Base random seed.
    verbose : bool
        Enable verbose output.
    """

    # Every list default goes through a factory so instances never share
    # (and cannot accidentally mutate) a common list object.
    anchors: list[float] = field(default_factory=lambda: list(LCDB_ANCHORS))
    n_seeds: int = 5
    cv_folds: int = 0  # 0 = holdout only (faster)
    test_fraction: float = 0.2
    metrics_classification: list[str] = field(
        default_factory=lambda: ["accuracy", "balanced_accuracy", "f1_weighted"]
    )
    metrics_regression: list[str] = field(
        default_factory=lambda: ["r2", "neg_mean_squared_error"]
    )
    timeout_per_fit: int = 600
    random_state: int = 42
    verbose: bool = True
@dataclass
class LearningCurveRecord:
    """One measured point of a learning curve (one fit of one model).

    Attributes
    ----------
    dataset_name : str
        Name of the dataset the model was evaluated on.
    model_name : str
        Name of the evaluated model.
    anchor : float
        Training set fraction for this point.
    n_train : int
        Actual number of training samples.
    seed : int
        Random seed used.
    metrics : Dict[str, float]
        Performance metrics, keyed by metric name.
    fit_time : float
        Training time in seconds.
    status : str
        'success' or 'error'.
    error_message : str, optional
        Error message if the fit failed.
    """

    # Identification of the run.
    dataset_name: str
    model_name: str
    anchor: float
    n_train: int
    seed: int

    # Measurements.
    metrics: dict[str, float]
    fit_time: float

    # Outcome.
    status: str = "success"
    error_message: str | None = None
@dataclass
class LearningCurveResults:
    """Container for learning curve results with analysis methods.

    Attributes
    ----------
    records : List[LearningCurveRecord]
        All experiment records.
    config : LearningCurveConfig
        Configuration used.
    """

    records: list[LearningCurveRecord] = field(default_factory=list)
    config: LearningCurveConfig | None = None

    def add_record(self, record: LearningCurveRecord):
        """Add a record to results."""
        self.records.append(record)

    def _rows(self) -> list[dict]:
        """Flatten every record into one plain dict per row.

        Metric values are merged into the row next to the bookkeeping
        columns; an ``"error"`` entry is added only for records that carry
        an error message. Rows may therefore have different key sets.
        """
        rows = []
        for r in self.records:
            row = {
                "dataset": r.dataset_name,
                "model": r.model_name,
                "anchor": r.anchor,
                "n_train": r.n_train,
                "seed": r.seed,
                "fit_time": r.fit_time,
                "status": r.status,
            }
            row.update(r.metrics)
            if r.error_message:
                row["error"] = r.error_message
            rows.append(row)
        return rows

    @staticmethod
    def _with_uniform_keys(rows: list[dict]) -> list[dict]:
        """Return copies of ``rows`` that all share the same ordered key set.

        Keys appear in first-seen order; values missing from a row become
        ``None``.
        """
        columns = dict.fromkeys(key for row in rows for key in row)
        return [{key: row.get(key) for key in columns} for row in rows]

    @staticmethod
    def _aulc_from_curve(anchors: np.ndarray, means: np.ndarray) -> float:
        """Trapezoidal area under ``means`` over ``anchors``, divided by the anchor span.

        Returns 0.0 when fewer than two points are available or the span is
        not positive.
        """
        if len(anchors) < 2:
            return 0.0
        span = anchors[-1] - anchors[0]
        if not span > 0:
            return 0.0
        # ``np.trapezoid`` is the NumPy 2 name; fall back to ``np.trapz`` on
        # older releases that do not provide it.
        integrate = getattr(np, "trapezoid", None) or np.trapz
        return float(integrate(means, anchors) / span)

    def to_dataframe(self):
        """Convert results to DataFrame.

        Returns
        -------
        DataFrame
            Results in tabular format: a polars DataFrame when polars is
            installed, otherwise a pandas DataFrame, otherwise the plain
            list of row dicts.
        """
        rows = self._rows()
        if HAS_POLARS:
            # Rows differ in their metric/error keys. Give every row the
            # same keys and let polars look at all rows, so that a column
            # first appearing late (e.g. "error") is not lost or mistyped
            # by schema inference on a leading sample only.
            return pl.DataFrame(
                self._with_uniform_keys(rows), infer_schema_length=None
            )
        if HAS_PANDAS:
            return pd.DataFrame(rows)
        return rows

    def save(self, path: str):
        """Save results to file.

        Parameters
        ----------
        path : str
            Output path (.parquet, .csv, or .json). ``os.PathLike`` objects
            are accepted as well.

        Raises
        ------
        ValueError
            If the file extension is not one of the supported formats.
        ImportError
            If ``.parquet`` is requested but neither polars nor pandas is
            installed.

        Notes
        -----
        ``.json`` output is a JSON array with one object per record and is
        written with the standard library. ``.csv`` output falls back to the
        standard library ``csv`` module when no DataFrame backend exists.
        """
        import os

        target = os.fspath(path)

        if target.endswith(".json"):
            self._write_json(target)
        elif target.endswith(".csv"):
            if HAS_POLARS:
                self.to_dataframe().write_csv(target)
            elif HAS_PANDAS:
                self.to_dataframe().to_csv(target, index=False)
            else:
                self._write_csv(target)
        elif target.endswith(".parquet"):
            if HAS_POLARS:
                self.to_dataframe().write_parquet(target)
            elif HAS_PANDAS:
                self.to_dataframe().to_parquet(target, index=False)
            else:
                # Previously this case returned without writing anything.
                raise ImportError("polars or pandas required to write parquet files")
        else:
            raise ValueError(f"Unsupported format: {path}")

    def _write_json(self, target: str) -> None:
        """Write all rows to ``target`` as a JSON array of objects."""
        import json

        def _plain(value):
            # NumPy scalars that are not Python float/int subclasses
            # (e.g. float32, int64) need an explicit conversion.
            if isinstance(value, np.generic):
                return value.item()
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self._rows(), fh, indent=2, default=_plain)

    def _write_csv(self, target: str) -> None:
        """Write all rows to ``target`` as CSV using only the standard library."""
        import csv

        rows = self._with_uniform_keys(self._rows())
        fieldnames = list(rows[0]) if rows else []
        with open(target, "w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    def get_learning_curve(
        self,
        dataset: str,
        model: str,
        metric: str = "accuracy",
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Get learning curve for a specific dataset/model.

        Only successful records that actually contain ``metric`` contribute.
        Anchors without any value for ``metric`` are left out instead of
        producing NaN entries.

        Parameters
        ----------
        dataset : str
            Dataset name.
        model : str
            Model name.
        metric : str
            Metric to retrieve.

        Returns
        -------
        anchors : ndarray
            Training fractions, sorted ascending.
        means : ndarray
            Mean metric values per anchor.
        stds : ndarray
            Standard deviations per anchor.
            All three arrays are empty when no matching value exists.
        """
        values_by_anchor: dict = {}
        for r in self.records:
            if (
                r.dataset_name == dataset
                and r.model_name == model
                and r.status == "success"
                and metric in r.metrics
            ):
                values_by_anchor.setdefault(r.anchor, []).append(r.metrics[metric])

        if not values_by_anchor:
            return np.array([]), np.array([]), np.array([])

        anchors = sorted(values_by_anchor)
        means = [np.mean(values_by_anchor[a]) for a in anchors]
        stds = [np.std(values_by_anchor[a]) for a in anchors]
        return np.array(anchors), np.array(means), np.array(stds)

    def compute_aulc(
        self,
        dataset: str,
        model: str,
        metric: str = "accuracy",
    ) -> float:
        """Compute Area Under Learning Curve.

        Higher AULC indicates better sample efficiency (learns faster).

        Parameters
        ----------
        dataset : str
            Dataset name.
        model : str
            Model name.
        metric : str
            Metric to use.

        Returns
        -------
        float
            Area under the learning curve divided by the anchor range
            (within [0, 1] for metrics that are themselves in [0, 1]).
            0.0 when fewer than two anchors have data.
        """
        anchors, means, _ = self.get_learning_curve(dataset, model, metric)
        return self._aulc_from_curve(anchors, means)

    def summary(self, metric: str = "accuracy") -> dict[str, dict[str, float]]:
        """Generate summary statistics.

        Parameters
        ----------
        metric : str
            Primary metric for summary.

        Returns
        -------
        dict
            Per model: ``mean_aulc`` (mean AULC over datasets with at least
            two anchors of data), ``mean_final`` (mean of the value at the
            largest available anchor) and ``n_datasets`` (number of datasets
            that contributed an AULC).
        """
        # First-seen order keeps the output deterministic across runs.
        datasets = list(dict.fromkeys(r.dataset_name for r in self.records))
        models = list(dict.fromkeys(r.model_name for r in self.records))

        summary = {}
        for model in models:
            aulcs = []
            finals = []
            for dataset in datasets:
                anchors, means, _ = self.get_learning_curve(dataset, model, metric)
                # Decide on data availability, not on the sign of the AULC:
                # metrics such as r2 can legitimately be negative.
                if len(anchors) >= 2:
                    aulcs.append(self._aulc_from_curve(anchors, means))
                if len(means) > 0:
                    finals.append(means[-1])

            summary[model] = {
                "mean_aulc": np.mean(aulcs) if aulcs else 0,
                "mean_final": np.mean(finals) if finals else 0,
                "n_datasets": len(aulcs),
            }

        return summary

    def plot_learning_curves(
        self,
        dataset: str,
        metric: str = "accuracy",
        models: list[str] | None = None,
        ax=None,
        **kwargs,
    ):
        """Plot learning curves for a dataset.

        Parameters
        ----------
        dataset : str
            Dataset name.
        metric : str
            Metric to plot.
        models : List[str], optional
            Models to include (default: all). Names without records for
            ``dataset`` are ignored.
        ax : matplotlib.axes.Axes, optional
            Axes to plot on.
        **kwargs
            Additional arguments to plt.plot.

        Returns
        -------
        ax : matplotlib.axes.Axes
            The axes with the plot.

        Raises
        ------
        ImportError
            If matplotlib is not installed.
        """
        try:
            import matplotlib.pyplot as plt
        except ImportError as err:
            raise ImportError("matplotlib required for plotting") from err

        if ax is None:
            _, ax = plt.subplots(figsize=(10, 6))

        all_models = {r.model_name for r in self.records if r.dataset_name == dataset}
        if models is None:
            models = sorted(all_models)
        else:
            models = [m for m in models if m in all_models]

        for model in models:
            anchors, means, stds = self.get_learning_curve(dataset, model, metric)
            if len(anchors) == 0:
                continue

            line, = ax.plot(anchors, means, marker='o', label=model, **kwargs)
            # Shaded band of one standard deviation, in the line's colour.
            ax.fill_between(
                anchors,
                means - stds,
                means + stds,
                alpha=0.2,
                color=line.get_color(),
            )

        ax.set_xlabel("Training Set Fraction")
        ax.set_ylabel(metric.replace("_", " ").title())
        ax.set_title(f"Learning Curves: {dataset}")
        ax.legend()
        ax.grid(True, alpha=0.3)

        return ax
class LearningCurveExperiment:
    """Run learning curve experiments across datasets.

    Implements the LCDB (Learning Curve Database) protocol for systematic
    evaluation of sample efficiency.

    Parameters
    ----------
    suite : str or List[DatasetInfo]
        Benchmark suite name or list of datasets.
    config : LearningCurveConfig, optional
        Experiment configuration.
    max_datasets : int, optional
        Maximum number of datasets.
    verbose : bool
        Enable verbose output.

    Examples
    --------
    >>> from endgame.benchmark import LearningCurveExperiment, LearningCurveConfig
    >>> from endgame.models import LGBMWrapper
    >>>
    >>> config = LearningCurveConfig(anchors=[0.1, 0.5, 1.0], n_seeds=3)
    >>> exp = LearningCurveExperiment(suite="sklearn-classic", config=config)
    >>>
    >>> models = [
    ...     ("LGBM", LGBMWrapper(preset="fast")),
    ... ]
    >>> results = exp.run(models)
    >>> print(results.summary())
    """

    def __init__(
        self,
        suite: str | list[DatasetInfo],
        config: LearningCurveConfig | None = None,
        max_datasets: int | None = None,
        verbose: bool = True,
    ):
        self.suite = suite
        self.config = config or LearningCurveConfig()
        self.max_datasets = max_datasets
        self.verbose = verbose

        self._datasets: list[DatasetInfo] = []

    def _log(self, msg: str):
        """Print message if verbose."""
        if self.verbose:
            print(f"[LearningCurve] {msg}")

    def _load_datasets(self) -> list[DatasetInfo]:
        """Load datasets from suite.

        A list passed as ``suite`` is used directly (truncated to
        ``max_datasets`` when that is set); a suite name is resolved through
        ``SuiteLoader``.
        """
        if isinstance(self.suite, list):
            return self.suite[:self.max_datasets] if self.max_datasets else self.suite

        loader = SuiteLoader(
            suite=self.suite,
            max_datasets=self.max_datasets,
            random_state=self.config.random_state,
            verbose=self.verbose,
        )
        return list(loader.load())

    def _subsample_data(
        self,
        X: np.ndarray,
        y: np.ndarray,
        fraction: float,
        seed: int,
        stratify: bool = True,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Stratified subsampling for anchor point.

        Parameters
        ----------
        X : ndarray
            Features.
        y : ndarray
            Targets.
        fraction : float
            Fraction to use for training.
        seed : int
            Random seed.
        stratify : bool
            Use stratified sampling.

        Returns
        -------
        X_train, X_test, y_train, y_test
        """
        n_samples = len(X)
        test_fraction = self.config.test_fraction
        # Both parts get at least 10 samples.
        n_train = max(10, int(n_samples * fraction * (1 - test_fraction)))
        n_test = max(10, int(n_samples * test_fraction))

        # Ensure we don't exceed available samples
        if n_train + n_test > n_samples:
            n_test = max(10, n_samples // 5)
            n_train = n_samples - n_test

        # Stratification only makes sense with more than one distinct target.
        use_stratified = stratify and len(np.unique(y)) > 1
        splitter_cls = StratifiedShuffleSplit if use_stratified else ShuffleSplit
        splitter = splitter_cls(
            n_splits=1,
            test_size=n_test,
            train_size=n_train,
            random_state=seed,
        )

        train_idx, test_idx = next(splitter.split(X, y))
        return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

    def _evaluate_model(
        self,
        model: BaseEstimator,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
        is_classification: bool,
    ) -> tuple[dict[str, float], float]:
        """Fit model and compute metrics.

        Returns
        -------
        metrics : dict
            Performance metrics. Regression: ``r2``, ``mse``, ``rmse``.
            Classification: ``accuracy``, ``balanced_accuracy``, ``f1``
            (two classes) or ``f1_weighted`` (otherwise), plus
            ``roc_auc``/``roc_auc_ovr`` and ``log_loss`` when the model
            offers ``predict_proba`` and the metric can be computed.
        fit_time : float
            Training time.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start = time.perf_counter()
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            model.fit(X_train, y_train)
        fit_time = time.perf_counter() - start

        y_pred = model.predict(X_test)

        if not is_classification:
            mse = mean_squared_error(y_test, y_pred)
            metrics = {
                "r2": r2_score(y_test, y_pred),
                "mse": mse,
                "rmse": np.sqrt(mse),
            }
            return metrics, fit_time

        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "balanced_accuracy": balanced_accuracy_score(y_test, y_pred),
        }

        # Count classes over train and test together: the holdout alone may
        # miss a class, which would change the metric names between splits
        # and make binary F1 fail when a prediction hits the missing class.
        seen_labels = np.concatenate(
            [np.asarray(y_train).ravel(), np.asarray(y_test).ravel()]
        )
        n_classes = len(np.unique(seen_labels))
        if n_classes == 2:
            metrics["f1"] = f1_score(y_test, y_pred, zero_division=0)
        else:
            metrics["f1_weighted"] = f1_score(
                y_test, y_pred, average="weighted", zero_division=0
            )

        # Probability-based metrics are best effort by design: a failure
        # here must not invalidate the label-based metrics above.
        if hasattr(model, "predict_proba"):
            try:
                y_proba = model.predict_proba(X_test)
            except Exception:
                y_proba = None

            if y_proba is not None:
                # Each metric is guarded separately so that an undefined
                # ROC-AUC does not also discard the log-loss.
                try:
                    if n_classes == 2:
                        metrics["roc_auc"] = roc_auc_score(y_test, y_proba[:, 1])
                    else:
                        metrics["roc_auc_ovr"] = roc_auc_score(
                            y_test, y_proba, multi_class="ovr", average="weighted"
                        )
                except Exception:
                    pass

                try:
                    metrics["log_loss"] = log_loss(y_test, y_proba)
                except Exception:
                    # Typically the holdout lacks a class the model knows;
                    # retry with the model's own label set when it has one.
                    classes = getattr(model, "classes_", None)
                    if classes is not None:
                        try:
                            metrics["log_loss"] = log_loss(
                                y_test, y_proba, labels=classes
                            )
                        except Exception:
                            pass

        return metrics, fit_time

    def run(
        self,
        models: list[tuple[str, BaseEstimator]],
        output_file: str | None = None,
        continue_on_error: bool = True,
    ) -> LearningCurveResults:
        """Run learning curve experiments.

        For every dataset, model, anchor and seed index a fresh clone of the
        model is fitted on a subsample and evaluated on a holdout; each
        attempt yields one record (status ``"success"`` or ``"error"``).

        Parameters
        ----------
        models : List[Tuple[str, BaseEstimator]]
            List of (name, model) tuples.
        output_file : str, optional
            Path to save results.
        continue_on_error : bool
            Continue if a model fails. When False the first exception is
            re-raised.

        Returns
        -------
        LearningCurveResults
            Experiment results.
        """
        self._log("Starting learning curve experiment")
        self._log(f"Anchors: {self.config.anchors}")
        self._log(f"Seeds per anchor: {self.config.n_seeds}")

        # Load datasets
        self._datasets = self._load_datasets()
        self._log(f"Loaded {len(self._datasets)} datasets")

        results = LearningCurveResults(config=self.config)

        for dataset in self._datasets:
            self._log(f"\nDataset: {dataset.name} ({dataset.n_samples} samples)")

            # Everything that is not a regression task is scored as
            # classification.
            is_classification = dataset.task_type != TaskType.REGRESSION

            # Work on copies so the dataset object itself stays untouched.
            X = dataset.X.copy()
            y = dataset.y.copy()

            if is_classification:
                y = LabelEncoder().fit_transform(y)

            for model_name, model_template in models:
                self._log(f" Model: {model_name}")

                for anchor in self.config.anchors:
                    for seed in range(self.config.n_seeds):
                        # The split uses base seed + index; the record keeps
                        # the index.
                        actual_seed = self.config.random_state + seed

                        try:
                            X_train, X_test, y_train, y_test = self._subsample_data(
                                X, y, anchor, actual_seed,
                                stratify=is_classification,
                            )

                            # A fresh clone keeps fits independent.
                            model = clone(model_template)

                            metrics, fit_time = self._evaluate_model(
                                model, X_train, y_train, X_test, y_test,
                                is_classification,
                            )

                            record = LearningCurveRecord(
                                dataset_name=dataset.name,
                                model_name=model_name,
                                anchor=anchor,
                                n_train=len(X_train),
                                seed=seed,
                                metrics=metrics,
                                fit_time=fit_time,
                                status="success",
                            )

                            # Report only the first seed of each anchor.
                            if self.verbose and seed == 0:
                                primary_metric = "accuracy" if is_classification else "r2"
                                val = metrics.get(primary_metric, 0)
                                self._log(
                                    f" anchor={anchor:.0%}, n={len(X_train)}: "
                                    f"{primary_metric}={val:.4f}"
                                )

                        except Exception as e:
                            if not continue_on_error:
                                raise

                            record = LearningCurveRecord(
                                dataset_name=dataset.name,
                                model_name=model_name,
                                anchor=anchor,
                                n_train=0,
                                seed=seed,
                                metrics={},
                                fit_time=0.0,
                                status="error",
                                error_message=str(e),
                            )

                            if self.verbose:
                                self._log(f" ERROR at anchor={anchor:.0%}: {e}")

                        results.add_record(record)

            # Clean up
            gc.collect()

        # Save results
        if output_file:
            results.save(output_file)
            self._log(f"\nResults saved to: {output_file}")

        self._log(f"\nExperiment complete! {len(results.records)} records.")

        return results
def quick_learning_curve(
    model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
    anchors: list[float] | None = None,
    n_seeds: int = 3,
    test_fraction: float = 0.2,
    random_state: int = 42,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Quick learning curve for a single model/dataset.

    Parameters
    ----------
    model : BaseEstimator
        Model to evaluate. A fresh clone is fitted for every anchor/seed.
    X : ndarray
        Features.
    y : ndarray
        Targets.
    anchors : List[float], optional
        Training fractions (defaults to ``FAST_ANCHORS``).
    n_seeds : int
        Seeds per anchor.
    test_fraction : float
        Test set fraction.
    random_state : int
        Random seed; seed ``i`` of an anchor uses ``random_state + i``.

    Returns
    -------
    anchors : ndarray
        Training fractions.
    means : ndarray
        Mean scores per anchor: accuracy for classifiers, R^2 otherwise.
    stds : ndarray
        Standard deviations of the scores per anchor.
    """
    if anchors is None:
        anchors = FAST_ANCHORS

    is_clf = is_classifier(model)
    # The model type never changes inside the loops, so pick splitter and
    # scorer once.
    splitter_cls = StratifiedShuffleSplit if is_clf else ShuffleSplit
    score_fn = accuracy_score if is_clf else r2_score

    n_samples = len(X)
    results = {a: [] for a in anchors}

    for anchor in anchors:
        # Split sizes depend only on the anchor, not on the seed.
        n_test = max(10, int(n_samples * test_fraction))
        n_train = max(10, int(n_samples * anchor * (1 - test_fraction)))

        # Same guard as LearningCurveExperiment._subsample_data: the
        # 10-sample minimums can otherwise request more rows than exist on
        # small datasets, which makes the splitter raise.
        if n_train + n_test > n_samples:
            n_test = max(10, n_samples // 5)
            n_train = n_samples - n_test

        for seed in range(n_seeds):
            splitter = splitter_cls(
                n_splits=1,
                test_size=n_test,
                train_size=n_train,
                random_state=random_state + seed,
            )
            train_idx, test_idx = next(splitter.split(X, y))

            # Fit and score
            m = clone(model)
            m.fit(X[train_idx], y[train_idx])
            results[anchor].append(score_fn(y[test_idx], m.predict(X[test_idx])))

    anchors_arr = np.array(anchors)
    means = np.array([np.mean(results[a]) for a in anchors])
    stds = np.array([np.std(results[a]) for a in anchors])

    return anchors_arr, means, stds