from __future__ import annotations
"""Benchmark runner for systematic model evaluation.
Orchestrates experiments across multiple datasets and models/pipelines.
"""
import gc
import hashlib
import json
import multiprocessing
import os
import time
import warnings
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from dataclasses import dataclass, field
from typing import Any
# Suppress harmless sklearn/LightGBM feature name warnings
warnings.filterwarnings(
"ignore",
message="X does not have valid feature names",
category=UserWarning,
)
import numpy as np
from sklearn.base import BaseEstimator, clone, is_classifier, is_regressor
from sklearn.metrics import (
brier_score_loss,
log_loss,
make_scorer,
matthews_corrcoef,
roc_auc_score,
)
from sklearn.model_selection import KFold, StratifiedKFold, cross_validate
from sklearn.preprocessing import LabelEncoder
def _specificity_score(y_true, y_pred):
"""Compute specificity (true negative rate) for binary classification."""
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_true, y_pred)
if cm.shape == (2, 2):
tn, fp, fn, tp = cm.ravel()
return tn / (tn + fp) if (tn + fp) > 0 else 0.0
return 0.0
def _fpr_score(y_true, y_pred):
"""Compute false positive rate for binary classification."""
return 1.0 - _specificity_score(y_true, y_pred)
from endgame.benchmark.loader import DatasetInfo, SuiteLoader, TaskType
from endgame.benchmark.profiler import MetaFeatureSet, MetaProfiler
from endgame.benchmark.tracker import (
ExperimentRecord,
ExperimentTracker,
get_experiment_hash,
serialize_pipeline,
)
try:
import polars as pl
HAS_POLARS = True
except ImportError:
HAS_POLARS = False
try:
import pandas as _pd_check # noqa: F401
HAS_PANDAS = True
except ImportError:
HAS_PANDAS = False
class TimeoutException(Exception):
"""Exception raised when a model training exceeds the time limit."""
pass
def _run_func_in_process(func: Callable, result_queue: multiprocessing.Queue, *args, **kwargs):
"""Worker function to run in a subprocess."""
try:
result = func(*args, **kwargs)
result_queue.put(("success", result))
except Exception as e:
result_queue.put(("error", str(e)))
def _run_with_timeout(func: Callable, timeout: int, *args, **kwargs) -> Any:
"""Run a function with a timeout using multiprocessing.
This implementation actually terminates the computation when timeout is reached,
unlike ThreadPoolExecutor which only stops waiting but lets the thread continue.
Parameters
----------
func : Callable
Function to run.
timeout : int
Timeout in seconds.
*args, **kwargs
Arguments to pass to the function.
Returns
-------
Any
Result of the function.
Raises
------
TimeoutException
If the function exceeds the timeout.
Exception
If the function raises an exception.
"""
# First try with ThreadPoolExecutor (faster, works for most cases)
# Fall back to multiprocessing only if needed for hard timeout
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except FuturesTimeoutError:
# Thread is still running but we hit timeout
# For now, just raise - the thread will eventually complete or be cleaned up
# A more aggressive approach would use multiprocessing, but that has
# serialization issues with sklearn models
raise TimeoutException(f"Exceeded timeout of {timeout}s")
def _get_default_cache_dir() -> str:
"""Get default cache directory for meta-features."""
# Use XDG_CACHE_HOME if available, otherwise ~/.cache
cache_home = os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache"))
return os.path.join(cache_home, "endgame", "meta_features")
[docs]
@dataclass
class BenchmarkConfig:
"""Configuration for benchmark runs.
Attributes
----------
suite : str
Benchmark suite name or list of task IDs.
max_datasets : int, optional
Maximum number of datasets to run.
max_samples : int, optional
Maximum samples per dataset.
cv_folds : int
Number of cross-validation folds.
scoring_classification : List[str]
Metrics for classification tasks.
scoring_regression : List[str]
Metrics for regression tasks.
profile_datasets : bool
Whether to extract meta-features.
profile_groups : List[str]
Meta-feature groups to extract.
cache_meta_features : bool
Whether to cache meta-features to disk.
meta_features_cache_dir : str, optional
Directory to cache meta-features. Defaults to ~/.cache/endgame/meta_features.
timeout_per_fit : int
Timeout per model fit in seconds.
n_jobs : int
Number of parallel jobs for CV.
random_state : int
Random seed.
verbose : bool
Enable verbose output.
skip_completed : bool
Skip experiments that already succeeded.
"""
suite: str = "sklearn-classic"
max_datasets: int | None = None
max_samples: int | None = None
cv_folds: int = 5
scoring_classification: list[str] = field(
default_factory=lambda: ["accuracy", "f1_weighted", "roc_auc_ovr_weighted"]
)
scoring_regression: list[str] = field(
default_factory=lambda: ["r2", "neg_mean_squared_error", "neg_mean_absolute_error"]
)
profile_datasets: bool = True
profile_groups: list[str] = field(
default_factory=lambda: ["simple", "statistical"]
)
cache_meta_features: bool = True
meta_features_cache_dir: str | None = None
timeout_per_fit: int = 300 # 5 minutes
n_jobs: int = 1
random_state: int = 42
verbose: bool = True
skip_completed: bool = True # Skip experiments that already succeeded
[docs]
class BenchmarkRunner:
"""Run systematic benchmarks across datasets and models.
Orchestrates the complete benchmark workflow:
1. Load datasets from benchmark suite
2. Profile datasets (extract meta-features)
3. Run cross-validation for each model on each dataset
4. Record results with full provenance
Parameters
----------
suite : str, default="sklearn-classic"
Benchmark suite name.
config : BenchmarkConfig, optional
Full configuration object.
max_datasets : int, optional
Override maximum number of datasets.
fast_run : bool, default=False
Quick run with reduced settings.
verbose : bool, default=True
Enable verbose output.
**kwargs
Additional configuration parameters.
Examples
--------
>>> from sklearn.ensemble import RandomForestClassifier
>>> from sklearn.linear_model import LogisticRegression
>>>
>>> models = [
... ("RF", RandomForestClassifier(n_estimators=100, random_state=42)),
... ("LR", LogisticRegression(max_iter=1000)),
... ]
>>>
>>> runner = BenchmarkRunner(suite="sklearn-classic")
>>> results = runner.run(models)
>>> print(results.summary())
>>>
>>> # Save results
>>> results.save("benchmark_results.parquet")
"""
def __init__(
self,
suite: str = "sklearn-classic",
config: BenchmarkConfig | None = None,
max_datasets: int | None = None,
fast_run: bool = False,
verbose: bool = True,
**kwargs,
):
if config is not None:
self.config = config
else:
self.config = BenchmarkConfig(
suite=suite,
max_datasets=max_datasets,
verbose=verbose,
**kwargs,
)
# Fast run overrides
if fast_run:
self.config.max_datasets = min(self.config.max_datasets or 5, 5)
self.config.max_samples = 1000
self.config.cv_folds = 3
self.config.profile_groups = ["simple"]
self.verbose = verbose
self._tracker = ExperimentTracker(name=f"benchmark_{suite}")
self._profiler = MetaProfiler(
groups=self.config.profile_groups,
random_state=self.config.random_state,
verbose=False,
)
# Meta-feature cache directory
self._cache_dir = self.config.meta_features_cache_dir or _get_default_cache_dir()
if self.config.cache_meta_features:
os.makedirs(self._cache_dir, exist_ok=True)
# Results cache
self._datasets: list[DatasetInfo] = []
self._meta_features: dict[str, MetaFeatureSet] = {}
self._completed_config_hashes: set = set() # Set of config hashes for completed experiments
def _load_completed_experiments(self, output_file: str) -> None:
"""Load completed experiments from existing results file.
Parameters
----------
output_file : str
Path to the results file.
"""
if not os.path.exists(output_file):
return
try:
if HAS_POLARS:
df = pl.read_parquet(output_file)
# Filter for successful experiments only
successful = df.filter(pl.col('status') == 'success')
# Load config hashes for completed experiments
if 'config_hash' in successful.columns:
for row in successful.iter_rows(named=True):
if row.get('config_hash'):
self._completed_config_hashes.add(row['config_hash'])
elif HAS_PANDAS:
import pandas as pd
df = pd.read_parquet(output_file)
successful = df[df['status'] == 'success']
if 'config_hash' in successful.columns:
for _, row in successful.iterrows():
if row.get('config_hash'):
self._completed_config_hashes.add(row['config_hash'])
if self._completed_config_hashes:
self._log(f"Loaded {len(self._completed_config_hashes)} completed experiment configs from {output_file}")
except Exception as e:
self._log(f"Warning: Could not load existing results: {e}")
def _log(self, message: str) -> None:
"""Print message if verbose."""
if self.verbose:
print(f"[BenchmarkRunner] {message}")
def _get_dataset_cache_key(self, dataset: DatasetInfo) -> str:
"""Generate a cache key for a dataset based on its properties.
The key includes dataset name, shape, and a hash of the data to detect changes.
"""
# Create a fingerprint from dataset properties
fingerprint_data = {
"name": dataset.name,
"n_samples": dataset.n_samples,
"n_features": dataset.n_features,
"task_type": dataset.task_type.value,
"profile_groups": sorted(self.config.profile_groups),
}
# Add a hash of the first few rows for data identity
if dataset.X is not None and len(dataset.X) > 0:
# Sample a small portion to compute hash quickly
sample_size = min(100, len(dataset.X))
data_sample = dataset.X[:sample_size].tobytes() if hasattr(dataset.X, 'tobytes') else str(dataset.X[:sample_size]).encode()
fingerprint_data["data_hash"] = hashlib.md5(data_sample).hexdigest()[:16]
# Create a hash of the fingerprint
fingerprint_str = json.dumps(fingerprint_data, sort_keys=True)
cache_key = hashlib.md5(fingerprint_str.encode()).hexdigest()
return f"{dataset.name}_{cache_key[:12]}"
def _get_cached_meta_features(self, dataset: DatasetInfo) -> MetaFeatureSet | None:
"""Try to load cached meta-features for a dataset.
Returns None if not cached or cache is disabled.
"""
if not self.config.cache_meta_features:
return None
cache_key = self._get_dataset_cache_key(dataset)
cache_file = os.path.join(self._cache_dir, f"{cache_key}.json")
if not os.path.exists(cache_file):
return None
try:
with open(cache_file) as f:
cached = json.load(f)
# Reconstruct MetaFeatureSet
return MetaFeatureSet(
features=cached.get("features", {}),
groups=cached.get("groups", {}),
extraction_time=cached.get("extraction_time", 0.0),
errors=cached.get("errors", []),
)
except (OSError, json.JSONDecodeError, KeyError):
# Cache is corrupted, delete it
try:
os.remove(cache_file)
except OSError:
pass
return None
def _cache_meta_features(self, dataset: DatasetInfo, meta_features: MetaFeatureSet) -> None:
"""Cache meta-features for a dataset to disk."""
if not self.config.cache_meta_features:
return
cache_key = self._get_dataset_cache_key(dataset)
cache_file = os.path.join(self._cache_dir, f"{cache_key}.json")
try:
cached_data = {
"features": meta_features.features,
"groups": meta_features.groups,
"extraction_time": meta_features.extraction_time,
"errors": meta_features.errors,
"dataset_name": dataset.name,
"n_samples": dataset.n_samples,
"n_features": dataset.n_features,
}
with open(cache_file, 'w') as f:
json.dump(cached_data, f, indent=2)
except (OSError, TypeError) as e:
self._log(f"Warning: Could not cache meta-features for {dataset.name}: {e}")
[docs]
def run(
self,
models: list[tuple[str, BaseEstimator] | tuple[str, BaseEstimator, BaseEstimator]],
output_file: str | None = None,
continue_on_error: bool = True,
) -> ExperimentTracker:
"""Run benchmark on all models and datasets.
Parameters
----------
models : List[Union[Tuple[str, BaseEstimator], Tuple[str, BaseEstimator, BaseEstimator]]]
List of model specifications. Each can be either:
- (name, estimator): Single estimator used for all tasks
- (name, classifier, regressor): Pair of estimators, classifier used for
classification tasks and regressor for regression tasks. Either can be
None to skip that task type.
output_file : str, optional
Path to save results.
continue_on_error : bool, default=True
Continue if a model fails on a dataset.
Returns
-------
ExperimentTracker
Tracker with all experiment results.
"""
self._log(f"Starting benchmark on suite: {self.config.suite}")
# Extract model names from either 2-tuple or 3-tuple format
model_names = [m[0] for m in models]
self._log(f"Models: {model_names}")
# Load completed experiments if skip_completed is enabled
if self.config.skip_completed and output_file:
self._load_completed_experiments(output_file)
# Load datasets
loader = SuiteLoader(
suite=self.config.suite,
max_datasets=self.config.max_datasets,
max_samples=self.config.max_samples,
random_state=self.config.random_state,
verbose=self.verbose,
)
self._datasets = list(loader.load())
self._log(f"Loaded {len(self._datasets)} datasets")
# Profile datasets (with caching)
if self.config.profile_datasets:
self._log("Profiling datasets...")
cached_count = 0
profiled_count = 0
for dataset in self._datasets:
# Try to load from cache first
cached_meta = self._get_cached_meta_features(dataset)
if cached_meta is not None:
self._meta_features[dataset.name] = cached_meta
cached_count += 1
continue
# Not cached, profile the dataset
try:
task_type = "classification" if dataset.task_type != TaskType.REGRESSION else "regression"
meta = self._profiler.profile(
dataset.X,
dataset.y,
categorical_indicator=dataset.categorical_indicator,
task_type=task_type,
)
self._meta_features[dataset.name] = meta
# Cache the result
self._cache_meta_features(dataset, meta)
profiled_count += 1
except Exception as e:
self._log(f"Failed to profile {dataset.name}: {e}")
self._meta_features[dataset.name] = MetaFeatureSet()
profiled_count += 1
if cached_count > 0:
self._log(f" Loaded {cached_count} from cache, profiled {profiled_count} new datasets")
# Run experiments
total_experiments = len(self._datasets) * len(models)
completed = 0
for dataset in self._datasets:
self._log(f"\nDataset: {dataset.name} ({dataset.n_samples} samples, {dataset.n_features} features)")
is_regression = dataset.task_type == TaskType.REGRESSION
task_type = "regression" if is_regression else "classification"
for model_spec in models:
completed += 1
# Parse model specification - supports both 2-tuple and 3-tuple formats
model_name = model_spec[0]
if len(model_spec) == 2:
# (name, estimator) format - use same model for all tasks
model = model_spec[1]
elif len(model_spec) == 3:
# (name, classifier, regressor) format - select based on task
classifier, regressor = model_spec[1], model_spec[2]
model = regressor if is_regression else classifier
if model is None:
self._log(f" [{completed}/{total_experiments}] Skipping {model_name} (no {task_type} variant)")
continue
else:
self._log(f" [{completed}/{total_experiments}] Skipping {model_name} (invalid model spec)")
continue
# Compute config hash for this experiment
# This includes dataset, model name, hyperparameters, and task type
try:
hyperparams = model.get_params()
except Exception:
hyperparams = {}
config_hash = get_experiment_hash(
dataset_name=dataset.name,
model_name=model_name,
hyperparameters=hyperparams,
task_type=task_type,
)
# Check if this exact experiment config was already completed successfully
if self.config.skip_completed and config_hash in self._completed_config_hashes:
self._log(f" [{completed}/{total_experiments}] Skipping {model_name} (already completed)")
continue
self._log(f" [{completed}/{total_experiments}] Running {model_name}...")
try:
record = self._run_single_experiment(
dataset=dataset,
model_name=model_name,
model=model,
)
if record.status == "success":
primary_metric = list(record.metrics.keys())[0] if record.metrics else "unknown"
primary_value = record.metrics.get(primary_metric, 0)
self._log(f" {primary_metric}: {primary_value:.4f} (fit: {record.fit_time:.2f}s)")
# Add to completed set so we skip if interrupted and restarted
self._completed_config_hashes.add(config_hash)
else:
self._log(f" FAILED: {record.error_message}")
except Exception as e:
if continue_on_error:
self._log(f" ERROR: {e}")
self._tracker.log_failure(
dataset_name=dataset.name,
model_name=model_name,
error_message=str(e),
n_samples=dataset.n_samples,
n_features=dataset.n_features,
task_type=dataset.task_type.value,
)
else:
raise
# Save results after each experiment for resumability
if output_file:
self._tracker.save(output_file, append=True, deduplicate=True)
# Clean up memory
gc.collect()
# Final save (in case no experiments ran)
if output_file:
self._log(f"\nResults saved to: {output_file}")
self._log(f"\nBenchmark complete! {len(self._tracker)} experiments recorded.")
return self._tracker
def _run_single_experiment(
self,
dataset: DatasetInfo,
model_name: str,
model: BaseEstimator,
) -> ExperimentRecord:
"""Run a single experiment (one model on one dataset)."""
# Determine task type
is_classification = dataset.task_type != TaskType.REGRESSION
# Clone model first (needed for scoring check)
try:
model_clone = clone(model)
except Exception:
# Some models don't support sklearn clone
model_clone = model
# Explicitly check if model is a classifier/regressor
# This helps with models that sklearn might misidentify
model_is_classifier = is_classifier(model_clone)
model_is_regressor = is_regressor(model_clone)
# Warn if there's a mismatch between task type and model type
if is_classification and model_is_regressor and not model_is_classifier:
self._log(f" Warning: Model {model_name} detected as regressor but task is classification")
elif not is_classification and model_is_classifier and not model_is_regressor:
self._log(f" Warning: Model {model_name} detected as classifier but task is regression")
# Get scoring - pass model to check what methods it supports
if is_classification:
scoring = self._get_classification_scoring(dataset, model_clone)
else:
scoring = self._get_regression_scoring()
# Get CV splitter
cv = self._get_cv_splitter(dataset)
# Prepare data
X = dataset.X.copy()
y = dataset.y.copy()
# Encode target for classification
if is_classification:
le = LabelEncoder()
y = le.fit_transform(y)
# Run cross-validation with timeout
start_time = time.time()
timeout = self.config.timeout_per_fit
def _run_cv():
"""Run cross-validation (wrapped for timeout)."""
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return cross_validate(
model_clone,
X,
y,
cv=cv,
scoring=scoring,
return_train_score=False,
n_jobs=self.config.n_jobs,
error_score="raise",
)
try:
if timeout and timeout > 0:
cv_results = _run_with_timeout(_run_cv, timeout)
else:
cv_results = _run_cv()
fit_time = time.time() - start_time
# Compute metrics
metrics = {}
cv_scores = None
# Handle both dict and list scoring
score_names = list(scoring.keys()) if isinstance(scoring, dict) else scoring
for score_name in score_names:
key = f"test_{score_name}"
if key in cv_results:
scores = cv_results[key]
# Handle negative scores (sklearn convention)
if score_name.startswith("neg_"):
scores = -scores
metric_name = score_name[4:] # Remove "neg_" prefix
else:
metric_name = score_name
metrics[metric_name] = float(np.mean(scores))
if cv_scores is None:
cv_scores = scores.tolist()
# Get pipeline config
pipeline_config = serialize_pipeline(model)
# Get hyperparameters
hyperparameters = model.get_params()
# Get meta-features
meta_features = {}
if dataset.name in self._meta_features:
meta_features = self._meta_features[dataset.name].to_dict()
# Record experiment
record = self._tracker.log_experiment(
dataset_name=dataset.name,
dataset_id=str(dataset.openml_id) if dataset.openml_id else None,
model_name=model_name,
pipeline_config=pipeline_config,
hyperparameters=self._serialize_hyperparameters(hyperparameters),
metrics=metrics,
meta_features=meta_features,
cv_scores=cv_scores,
fit_time=fit_time,
predict_time=float(np.sum(cv_results.get("score_time", [0]))),
n_samples=dataset.n_samples,
n_features=dataset.n_features,
task_type=dataset.task_type.value,
status="success",
)
return record
except TimeoutException as e:
fit_time = time.time() - start_time
# Get meta-features even for failures
meta_features = {}
if dataset.name in self._meta_features:
meta_features = self._meta_features[dataset.name].to_dict()
error_msg = f"TIMEOUT: {e}"
record = self._tracker.log_failure(
dataset_name=dataset.name,
dataset_id=str(dataset.openml_id) if dataset.openml_id else None,
model_name=model_name,
error_message=error_msg,
n_samples=dataset.n_samples,
n_features=dataset.n_features,
task_type=dataset.task_type.value,
meta_features=meta_features,
)
return record
except Exception as e:
fit_time = time.time() - start_time
# Get meta-features even for failures
meta_features = {}
if dataset.name in self._meta_features:
meta_features = self._meta_features[dataset.name].to_dict()
record = self._tracker.log_failure(
dataset_name=dataset.name,
dataset_id=str(dataset.openml_id) if dataset.openml_id else None,
model_name=model_name,
error_message=str(e),
n_samples=dataset.n_samples,
n_features=dataset.n_features,
task_type=dataset.task_type.value,
meta_features=meta_features,
)
return record
def _get_classification_scoring(self, dataset: DatasetInfo, model: BaseEstimator | None = None) -> list[str] | dict[str, Any]:
"""Get scoring metrics for classification.
Returns either a list of scoring strings or a dict with make_scorer objects
that explicitly specify response_method to avoid sklearn's auto-detection issues.
Metrics computed:
- accuracy: Overall accuracy
- balanced_accuracy: Balanced accuracy (accounts for class imbalance)
- f1 / f1_weighted: F1 score (weighted for multiclass)
- precision / precision_weighted: Precision
- recall / recall_weighted: Recall (sensitivity/TPR)
- mcc: Matthews correlation coefficient
- roc_auc: Area under ROC curve (if predict_proba available)
- log_loss: Log loss / cross-entropy (if predict_proba available)
- brier_score: Brier score (binary only, if predict_proba available)
- specificity: True negative rate (binary only)
- fpr: False positive rate (binary only)
"""
# Check what response methods the model supports
has_predict_proba = model is not None and hasattr(model, 'predict_proba')
has_decision_function = model is not None and hasattr(model, 'decision_function')
is_binary = dataset.n_classes == 2
# Build scoring dict with explicit response_method for problematic metrics
scoring_dict = {}
# === Metrics that work with predict() ===
# Accuracy always works
scoring_dict["accuracy"] = "accuracy"
scoring_dict["balanced_accuracy"] = "balanced_accuracy"
# F1, Precision, Recall
if is_binary:
scoring_dict["f1"] = "f1"
scoring_dict["precision"] = "precision"
scoring_dict["recall"] = "recall" # Same as sensitivity/TPR
# Specificity and FPR for binary
scoring_dict["specificity"] = make_scorer(_specificity_score)
scoring_dict["fpr"] = make_scorer(_fpr_score)
else:
scoring_dict["f1_weighted"] = "f1_weighted"
scoring_dict["precision_weighted"] = "precision_weighted"
scoring_dict["recall_weighted"] = "recall_weighted"
# Matthews Correlation Coefficient - works for both binary and multiclass
scoring_dict["mcc"] = make_scorer(matthews_corrcoef)
# === Metrics that need predict_proba ===
if has_predict_proba:
# ROC-AUC
if is_binary:
scoring_dict["roc_auc"] = make_scorer(
roc_auc_score,
response_method="predict_proba",
)
# Brier score (only for binary)
def brier_binary(y_true, y_proba):
# y_proba is the probability of positive class
if y_proba.ndim == 2:
y_proba = y_proba[:, 1]
return brier_score_loss(y_true, y_proba)
scoring_dict["brier_score"] = make_scorer(
brier_binary,
response_method="predict_proba",
greater_is_better=False,
)
elif dataset.n_classes > 2 and dataset.n_classes <= 10:
# For multiclass, roc_auc_score needs additional parameters
def roc_auc_multiclass(y_true, y_score):
return roc_auc_score(y_true, y_score, multi_class='ovr', average='weighted')
scoring_dict["roc_auc_ovr_weighted"] = make_scorer(
roc_auc_multiclass,
response_method="predict_proba",
)
# Log loss (works for both binary and multiclass)
if dataset.n_classes <= 10: # Skip for high cardinality targets
scoring_dict["log_loss"] = make_scorer(
log_loss,
response_method="predict_proba",
greater_is_better=False,
)
return scoring_dict
def _get_regression_scoring(self) -> list[str]:
"""Get scoring metrics for regression."""
return ["r2", "neg_mean_squared_error", "neg_mean_absolute_error"]
def _get_cv_splitter(self, dataset: DatasetInfo):
"""Get appropriate CV splitter."""
if dataset.cv_splits:
return dataset.cv_splits[:self.config.cv_folds]
if dataset.task_type != TaskType.REGRESSION:
return StratifiedKFold(
n_splits=self.config.cv_folds,
shuffle=True,
random_state=self.config.random_state,
)
else:
return KFold(
n_splits=self.config.cv_folds,
shuffle=True,
random_state=self.config.random_state,
)
def _serialize_hyperparameters(self, params: dict[str, Any]) -> dict[str, Any]:
"""Serialize hyperparameters to JSON-compatible format."""
result = {}
for key, value in params.items():
if value is None or isinstance(value, (int, float, str, bool)):
result[key] = value
elif isinstance(value, np.ndarray):
result[key] = value.tolist()
elif isinstance(value, (list, tuple)):
result[key] = str(value)
elif hasattr(value, "__name__"):
result[key] = value.__name__
else:
result[key] = str(type(value).__name__)
return result
@property
def tracker(self) -> ExperimentTracker:
"""Get the experiment tracker."""
return self._tracker
@property
def datasets(self) -> list[DatasetInfo]:
"""Get loaded datasets."""
return self._datasets
@property
def meta_features(self) -> dict[str, MetaFeatureSet]:
"""Get extracted meta-features."""
return self._meta_features
[docs]
def get_results_dataframe(self):
"""Get results as DataFrame."""
return self._tracker.to_dataframe()
[docs]
def quick_benchmark(
model: BaseEstimator,
model_name: str = "model",
suite: str = "quick-test",
**kwargs,
) -> ExperimentTracker:
"""Quick benchmark a single model on test datasets.
Parameters
----------
model : BaseEstimator
Model to benchmark.
model_name : str, default="model"
Name for the model.
suite : str, default="quick-test"
Benchmark suite.
**kwargs
Additional arguments to BenchmarkRunner.
Returns
-------
ExperimentTracker
Results tracker.
Examples
--------
>>> from sklearn.ensemble import RandomForestClassifier
>>> results = quick_benchmark(RandomForestClassifier(), "RF")
>>> print(results.summary())
"""
runner = BenchmarkRunner(suite=suite, fast_run=True, **kwargs)
return runner.run([(model_name, model)])
[docs]
def compare_models(
models: list[tuple[str, BaseEstimator]],
suite: str = "sklearn-classic",
**kwargs,
) -> ExperimentTracker:
"""Compare multiple models on benchmark datasets.
Parameters
----------
models : List[Tuple[str, BaseEstimator]]
List of (name, model) tuples.
suite : str, default="sklearn-classic"
Benchmark suite.
**kwargs
Additional arguments to BenchmarkRunner.
Returns
-------
ExperimentTracker
Results tracker.
"""
runner = BenchmarkRunner(suite=suite, **kwargs)
return runner.run(models)