# Source code for endgame.benchmark.metalearner
from __future__ import annotations
"""Meta-learning for automatic model selection.
Uses benchmark results and meta-features to predict optimal models/pipelines
for new datasets.
"""
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder, StandardScaler
from endgame.benchmark.profiler import MetaFeatureSet, MetaProfiler
from endgame.benchmark.tracker import ExperimentTracker
# polars is an optional dependency. HAS_POLARS records whether the import
# succeeded so that later code can test it before referencing ``pl``.
try:
    import polars as pl

    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False
@dataclass
class ModelRecommendation:
    """Result of a meta-learner query: one suggested model plus context.

    Attributes
    ----------
    model_name : str
        Name of the model ranked first.
    confidence : float
        Confidence attached to that choice (0-1).
    predicted_score : float
        Performance the suggested model is predicted to reach.
    reasoning : str
        Human-readable justification for the suggestion.
    alternatives : List[Tuple[str, float]]
        Runner-up models, each paired with its score.
    similar_datasets : List[str]
        Names of the training datasets closest to the queried one.
    """

    # Only the model name is mandatory; everything else has a neutral default.
    model_name: str
    confidence: float = 0.0
    predicted_score: float = 0.0
    reasoning: str = ""
    # default_factory gives every instance its own list object.
    alternatives: list[tuple[str, float]] = field(default_factory=list)
    similar_datasets: list[str] = field(default_factory=list)
class MetaLearner:
    """Learn to predict optimal models from dataset meta-features.

    Trains a meta-model on benchmark results so that, given the
    meta-features of a new dataset, it can suggest which model is likely
    to perform best.

    Parameters
    ----------
    approach : str, default="ranking"
        Meta-learning approach:
        - "ranking": trained on best-model labels (same targets as
          "classification"); models are ranked by predicted probability
        - "classification": Predict best model (classification)
        - "regression": Predict model scores (regression)
        Any other value is handled like "ranking".
    base_estimator : BaseEstimator, optional
        Base model for meta-learning. If None, uses RandomForest.
        The estimator is fitted in place (it is not cloned).
    metric : str, default="accuracy"
        Target metric to optimize.
    n_top_models : int, default=3
        Number of top models to consider for recommendations.
    random_state : int, default=42
        Random seed.
    verbose : bool, default=False
        Enable verbose output.

    Examples
    --------
    >>> # Train meta-learner from benchmark results
    >>> meta_learner = MetaLearner()
    >>> meta_learner.fit(tracker)
    >>>
    >>> # Get recommendation for new dataset
    >>> recommendation = meta_learner.recommend(X_new, y_new)
    >>> print(f"Best model: {recommendation.model_name}")
    """

    def __init__(
        self,
        approach: str = "ranking",
        base_estimator: BaseEstimator | None = None,
        metric: str = "accuracy",
        n_top_models: int = 3,
        random_state: int = 42,
        verbose: bool = False,
    ):
        self.approach = approach
        self.base_estimator = base_estimator
        self.metric = metric
        self.n_top_models = n_top_models
        self.random_state = random_state
        self.verbose = verbose

        self._meta_model = None
        self._scaler = StandardScaler()
        self._label_encoder = LabelEncoder()
        self._feature_names: list[str] = []
        self._model_names: list[str] = []
        self._profiler = MetaProfiler(
            groups=["simple", "statistical"],
            random_state=random_state,
            verbose=False,
        )

        # Training data storage
        self._X_meta: np.ndarray | None = None
        self._y_meta: np.ndarray | None = None
        self._dataset_features: dict[str, np.ndarray] = {}
        self._is_fitted = False

    def _log(self, message: str) -> None:
        """Print ``message`` with a ``[MetaLearner]`` prefix when verbose."""
        if self.verbose:
            print(f"[MetaLearner] {message}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_polars(df) -> bool:
        """Return True when ``df`` is a polars DataFrame (and polars is installed)."""
        return bool(HAS_POLARS and isinstance(df, pl.DataFrame))

    @staticmethod
    def _to_feature_vector(values) -> np.ndarray:
        """Convert raw meta-feature values to a finite float vector.

        ``None`` and NaN become 0.0; infinities are mapped to large finite
        numbers by ``np.nan_to_num`` so the scaler never sees non-finite input.
        """
        vec = np.array([np.nan if v is None else v for v in values], dtype=float)
        return np.nan_to_num(vec, nan=0.0)

    @staticmethod
    def _as_count(value) -> int:
        """Best-effort ``int(value)`` that yields 0 for None, NaN or infinity."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _iter_datasets(self, df, mf_cols: list[str], metric_col: str):
        """Yield ``(dataset_name, features, scores, models)`` once per dataset.

        ``features`` is taken from the first row of the dataset, ``scores`` is
        a float array (missing values as NaN) aligned with ``models``.
        """
        is_polars = self._is_polars(df)
        if is_polars:
            # polars' default unique() order is unspecified; keep first-seen
            # order so the training matrix is reproducible between runs.
            names = df["dataset_name"].unique(maintain_order=True).to_list()
        else:
            names = df["dataset_name"].unique().tolist()

        for name in names:
            if is_polars:
                rows = df.filter(pl.col("dataset_name") == name)
                if len(rows) == 0:
                    continue
                raw = [rows[col][0] for col in mf_cols]
                scores = rows[metric_col].to_numpy()
                models = rows["model_name"].to_list()
            else:
                rows = df[df["dataset_name"] == name]
                if len(rows) == 0:
                    continue
                raw = [rows[col].iloc[0] for col in mf_cols]
                scores = rows[metric_col].values
                models = rows["model_name"].tolist()
            # dtype=float turns None into NaN so np.isnan works on any column.
            yield name, self._to_feature_vector(raw), np.asarray(scores, dtype=float), models

    def _default_meta_model(self):
        """Create the default RandomForest matching the configured approach."""
        if self.approach == "regression":
            return RandomForestRegressor(
                n_estimators=100,
                max_depth=10,
                random_state=self.random_state,
                n_jobs=-1,
            )
        # "classification" and "ranking" both learn label-encoded best-model
        # targets, so both need a classifier (with predict_proba).
        return RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=self.random_state,
            n_jobs=-1,
        )

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------
    def fit(
        self,
        tracker: ExperimentTracker,
        metric: str | None = None,
    ) -> MetaLearner:
        """Fit meta-learner from benchmark results.

        Parameters
        ----------
        tracker : ExperimentTracker
            Tracker containing benchmark results.
        metric : str, optional
            Override target metric.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the tracker holds no ``mf_*`` meta-feature columns or no
            usable training samples.
        """
        metric = metric or self.metric
        metric_col = f"metric_{metric}"
        df = tracker.to_dataframe()

        # Keep successful experiments only and collect the distinct names.
        if self._is_polars(df):
            df = df.filter(pl.col("status") == "success")
            model_names = df["model_name"].unique(maintain_order=True).to_list()
            dataset_names = df["dataset_name"].unique(maintain_order=True).to_list()
        else:
            df = df[df["status"] == "success"]
            model_names = df["model_name"].unique().tolist()
            dataset_names = df["dataset_name"].unique().tolist()

        mf_cols = sorted(c for c in df.columns if c.startswith("mf_"))
        if not mf_cols:
            raise ValueError("No meta-features found in tracker. Enable profile_datasets=True in BenchmarkRunner.")

        self._model_names = model_names
        self._feature_names = [c[3:] for c in mf_cols]  # Remove "mf_" prefix

        self._log(f"Building meta-learning dataset from {len(dataset_names)} datasets, {len(model_names)} models")
        self._log(f"Meta-features: {len(mf_cols)}")

        # Build training data based on approach
        if self.approach == "classification":
            X_meta, y_meta = self._build_classification_data(df, mf_cols, metric_col)
        elif self.approach == "regression":
            X_meta, y_meta = self._build_regression_data(df, mf_cols, metric_col)
        else:  # ranking
            X_meta, y_meta = self._build_ranking_data(df, mf_cols, metric_col)

        if len(X_meta) == 0:
            raise ValueError("No valid training samples")

        # Scale the meta-feature columns only. In the regression approach the
        # matrix also carries one-hot model indicators; those stay unscaled so
        # the scaler keeps the same width as the vectors transformed at
        # recommendation time.
        X_meta = np.nan_to_num(np.asarray(X_meta, dtype=float), nan=0.0)
        n_mf = len(mf_cols)
        X_scaled = np.hstack([
            self._scaler.fit_transform(X_meta[:, :n_mf]),
            X_meta[:, n_mf:],
        ])

        self._X_meta = X_scaled
        self._y_meta = y_meta

        # Store per-dataset features for similarity computation; rebuilt from
        # scratch so a refit does not keep datasets from an earlier tracker.
        self._dataset_features = {
            name: features
            for name, features, _, _ in self._iter_datasets(df, mf_cols, metric_col)
        }

        # Create and train meta-model
        if self.base_estimator is not None:
            self._meta_model = self.base_estimator
        else:
            self._meta_model = self._default_meta_model()

        self._log(f"Training meta-model ({type(self._meta_model).__name__})...")
        self._meta_model.fit(X_scaled, y_meta)

        # The CV score is purely informational, so only pay for it when it
        # will be printed, and never let it abort an otherwise finished fit.
        if self.verbose and len(X_scaled) >= 5:
            try:
                cv_scores = cross_val_score(self._meta_model, X_scaled, y_meta, cv=5)
            except ValueError as exc:
                self._log(f"Meta-model CV skipped: {exc}")
            else:
                self._log(f"Meta-model CV score: {np.mean(cv_scores):.4f} (+/- {np.std(cv_scores):.4f})")

        self._is_fitted = True
        return self

    def _build_classification_data(
        self,
        df,
        mf_cols: list[str],
        metric_col: str,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Build data for classification approach (predict best model).

        One row per dataset: its meta-features as X and the label-encoded
        name of the highest-scoring model as y. Datasets without any valid
        score are skipped.
        """
        X_list = []
        y_list = []
        for _, features, scores, models in self._iter_datasets(df, mf_cols, metric_col):
            if np.all(np.isnan(scores)):
                continue
            X_list.append(features)
            y_list.append(models[int(np.nanargmax(scores))])

        if not y_list:
            # Nothing to encode; fit() reports the empty training set.
            return np.empty((0, len(mf_cols))), np.empty(0, dtype=int)

        X = np.array(X_list)
        y = self._label_encoder.fit_transform(y_list)
        return X, y

    def _build_regression_data(
        self,
        df,
        mf_cols: list[str],
        metric_col: str,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Build data for regression approach (predict score per model).

        One row per (dataset, model) experiment: meta-features followed by a
        one-hot model indicator as X, the metric value as y. Missing scores
        are recorded as 0.0.
        """
        if self._is_polars(df):
            rows = df.iter_rows(named=True)
        else:
            rows = (row for _, row in df.iterrows())

        # Name -> column lookup built once instead of list.index() per row.
        model_index = {name: i for i, name in enumerate(self._model_names)}
        n_models = len(self._model_names)

        X_list = []
        y_list = []
        for row in rows:
            features = self._to_feature_vector([row[col] for col in mf_cols])
            model_indicator = np.zeros(n_models)
            model_indicator[model_index[row["model_name"]]] = 1
            X_list.append(np.concatenate([features, model_indicator]))

            score = row[metric_col]
            y_list.append(0.0 if score is None or np.isnan(score) else float(score))

        return np.array(X_list), np.array(y_list)

    def _build_ranking_data(
        self,
        df,
        mf_cols: list[str],
        metric_col: str,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Build data for ranking approach.

        The ranking target is currently the rank-1 (best) model per dataset,
        i.e. exactly the classification target, so this delegates to
        ``_build_classification_data``.
        """
        return self._build_classification_data(df, mf_cols, metric_col)

    # ------------------------------------------------------------------
    # Recommendation
    # ------------------------------------------------------------------
    def recommend(
        self,
        X: np.ndarray,
        y: np.ndarray,
        categorical_indicator: list[bool] | None = None,
        task_type: str = "classification",
    ) -> ModelRecommendation:
        """Get model recommendation for a new dataset.

        Parameters
        ----------
        X : np.ndarray
            Feature matrix.
        y : np.ndarray
            Target variable.
        categorical_indicator : List[bool], optional
            Boolean mask for categorical features.
        task_type : str, default="classification"
            Task type: "classification" or "regression".

        Returns
        -------
        ModelRecommendation
            Recommended model with confidence and alternatives.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if not self._is_fitted:
            raise RuntimeError("MetaLearner must be fitted before calling recommend()")

        # Extract meta-features
        meta_features = self._profiler.profile(
            X, y,
            categorical_indicator=categorical_indicator,
            task_type=task_type,
        )
        return self.recommend_from_features(meta_features)

    def recommend_from_features(
        self,
        meta_features: MetaFeatureSet | dict[str, float],
    ) -> ModelRecommendation:
        """Get recommendation from pre-computed meta-features.

        Parameters
        ----------
        meta_features : MetaFeatureSet or Dict
            Pre-computed meta-features. Names missing from the input are
            treated as 0.0.

        Returns
        -------
        ModelRecommendation
            Recommended model.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if not self._is_fitted:
            raise RuntimeError("MetaLearner must be fitted before calling recommend_from_features()")

        if isinstance(meta_features, MetaFeatureSet):
            features_dict = meta_features.to_dict()
        else:
            features_dict = meta_features

        # Build the feature vector in the column order seen during fit.
        raw = [features_dict.get(name, 0.0) for name in self._feature_names]
        X = self._to_feature_vector(raw).reshape(1, -1)
        X_scaled = self._scaler.transform(X)

        best_model, confidence, predicted_score, alternatives = self._rank_models(X_scaled)

        # Find similar datasets (takes the unscaled vector; it scales itself).
        similar = self._find_similar_datasets(X[0], n=3)

        reasoning = self._build_reasoning(
            best_model,
            confidence,
            features_dict,
            similar,
        )

        return ModelRecommendation(
            model_name=best_model,
            confidence=confidence,
            predicted_score=predicted_score,
            reasoning=reasoning,
            alternatives=alternatives,
            similar_datasets=[s[0] for s in similar],
        )

    def _rank_models(self, X_scaled: np.ndarray):
        """Rank candidate models for one scaled meta-feature row.

        Parameters
        ----------
        X_scaled : np.ndarray
            Array of shape (1, n_meta_features), already scaled.

        Returns
        -------
        tuple
            ``(best_model, confidence, predicted_score, alternatives)`` where
            ``alternatives`` is a list of ``(model_name, score)`` pairs.
        """
        n_top = self.n_top_models

        if self.approach == "regression":
            # One design row per candidate model: the shared meta-features
            # plus that model's one-hot indicator; predicted in one batch.
            n_models = len(self._model_names)
            design = np.hstack([np.repeat(X_scaled, n_models, axis=0), np.eye(n_models)])
            scores = self._meta_model.predict(design)
            predictions = sorted(
                ((name, float(score)) for name, score in zip(self._model_names, scores)),
                key=lambda item: item[1],
                reverse=True,
            )
            best_model, predicted_score = predictions[0]
            confidence = 1.0 / (1 + n_models)  # Lower confidence for regression
            return best_model, confidence, predicted_score, predictions[1:n_top]

        # Best-model approaches ("classification", "ranking" and anything
        # else, mirroring the fallback used when building training data).
        n_classes = len(self._label_encoder.classes_)
        if hasattr(self._meta_model, "predict_proba"):
            probs = np.asarray(self._meta_model.predict_proba(X_scaled)[0], dtype=float)
        else:
            # No probabilities available: put all mass on the predicted
            # class. A regressor-style estimator returns a float, so round to
            # the nearest encoded label and clip into the valid range.
            pred = self._meta_model.predict(X_scaled)[0]
            probs = np.zeros(n_classes)
            pred_idx = int(np.clip(np.rint(pred), 0, n_classes - 1))
            probs[pred_idx] = 1.0

        order = np.argsort(probs)[::-1]
        top = order[: max(n_top, 1)]  # always keep at least the best model
        labels = list(self._label_encoder.inverse_transform(top))
        best_model = labels[0]
        confidence = float(probs[top[0]])
        alternatives = [
            (label, float(probs[idx]))
            for label, idx in zip(labels[1:], top[1:])
        ]
        # No score estimate exists here; the confidence doubles as an
        # approximate predicted score.
        return best_model, confidence, confidence, alternatives

    def _find_similar_datasets(
        self,
        features: np.ndarray,
        n: int = 3,
    ) -> list[tuple[str, float]]:
        """Find most similar datasets from training data.

        Similarity is the cosine similarity between the scaled query vector
        and each scaled stored dataset vector (0.0 when either has zero norm).

        Parameters
        ----------
        features : np.ndarray
            Unscaled meta-feature vector of the query dataset.
        n : int, default=3
            Maximum number of datasets to return.

        Returns
        -------
        list of (str, float)
            Dataset names with similarity, most similar first.
        """
        if not self._dataset_features:
            return []

        names = list(self._dataset_features)
        stored = np.nan_to_num(
            np.vstack([self._dataset_features[name] for name in names]).astype(float),
            nan=0.0,
        )
        query = np.nan_to_num(np.asarray(features, dtype=float), nan=0.0)

        # Scale the query and all stored vectors in one call each.
        query_scaled = self._scaler.transform(query.reshape(1, -1))[0]
        stored_scaled = self._scaler.transform(stored)

        dots = stored_scaled @ query_scaled
        denom = np.linalg.norm(stored_scaled, axis=1) * np.linalg.norm(query_scaled)
        sims = np.divide(dots, denom, out=np.zeros_like(dots, dtype=float), where=denom > 0)

        ranked = sorted(zip(names, sims.tolist()), key=lambda item: item[1], reverse=True)
        return ranked[:n]

    def _build_reasoning(
        self,
        model_name: str,
        confidence: float,
        features: dict[str, float],
        similar: list[tuple[str, float]],
    ) -> str:
        """Build human-readable reasoning for recommendation.

        Produces ``" | "``-joined segments: the recommendation, the dataset
        size (``nr_inst`` / ``nr_attr`` meta-features, 0 when unavailable)
        and up to two similar datasets.
        """
        lines = [f"Recommended: {model_name} (confidence: {confidence:.2f})"]

        # Dataset characteristics; NaN/None meta-features must not crash here.
        n_samples = self._as_count(features.get("nr_inst", 0))
        n_features = self._as_count(features.get("nr_attr", 0))
        lines.append(f"Dataset: {n_samples} samples, {n_features} features")

        # Similar datasets
        if similar:
            similar_str = ", ".join(f"{name} ({sim:.2f})" for name, sim in similar[:2])
            lines.append(f"Similar to: {similar_str}")

        return " | ".join(lines)

    def get_feature_importances(self) -> dict[str, float]:
        """Get feature importances from meta-model.

        Returns
        -------
        Dict[str, float]
            Feature name to importance mapping, most important first. Empty
            when the meta-model exposes no ``feature_importances_``.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if not self._is_fitted:
            raise RuntimeError("MetaLearner must be fitted first")

        importances = getattr(self._meta_model, "feature_importances_", None)
        if importances is None:
            return {}

        # In the regression approach the trailing importances belong to the
        # one-hot model indicators; zip() stops at the last meta-feature.
        pairs = ((name, float(value)) for name, value in zip(self._feature_names, importances))
        return dict(sorted(pairs, key=lambda item: item[1], reverse=True))
class PipelineRecommender:
    """Recommend complete pipelines (preprocessing + model) for new datasets.

    Wraps a MetaLearner: the model comes from the meta-learner, while the
    preprocessing steps are chosen from simple data checks (missing values,
    share of IQR outliers).

    Parameters
    ----------
    meta_learner : MetaLearner, optional
        Pre-trained meta-learner. A new ``MetaLearner(verbose=verbose)`` is
        created when omitted.
    preprocessing_options : List[str], optional
        Available preprocessing options. Defaults to ``["none",
        "standard_scaling", "robust_scaling", "imputation",
        "imputation+scaling"]``. Stored on the instance; the current
        recommendation logic does not consult it.
    verbose : bool, default=False
        Enable verbose output.

    Examples
    --------
    >>> recommender = PipelineRecommender()
    >>> recommender.fit(tracker)
    >>> pipeline = recommender.recommend_pipeline(X, y)
    >>> print(pipeline)
    """

    def __init__(
        self,
        meta_learner: MetaLearner | None = None,
        preprocessing_options: list[str] | None = None,
        verbose: bool = False,
    ):
        self.meta_learner = (
            meta_learner if meta_learner is not None else MetaLearner(verbose=verbose)
        )
        self.preprocessing_options = preprocessing_options or [
            "none",
            "standard_scaling",
            "robust_scaling",
            "imputation",
            "imputation+scaling",
        ]
        self.verbose = verbose

    def fit(
        self,
        tracker: ExperimentTracker,
        **kwargs,
    ) -> PipelineRecommender:
        """Fit recommender from benchmark results.

        All keyword arguments are forwarded to ``meta_learner.fit``.
        Returns ``self``.
        """
        self.meta_learner.fit(tracker, **kwargs)
        return self

    def recommend_pipeline(
        self,
        X: np.ndarray,
        y: np.ndarray,
        categorical_indicator: list[bool] | None = None,
        task_type: str = "classification",
    ) -> dict[str, Any]:
        """Recommend a complete pipeline.

        Parameters
        ----------
        X : np.ndarray
            Feature matrix.
        y : np.ndarray
            Target variable.
        categorical_indicator : List[bool], optional
            Boolean mask for categorical features.
        task_type : str
            Task type.

        Returns
        -------
        Dict[str, Any]
            Pipeline recommendation with keys ``model_name``,
            ``model_confidence``, ``preprocessing`` (list of
            ``(name, transformer)`` steps), ``alternatives``, ``reasoning``,
            ``has_missing`` and ``has_outliers``.
        """
        from sklearn.impute import SimpleImputer
        from sklearn.preprocessing import RobustScaler, StandardScaler

        # Get model recommendation
        model_rec = self.meta_learner.recommend(
            X, y,
            categorical_indicator=categorical_indicator,
            task_type=task_type,
        )

        # Analyze data characteristics (plain bools keep the result easy to
        # serialise and compare).
        has_missing = bool(np.any(np.isnan(X)))
        has_outliers = self._detect_outliers(X)

        # Choose preprocessing: impute first if needed, then a scaler that is
        # robust to outliers when the data has many of them.
        preprocessing_steps = []
        if has_missing:
            preprocessing_steps.append(("imputer", SimpleImputer(strategy="median")))
        scaler = RobustScaler() if has_outliers else StandardScaler()
        preprocessing_steps.append(("scaler", scaler))

        return {
            "model_name": model_rec.model_name,
            "model_confidence": model_rec.confidence,
            "preprocessing": preprocessing_steps,
            "alternatives": model_rec.alternatives,
            "reasoning": model_rec.reasoning,
            "has_missing": has_missing,
            "has_outliers": has_outliers,
        }

    def _detect_outliers(self, X: np.ndarray, threshold: float = 0.1) -> bool:
        """Detect if dataset has significant outliers.

        A value is an outlier when it lies more than 1.5 * IQR outside the
        quartiles of its column. Missing values are ignored: they neither
        shift the quartiles nor count as outliers. Columns with zero IQR
        contribute no outliers.

        Parameters
        ----------
        X : np.ndarray
            Numeric feature matrix (a 1-D array is treated as one column).
        threshold : float, default=0.1
            Fraction of observed cells that must be outliers.

        Returns
        -------
        bool
            True when the outlier fraction exceeds ``threshold``; False for
            input without any observed value.
        """
        data = np.asarray(X, dtype=float)
        if data.ndim == 1:
            data = data.reshape(-1, 1)

        outlier_count = 0
        observed = 0
        for column in data.T:
            # Drop NaNs, then clamp infinities to large finite values.
            values = np.nan_to_num(column[~np.isnan(column)])
            observed += values.size
            if values.size == 0:
                continue
            q1, q3 = np.percentile(values, [25, 75])
            iqr = q3 - q1
            if iqr > 0:
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                outlier_count += int(np.sum((values < lower) | (values > upper)))

        if observed == 0:
            return False
        return bool((outlier_count / observed) > threshold)