# Source code for endgame.automl.base

"""Base classes for AutoML predictors.

This module defines the base predictor class that all domain-specific
predictors inherit from.
"""

# The module docstring must be the first statement in the file to be picked
# up as ``__doc__``; a ``from __future__`` import is allowed to follow it.
from __future__ import annotations

import logging
import pickle
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Union

import numpy as np
import pandas as pd

from endgame.automl.presets import PresetConfig, get_preset
from endgame.automl.time_manager import TimeBudgetManager
from endgame.core.base import EndgameEstimator

if TYPE_CHECKING:
    from endgame.tracking.base import ExperimentLogger

# Module-level logger used for warnings and diagnostics emitted by this module.
logger = logging.getLogger(__name__)


# Type aliases
# Accepted forms of input data: a file path (str or Path), a pandas
# DataFrame, a NumPy array, or a string-keyed dict.
DataInput = Union[str, Path, pd.DataFrame, np.ndarray, dict[str, Any]]


@dataclass
class FitSummary:
    """Record of what happened during a fit run.

    Every field has a default, so an empty summary can be created up front
    and filled in as the pipeline progresses.

    Attributes
    ----------
    total_time : float
        Total training time in seconds.
    n_models_trained : int
        Number of models trained.
    n_models_failed : int
        Number of models that failed.
    best_model : str
        Name of the best model.
    best_score : float
        Score of the best model.
    cv_score : float
        Cross-validation score of the ensemble.
    stage_times : dict
        Time spent in each pipeline stage.
    """

    # Timing and model counts.
    total_time: float = 0.0
    n_models_trained: int = 0
    n_models_failed: int = 0

    # Best single model and its score.
    best_model: str = ""
    best_score: float = 0.0

    # Ensemble cross-validation score.
    cv_score: float = 0.0

    # Stage name -> seconds; a factory gives each instance its own dict.
    stage_times: dict[str, float] = field(default_factory=dict)
class BasePredictor(EndgameEstimator, ABC):
    """Base class for all AutoML predictors.

    This class provides the common interface and functionality for
    domain-specific predictors (Tabular, Vision, Text, etc.).

    Parameters
    ----------
    label : str
        Name of the target column.
    problem_type : str, default="auto"
        Type of problem: "classification", "regression", "multiclass",
        or "auto".
    eval_metric : str, default="auto"
        Evaluation metric. "auto" selects based on problem type.
    presets : str, default="medium_quality"
        Quality preset: "best_quality", "high_quality", "good_quality",
        "medium_quality", "fast", "interpretable".
    time_limit : int, optional
        Time limit in seconds. If None, uses preset default.
    search_strategy : str, default="portfolio"
        Search strategy: "portfolio", "heuristic", "genetic", "random",
        "bayesian".
    track_experiments : bool, default=True
        Whether to track experiments to the meta-learning database.
    output_path : str, optional
        Path to save outputs (models, logs, etc.).
    random_state : int, default=42
        Random seed for reproducibility.
    verbosity : int, default=2
        Verbosity level (0=silent, 1=progress, 2=detailed, 3=debug).
    logger : ExperimentLogger, optional
        Experiment logger for tracking params, metrics, and artifacts.
        When provided, fit() automatically logs training configuration
        and results. When None (default), no tracking overhead is added.

    Attributes
    ----------
    is_fitted_ : bool
        Whether the predictor has been fitted.
    fit_summary_ : FitSummary
        Summary of the fitting process.
    problem_type_ : str
        Detected or specified problem type.
    classes_ : np.ndarray
        Class labels for classification problems.
    feature_names_ : list of str
        Names of input features.
    """

    # Values of ``problem_type_`` that are treated as classification tasks.
    _CLASSIFICATION_TYPES = ("classification", "binary", "multiclass")

    def __init__(
        self,
        label: str,
        problem_type: str = "auto",
        eval_metric: str = "auto",
        presets: str = "medium_quality",
        time_limit: int | None = None,
        search_strategy: str = "portfolio",
        track_experiments: bool = True,
        output_path: str | None = None,
        random_state: int = 42,
        verbosity: int = 2,
        logger: ExperimentLogger | None = None,
    ):
        super().__init__(random_state=random_state, verbose=verbosity > 0)

        self.label = label
        self.problem_type = problem_type
        self.eval_metric = eval_metric
        self.presets = presets
        self.time_limit = time_limit
        self.search_strategy = search_strategy
        self.track_experiments = track_experiments
        self.output_path = output_path
        self.verbosity = verbosity
        # NOTE: inside __init__ the ``logger`` parameter shadows the
        # module-level logging logger; this is the experiment logger.
        self.logger = logger

        # Load preset configuration
        self._preset_config: PresetConfig = get_preset(presets)

        # Set time limit from preset if not specified
        if self.time_limit is None:
            self.time_limit = self._preset_config.default_time_limit

        # Initialize state
        self.is_fitted_ = False
        self.fit_summary_: FitSummary | None = None
        self.problem_type_: str | None = None
        self.classes_: np.ndarray | None = None
        self.feature_names_: list[str] | None = None

        # Internal components (set during fit)
        self._models: dict[str, Any] = {}
        self._ensemble: Any | None = None
        self._preprocessor: Any | None = None
        self._calibrator: Any | None = None
        self._time_manager: TimeBudgetManager | None = None

    @abstractmethod
    def fit(
        self,
        train_data: DataInput,
        tuning_data: DataInput | None = None,
        time_limit: int | None = None,
        presets: str | None = None,
        hyperparameters: dict[str, Any] | None = None,
        **kwargs,
    ) -> BasePredictor:
        """Fit the AutoML predictor.

        Parameters
        ----------
        train_data : str, Path, DataFrame, or ndarray
            Training data. Can be a file path, DataFrame, or array.
        tuning_data : optional
            Validation/tuning data. If None, uses internal holdout.
        time_limit : int, optional
            Override the time limit.
        presets : str, optional
            Override the preset.
        hyperparameters : dict, optional
            Override hyperparameters for specific models.
        **kwargs
            Additional arguments.

        Returns
        -------
        BasePredictor
            The fitted predictor.
        """

    @abstractmethod
    def predict(
        self,
        data: DataInput,
        model: str | None = None,
    ) -> np.ndarray:
        """Generate predictions.

        Parameters
        ----------
        data : str, Path, DataFrame, or ndarray
            Input data to predict on.
        model : str, optional
            Specific model to use. If None, uses the ensemble.

        Returns
        -------
        np.ndarray
            Predictions.
        """

    def predict_proba(
        self,
        data: DataInput,
        model: str | None = None,
    ) -> np.ndarray:
        """Generate probability predictions (classification only).

        Parameters
        ----------
        data : str, Path, DataFrame, or ndarray
            Input data.
        model : str, optional
            Specific model to use.

        Returns
        -------
        np.ndarray
            Probability predictions with shape (n_samples, n_classes).

        Raises
        ------
        ValueError
            If the fitted problem type is regression.
        NotImplementedError
            Always, in this base implementation; subclasses must override.
        """
        if self.problem_type_ == "regression":
            raise ValueError("predict_proba is not available for regression")

        # Default implementation - subclasses should override
        raise NotImplementedError("Subclass must implement predict_proba")

    def evaluate(
        self,
        data: DataInput,
        metrics: list[str] | None = None,
        silent: bool = False,
    ) -> dict[str, float]:
        """Evaluate the predictor on data.

        Parameters
        ----------
        data : str, Path, DataFrame, or ndarray
            Data to evaluate on. Must contain the target column.
        metrics : list of str, optional
            Metrics to compute. If None, uses default metrics
            (accuracy/roc_auc/f1 for classification, rmse/r2/mae otherwise).
        silent : bool, default=False
            Whether to suppress output.

        Returns
        -------
        dict
            Dictionary mapping metric names to scores. A metric that fails
            to compute is reported as NaN; an unknown metric name is logged
            and omitted.

        Raises
        ------
        RuntimeError
            If the predictor has not been fitted.
        """
        self._check_is_fitted()

        # Load and prepare data
        X, y = self._load_and_prepare_data(data, for_prediction=False)

        # Get predictions
        y_pred = self.predict(X)

        is_classification = self.problem_type_ in self._CLASSIFICATION_TYPES

        # Default metrics
        if metrics is None:
            if is_classification:
                metrics = ["accuracy", "roc_auc", "f1"]
            else:
                metrics = ["rmse", "r2", "mae"]

        # Compute metrics (sklearn imported lazily, as in the rest of the file)
        from sklearn import metrics as sklearn_metrics

        results: dict[str, float] = {}
        for metric_name in metrics:
            # Best-effort: one failing metric must not abort the evaluation.
            try:
                if metric_name == "accuracy":
                    results[metric_name] = sklearn_metrics.accuracy_score(y, y_pred)
                elif metric_name == "roc_auc":
                    y_proba = self.predict_proba(X)
                    # Binary case: score on the positive-class column only.
                    if y_proba.ndim == 2 and y_proba.shape[1] == 2:
                        y_proba = y_proba[:, 1]
                    results[metric_name] = sklearn_metrics.roc_auc_score(
                        y, y_proba, multi_class="ovr"
                    )
                elif metric_name == "f1":
                    results[metric_name] = sklearn_metrics.f1_score(
                        y, y_pred, average="weighted"
                    )
                elif metric_name == "rmse":
                    results[metric_name] = np.sqrt(
                        sklearn_metrics.mean_squared_error(y, y_pred)
                    )
                elif metric_name == "r2":
                    results[metric_name] = sklearn_metrics.r2_score(y, y_pred)
                elif metric_name == "mae":
                    results[metric_name] = sklearn_metrics.mean_absolute_error(
                        y, y_pred
                    )
                else:
                    logger.warning(f"Unknown metric: {metric_name}")
            except Exception as e:
                logger.warning(f"Could not compute {metric_name}: {e}")
                results[metric_name] = np.nan

        if not silent and self.verbosity > 0:
            print("\nEvaluation Results:")
            for metric_name, score in results.items():
                print(f" {metric_name}: {score:.4f}")

            if is_classification:
                # The report is purely informational, so a failure is logged
                # rather than raised (it used to be swallowed silently).
                try:
                    from sklearn.metrics import classification_report

                    print("\n Classification Report:")
                    print(classification_report(y, y_pred, digits=4))
                except Exception as e:
                    logger.debug("Could not build classification report: %s", e)

        return results

    def leaderboard(
        self,
        extra_info: bool = False,
        silent: bool = False,
    ) -> pd.DataFrame:
        """Get the model leaderboard.

        Parameters
        ----------
        extra_info : bool, default=False
            Whether to include extra information (fit time, etc.).
        silent : bool, default=False
            Whether to suppress output.

        Returns
        -------
        pd.DataFrame
            Leaderboard with model names and scores, best score first.
            Empty (but with the expected columns) when no models are
            registered.

        Raises
        ------
        RuntimeError
            If the predictor has not been fitted.
        """
        self._check_is_fitted()

        columns = ["model", "score"]
        if extra_info:
            columns += ["fit_time", "predict_time", "n_features"]

        rows = []
        for name, model_info in self._models.items():
            row = {
                "model": name,
                "score": model_info.get("score", np.nan),
            }
            if extra_info:
                row["fit_time"] = model_info.get("fit_time", np.nan)
                row["predict_time"] = model_info.get("predict_time", np.nan)
                row["n_features"] = model_info.get("n_features", np.nan)
            rows.append(row)

        # Passing ``columns`` keeps the "score" column present even with no
        # rows, so the sort below cannot fail with KeyError on an empty
        # leaderboard.
        df = pd.DataFrame(rows, columns=columns)
        df = df.sort_values("score", ascending=False).reset_index(drop=True)

        if not silent and self.verbosity > 0:
            print(df.to_string(index=False))

        return df

    def feature_importance(
        self,
        model: str | None = None,
        importance_type: str = "split",
    ) -> pd.DataFrame:
        """Get feature importance scores.

        Parameters
        ----------
        model : str, optional
            Specific model. If None, uses the best model from the fit
            summary, or the first registered model if no best model is
            recorded.
        importance_type : str, default="split"
            Type of importance: "split", "gain", "permutation". The base
            implementation reports whatever the estimator exposes
            (``feature_importances_`` or ``coef_``) and does not act on
            this value.

        Returns
        -------
        pd.DataFrame
            Feature importance scores with columns "feature" and
            "importance", most important first.

        Raises
        ------
        RuntimeError
            If the predictor has not been fitted.
        ValueError
            If no models are available, the model is unknown, it has no
            estimator, or the estimator exposes no importances.
        """
        self._check_is_fitted()

        if model is None:
            # Use best model
            if self.fit_summary_ and self.fit_summary_.best_model:
                model = self.fit_summary_.best_model
            elif self._models:
                model = next(iter(self._models))
            else:
                # Used to surface as a bare IndexError.
                raise ValueError("No models available for feature importance")

        if model not in self._models:
            raise ValueError(f"Model '{model}' not found")

        estimator = self._models[model].get("estimator")
        if estimator is None:
            raise ValueError(f"No estimator found for model '{model}'")

        # Try to get feature importances
        if hasattr(estimator, "feature_importances_"):
            importances = np.asarray(estimator.feature_importances_).ravel()
        elif hasattr(estimator, "coef_"):
            # ``coef_`` may be 1-D or 2-D (one row per class/target).
            # Averaging absolute values over rows yields one value per
            # feature; for a single row it equals the old flatten() result.
            coef = np.abs(np.asarray(estimator.coef_))
            importances = np.atleast_2d(coef).mean(axis=0)
        else:
            raise ValueError(f"Model '{model}' does not provide feature importances")

        # Build dataframe
        names = self.feature_names_
        if names is not None and len(names) != len(importances):
            # The estimator saw a different number of columns than the raw
            # input had; generic names avoid mislabelling.
            logger.warning(
                "feature_names_ has %d entries but model '%s' reports %d "
                "importances; using generic feature names.",
                len(names),
                model,
                len(importances),
            )
            names = None
        if names is None:
            names = [f"feature_{i}" for i in range(len(importances))]

        df = pd.DataFrame({"feature": names, "importance": importances})
        df = df.sort_values("importance", ascending=False).reset_index(drop=True)

        return df

    def save(self, path: str | None = None) -> str:
        """Save the predictor to disk.

        Uses the endgame persistence module for individual components while
        preserving the existing directory layout for backwards compatibility.
        The experiment logger and ``output_path`` are not persisted.

        Parameters
        ----------
        path : str, optional
            Path to save to. If None, uses output_path, or a timestamped
            directory name when output_path is not set.

        Returns
        -------
        str
            Path where the predictor was saved.
        """
        from endgame.persistence import save as eg_save

        if path is None:
            path = self.output_path or f"automl_predictor_{int(time.time())}"

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        # Save main predictor state (lightweight dict, use pickle)
        state = {
            "label": self.label,
            "problem_type": self.problem_type,
            "eval_metric": self.eval_metric,
            "presets": self.presets,
            "time_limit": self.time_limit,
            "search_strategy": self.search_strategy,
            # Persisted so a predictor saved with tracking disabled does not
            # come back with the constructor default after load().
            "track_experiments": self.track_experiments,
            "random_state": self.random_state,
            "verbosity": self.verbosity,
            "is_fitted_": self.is_fitted_,
            "fit_summary_": self.fit_summary_,
            "problem_type_": self.problem_type_,
            "classes_": self.classes_,
            "feature_names_": self.feature_names_,
        }

        with open(path / "predictor_state.pkl", "wb") as f:
            pickle.dump(state, f)

        # Save individual models via persistence module
        models_dir = path / "models"
        for name, model_info in self._models.items():
            model_path = models_dir / name
            model_path.mkdir(parents=True, exist_ok=True)

            estimator = model_info.get("estimator")
            if estimator is not None:
                eg_save(estimator, str(model_path / "model"))

            # Save non-estimator metadata separately
            meta = {k: v for k, v in model_info.items() if k != "estimator"}
            with open(model_path / "meta.pkl", "wb") as f:
                pickle.dump(meta, f)

        # Save ensemble
        if self._ensemble is not None:
            eg_save(self._ensemble, str(path / "ensemble"))

        # Save preprocessor
        if self._preprocessor is not None:
            eg_save(self._preprocessor, str(path / "preprocessor"))

        if self.verbosity > 0:
            print(f"Predictor saved to: {path}")

        return str(path)

    @staticmethod
    def _load_component(path: Path, stem: str, eg_load: Any) -> Any | None:
        """Load ``<stem>.egm`` (new format) or ``<stem>.pkl`` (legacy), else None."""
        new_file = path / f"{stem}.egm"
        if new_file.exists():
            return eg_load(str(new_file))

        legacy_file = path / f"{stem}.pkl"
        if legacy_file.exists():
            # SECURITY: see the pickle warning in load().
            with open(legacy_file, "rb") as f:
                return pickle.load(f)

        return None

    @classmethod
    def load(cls, path: str) -> BasePredictor:
        """Load a predictor from disk.

        Supports both the legacy pickle format and the new endgame
        persistence format.

        Parameters
        ----------
        path : str
            Path to load from.

        Returns
        -------
        BasePredictor
            The loaded predictor.

        Notes
        -----
        The state and metadata files are read with :mod:`pickle`, which can
        execute arbitrary code. Only load predictors from trusted sources.
        """
        from endgame.persistence import load as eg_load

        path = Path(path)

        # Load state
        # SECURITY: pickle.load on a file from an untrusted location can run
        # arbitrary code; callers must only pass trusted paths.
        with open(path / "predictor_state.pkl", "rb") as f:
            state = pickle.load(f)

        # Create predictor
        predictor = cls(
            label=state["label"],
            problem_type=state["problem_type"],
            eval_metric=state["eval_metric"],
            presets=state["presets"],
            time_limit=state["time_limit"],
            search_strategy=state["search_strategy"],
            random_state=state["random_state"],
            verbosity=state["verbosity"],
        )

        # Older save files do not contain this key; keep the constructor
        # default for them. Set as an attribute (not a constructor argument)
        # so subclass signatures are not affected.
        predictor.track_experiments = state.get(
            "track_experiments", predictor.track_experiments
        )

        # Restore state
        predictor.is_fitted_ = state["is_fitted_"]
        predictor.fit_summary_ = state["fit_summary_"]
        predictor.problem_type_ = state["problem_type_"]
        predictor.classes_ = state["classes_"]
        predictor.feature_names_ = state["feature_names_"]

        # Load models
        models_path = path / "models"
        if models_path.exists():
            # Sorted so the model order (and thus the "first model" fallback
            # in feature_importance) does not depend on filesystem order.
            for model_dir in sorted(models_path.iterdir()):
                if not model_dir.is_dir():
                    continue

                model_info: dict[str, Any] = {}

                # Try new format first (model.egm + meta.pkl)
                egm_file = model_dir / "model.egm"
                meta_file = model_dir / "meta.pkl"
                legacy_file = model_dir / "model.pkl"

                if egm_file.exists():
                    model_info["estimator"] = eg_load(str(egm_file))
                    if meta_file.exists():
                        with open(meta_file, "rb") as f:
                            model_info.update(pickle.load(f))
                elif legacy_file.exists():
                    # Legacy format: single pickle with everything
                    with open(legacy_file, "rb") as f:
                        model_info = pickle.load(f)

                if model_info:
                    predictor._models[model_dir.name] = model_info

        # Load ensemble (try new format, fall back to legacy)
        ensemble = cls._load_component(path, "ensemble", eg_load)
        if ensemble is not None:
            predictor._ensemble = ensemble

        # Load preprocessor (try new format, fall back to legacy)
        preprocessor = cls._load_component(path, "preprocessor", eg_load)
        if preprocessor is not None:
            predictor._preprocessor = preprocessor

        return predictor

    def _check_is_fitted(self) -> None:
        """Check if the predictor is fitted.

        Raises
        ------
        RuntimeError
            If ``is_fitted_`` is false.
        """
        if not self.is_fitted_:
            raise RuntimeError(
                "Predictor is not fitted. Call fit() before making predictions."
            )

    def _detect_problem_type(self, y: np.ndarray) -> str:
        """Detect the problem type from the target.

        An explicitly configured ``problem_type`` is returned unchanged.
        Otherwise: float targets with at most 20 distinct, integer-valued
        non-missing values are treated as classification and other float
        targets as regression; non-float targets are "binary" with exactly
        two distinct values, "multiclass" with up to 100, and "regression"
        beyond that.

        Parameters
        ----------
        y : np.ndarray
            Target values.

        Returns
        -------
        str
            Detected problem type.
        """
        if self.problem_type != "auto":
            return self.problem_type

        y = np.asarray(y)

        # Check dtype
        if y.dtype.kind == "f":  # Float dtype
            # Missing values must not influence the decision: comparing NaN
            # with anything is false, which used to force "regression" for
            # float-encoded class labels containing NaN.
            observed = y[~np.isnan(y)]
            n_unique = len(np.unique(observed))
            if observed.size > 0 and n_unique <= 20:
                # Could be classification encoded as float
                if np.allclose(observed, observed.astype(int)):
                    return "multiclass" if n_unique > 2 else "binary"
            return "regression"

        # Integer or object dtype
        n_unique = len(np.unique(y))

        if n_unique == 2:
            return "binary"
        elif n_unique <= 100:
            return "multiclass"
        else:
            # Too many unique values for classification
            return "regression"

    def _get_eval_metric(self) -> str:
        """Get the evaluation metric based on problem type.

        Returns the configured ``eval_metric`` unless it is "auto", in which
        case the metric is chosen from ``problem_type_``.
        """
        if self.eval_metric != "auto":
            return self.eval_metric

        if self.problem_type_ in ("binary", "classification"):
            return "roc_auc"
        elif self.problem_type_ == "multiclass":
            return "log_loss"
        else:
            return "rmse"

    def _load_and_prepare_data(
        self,
        data: DataInput,
        for_prediction: bool = True,
    ) -> tuple:
        """Load and prepare data.

        Parameters
        ----------
        data : various
            Input data. Paths are read as CSV; arrays are wrapped in a
            DataFrame; DataFrames are used as-is.
        for_prediction : bool
            Whether this is for prediction (no target needed).

        Returns
        -------
        X, y : tuple
            Features and target (y is None if for_prediction=True).

        Raises
        ------
        ValueError
            If the data type is unsupported, or the target column is
            missing when ``for_prediction`` is False.
        """
        # This is a simple implementation - subclasses should override
        if isinstance(data, (str, Path)):
            df = pd.read_csv(data)
        elif isinstance(data, pd.DataFrame):
            df = data
        elif isinstance(data, np.ndarray):
            df = pd.DataFrame(data)
        else:
            raise ValueError(f"Unsupported data type: {type(data)}")

        if for_prediction:
            # The target may or may not be present at prediction time.
            if self.label in df.columns:
                X = df.drop(columns=[self.label])
            else:
                X = df
            return X, None

        if self.label not in df.columns:
            raise ValueError(f"Target column '{self.label}' not found in data")
        y = df[self.label].values
        X = df.drop(columns=[self.label])
        return X, y

    def refit_full(self, data: DataInput | None = None) -> BasePredictor:
        """Retrain best model(s) on all available data (train + validation).

        After cross-validation identifies the best model and hyperparameters,
        this method retrains on the full dataset for maximum deployment
        performance. The refitted model cannot be evaluated (no holdout).

        This base implementation only validates state and emits a warning;
        it performs no retraining itself.

        Parameters
        ----------
        data : DataInput, optional
            Full dataset. If None, uses the training data from the last
            fit() call (subclasses must store it).

        Returns
        -------
        BasePredictor
            Self with models retrained on full data.

        Raises
        ------
        RuntimeError
            If the predictor has not been fitted, or has no models.
        """
        self._check_is_fitted()

        if not self._models:
            raise RuntimeError("No models available to refit.")

        logger.warning(
            "refit_full() retrains on all data. The resulting model "
            "cannot be evaluated with a holdout set."
        )

        return self

    # ------------------------------------------------------------------
    # Logger helpers
    # ------------------------------------------------------------------

    def _log_fit_start(self, n_samples: int, n_features: int) -> None:
        """Log parameters at the start of fitting (if logger is set)."""
        if self.logger is None:
            return
        self.logger.log_params({
            "label": self.label,
            "problem_type": self.problem_type,
            "presets": self.presets,
            "time_limit": self.time_limit,
            "n_samples": n_samples,
            "n_features": n_features,
        })

    def _log_fit_end(self) -> None:
        """Log metrics at the end of fitting (if logger is set)."""
        if self.logger is None or self.fit_summary_ is None:
            return
        metrics = {
            "best_score": self.fit_summary_.best_score,
            "cv_score": self.fit_summary_.cv_score,
            "total_time": self.fit_summary_.total_time,
            "n_models_trained": float(self.fit_summary_.n_models_trained),
        }
        self.logger.log_metrics(metrics)

    def __repr__(self) -> str:
        fitted_str = "fitted" if self.is_fitted_ else "not fitted"
        return (
            f"{self.__class__.__name__}("
            f"label='{self.label}', "
            f"presets='{self.presets}', "
            f"{fitted_str})"
        )