Source code for endgame.automl.orchestrator

from __future__ import annotations

"""Pipeline orchestrator for AutoML.

This module coordinates the execution of all AutoML pipeline stages
with intelligent time budget management and graceful degradation.
"""

import atexit
import logging
import os
import time
import weakref
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any

import numpy as np
import pandas as pd

from endgame.automl.presets import PRESETS, PresetConfig
from endgame.automl.search.base import PipelineConfig, SearchResult
from endgame.automl.time_manager import TimeBudgetManager

logger = logging.getLogger(__name__)

# Track non-daemon child processes so they are cleaned up on parent exit.
_active_children: list[weakref.ref] = []


def _cleanup_children():
    import multiprocessing as _mp
    for ref in _active_children:
        p = ref()
        if p is not None and p.is_alive():
            try:
                p.terminate()
                p.join(timeout=3)
                if p.is_alive():
                    p.kill()
            except Exception:
                pass


atexit.register(_cleanup_children)


# ── Process-based model training with hard kill ─────────────────────

def _train_worker(func, args, kwargs, result_queue):
    """Target for the child process.  Runs ``func(*args, **kwargs)``
    and puts the result (or exception) on ``result_queue``."""
    import os
    import signal
    import traceback

    # Allow the child to use most available cores.  We run one model
    # at a time so there's no contention — let BLAS/OpenMP parallelise.
    n_cpus = str(max(1, os.cpu_count() - 2))
    for key in ("LOKY_MAX_CPU_COUNT", "OMP_NUM_THREADS",
                "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
        os.environ[key] = n_cpus

    # Force CPU-only mode unless GPU is explicitly enabled via kwargs.
    # CUDA can't re-initialize after fork(), so forked processes always
    # disable CUDA.  GPU training uses the thread-based fallback path.
    use_gpu = kwargs.pop("_use_gpu", False)
    if not use_gpu:
        os.environ["CUDA_VISIBLE_DEVICES"] = ""

    signal.signal(signal.SIGTERM, lambda *_: os._exit(1))
    try:
        result = func(*args, **kwargs)
        # result is (oof_pred, score) — pure numpy/float, always picklable.
        # The model is never sent through the queue; the parent refits
        # in-process to avoid pickle/segfault issues with PyTorch etc.
        result_queue.put(("ok", result))
    except BaseException as e:
        try:
            result_queue.put(("error", e))
        except Exception:
            result_queue.put((
                "error",
                RuntimeError(
                    f"{type(e).__name__}: {e}\n"
                    f"{traceback.format_exc()}"
                ),
            ))


def _train_with_timeout(func, *args, sample_weights=None, use_gpu=False, **kwargs):
    """Run *func* in a child **process** with a hard timeout.

    If the child exceeds the deadline, ``Process.kill()`` sends
    SIGKILL which immediately frees all RAM — including any
    joblib / loky workers the model spawned.

    Falls back to thread-based execution if fork is unavailable
    (e.g. macOS with spawn-only context) or if ``use_gpu=True``
    (CUDA cannot re-initialize after fork).

    Parameters match ``_train_model(config, X, y, task_type,
    time_budget, sample_weights=...)``.  The ``time_budget`` arg
    (5th positional) doubles as the deadline.
    """
    import multiprocessing as mp
    import sys

    time_budget = args[4]  # 5th positional = time_budget
    deadline = time_budget + min(30, time_budget * 0.3)

    # Prefer fork (child inherits memory = fast, supports bound methods).
    # Fall back to thread if fork unavailable or GPU mode is active
    # (CUDA cannot re-init after fork).
    use_process = True
    if use_gpu:
        use_process = False
    else:
        try:
            ctx = mp.get_context("fork")
        except ValueError:
            use_process = False

    if not use_process:
        # Thread-based execution (required for GPU, fallback for no-fork)
        import os
        from concurrent.futures import ThreadPoolExecutor
        from concurrent.futures import TimeoutError as FuturesTimeout

        _env_backup = {}
        n_cpus = str(max(1, (os.cpu_count() or 4) - 2))
        for key in ("LOKY_MAX_CPU_COUNT", "OMP_NUM_THREADS",
                     "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
            _env_backup[key] = os.environ.get(key)
            os.environ[key] = n_cpus

        # Only force CPU when GPU is not requested
        if not use_gpu:
            _env_backup["CUDA_VISIBLE_DEVICES"] = os.environ.get("CUDA_VISIBLE_DEVICES")
            os.environ["CUDA_VISIBLE_DEVICES"] = ""

        pool = ThreadPoolExecutor(max_workers=1)
        future = pool.submit(func, *args, sample_weights=sample_weights)
        try:
            return future.result(timeout=deadline)
        except FuturesTimeout:
            raise TimeoutError(
                f"Model training exceeded {deadline:.0f}s deadline"
            )
        finally:
            pool.shutdown(wait=False, cancel_futures=True)
            for key, val in _env_backup.items():
                if val is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = val

    q = ctx.Queue()
    p = ctx.Process(
        target=_train_worker,
        args=(func, args, {"sample_weights": sample_weights, "_use_gpu": use_gpu}, q),
        daemon=False,
    )
    p.start()
    _active_children.append(weakref.ref(p))

    # Drain the queue BEFORE joining.  The child sends a single message:
    #   ("ok", (oof_pred, score))  or  ("error", exception)
    # No fitted model is sent — the parent refits in-process.
    result = None
    try:
        result = q.get(timeout=deadline)
    except KeyboardInterrupt:
        p.terminate()
        p.join(timeout=5)
        if p.is_alive():
            p.kill()
            p.join(timeout=2)
        raise
    except Exception:
        pass

    p.join(timeout=30)
    if p.is_alive():
        p.terminate()
        p.join(timeout=5)
        if p.is_alive():
            p.kill()
            p.join(timeout=2)
        if result is None:
            raise TimeoutError(f"Model training exceeded {deadline:.0f}s deadline")

    if result is None:
        if p.exitcode is not None and p.exitcode != 0:
            detail = f"exit code {p.exitcode}"
            if p.exitcode < 0:
                import signal as _signal
                try:
                    detail = f"signal {_signal.Signals(-p.exitcode).name}"
                except (ValueError, AttributeError):
                    detail = f"signal {-p.exitcode}"
            raise RuntimeError(f"Model training process died ({detail})")

        raise RuntimeError(
            "Model training process exited normally but produced no result "
            "(likely an unhandled exception in the child)"
        )

    status, payload = result
    if status == "error":
        raise payload
    return payload


[docs] @dataclass class StageResult: """Result from executing a pipeline stage. Attributes ---------- stage_name : str Name of the stage. success : bool Whether the stage completed successfully. duration : float Time taken in seconds. output : Any Stage output data. error : str, optional Error message if stage failed. metadata : dict Additional metadata. """ stage_name: str success: bool duration: float output: Any = None error: str | None = None metadata: dict[str, Any] = field(default_factory=dict)
[docs] @dataclass class PipelineResult: """Complete result from running the AutoML pipeline. Attributes ---------- best_model : Any Best trained model. ensemble : Any Final ensemble (if built). score : float Best validation score. scores : dict All evaluation metrics. stage_results : dict Results from each pipeline stage. total_time : float Total execution time in seconds. metadata : dict Additional pipeline metadata. """ best_model: Any ensemble: Any score: float scores: dict[str, float] stage_results: dict[str, StageResult] total_time: float metadata: dict[str, Any] = field(default_factory=dict)
class BaseStageExecutor(ABC): """Base class for pipeline stage executors.""" @abstractmethod def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Execute the stage. Parameters ---------- context : dict Pipeline context with data and intermediate results. time_budget : float Time budget for this stage in seconds. Returns ------- StageResult Result of stage execution. """ pass class ProfilingExecutor(BaseStageExecutor): """Executes the data profiling stage.""" def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Profile the dataset to extract meta-features. Parameters ---------- context : dict Must contain 'X' and 'y' keys. time_budget : float Time budget in seconds. Returns ------- StageResult Contains meta_features dict in output. """ start_time = time.time() try: X = context["X"] y = context["y"] task_type = context.get("task_type", "classification") # Try to use MetaProfiler if available try: from endgame.benchmark.profiler import MetaProfiler profiler = MetaProfiler() meta_features = profiler.profile(X, y, task_type=task_type) meta_features_dict = meta_features.features except (ImportError, AttributeError): meta_features_dict = self._basic_profile(X, y, task_type) except Exception: # MetaProfiler failed (e.g. mixed dtypes), fall back meta_features_dict = self._basic_profile(X, y, task_type) duration = time.time() - start_time return StageResult( stage_name="profiling", success=True, duration=duration, output={"meta_features": meta_features_dict}, metadata={"n_features_computed": len(meta_features_dict)}, ) except Exception as e: duration = time.time() - start_time logger.error(f"Profiling failed: {e}") return StageResult( stage_name="profiling", success=False, duration=duration, output={"meta_features": {}}, error=str(e), ) def _basic_profile( self, X: pd.DataFrame | np.ndarray, y: np.ndarray, task_type: str, ) -> dict[str, float]: """Basic profiling when MetaProfiler is unavailable. Parameters ---------- X : array-like Feature matrix. y : array-like Target vector. task_type : str Task type. Returns ------- dict Basic meta-features. """ if isinstance(X, pd.DataFrame): n_cat = len(X.select_dtypes(include=["object", "category"]).columns) X_num = X.select_dtypes(include=[np.number]) n_samples, n_features = X.shape else: X_num = None n_cat = 0 n_samples, n_features = X.shape features = { "nr_inst": float(n_samples), "nr_attr": float(n_features), "nr_cat": float(n_cat), "nr_num": float(n_features - n_cat), "dimensionality": n_features / max(n_samples, 1), } # Missing value statistics if isinstance(X, pd.DataFrame): features["pct_missing"] = X.isna().mean().mean() elif np.issubdtype(X.dtype, np.floating): features["pct_missing"] = float(np.isnan(X).mean()) else: features["pct_missing"] = 0.0 # Missing value rates per column if isinstance(X, pd.DataFrame): missing_rate = X.isna().mean().mean() elif np.issubdtype(X.dtype, np.floating): missing_rate = float(np.isnan(X).mean()) else: missing_rate = 0.0 features["missing_rate"] = missing_rate # Class imbalance for classification if task_type == "classification": _, counts = np.unique(y, return_counts=True) features["class_imbalance"] = counts.max() / max(counts.min(), 1) features["nr_class"] = float(len(counts)) features["class_imbalance_ratio"] = float(counts.min()) / float(max(counts.max(), 1)) # Time series detection: check for datetime columns n_datetime_cols = 0 if isinstance(X, pd.DataFrame): for col in X.columns: if pd.api.types.is_datetime64_any_dtype(X[col]): n_datetime_cols += 1 elif X[col].dtype == "object": try: pd.to_datetime(X[col].dropna().head(20)) n_datetime_cols += 1 except (ValueError, TypeError): pass features["n_datetime_cols"] = float(n_datetime_cols) features["is_time_series"] = float(n_datetime_cols > 0) # Boolean columns n_boolean = 0 if isinstance(X, pd.DataFrame): for col in X.columns: if pd.api.types.is_bool_dtype(X[col]) or (X[col].nunique() == 2 and pd.api.types.is_numeric_dtype(X[col]) and set(X[col].dropna().unique()).issubset({0, 1})): n_boolean += 1 features["n_boolean"] = float(n_boolean) # Target skewness for regression if task_type == "regression": try: from scipy.stats import skew features["target_skewness"] = float(skew(y)) except ImportError: features["target_skewness"] = 0.0 return features class DataCleaningExecutor(BaseStageExecutor): """Executes the data cleaning stage. Handles outlier removal and label noise detection based on the feature engineering level. """ def __init__(self, feature_engineering: str = "none"): self.feature_engineering = feature_engineering def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Clean data by removing outliers and noisy labels. Parameters ---------- context : dict Must contain 'X' and 'y' keys. time_budget : float Time budget in seconds. Returns ------- StageResult Contains X_cleaned, y_cleaned, clean_mask in output. """ start_time = time.time() level = self.feature_engineering X = context["X"] y = context["y"] if level in ("none", "light"): duration = time.time() - start_time return StageResult( stage_name="data_cleaning", success=True, duration=duration, output={"X_cleaned": X, "y_cleaned": y, "clean_mask": None}, metadata={"skipped": True, "level": level}, ) try: X_arr = X.values if isinstance(X, pd.DataFrame) else X n_samples = X_arr.shape[0] clean_mask = np.ones(n_samples, dtype=bool) outliers_removed = 0 noisy_removed = 0 # Moderate+: Outlier detection via IsolationForest if level in ("moderate", "aggressive"): elapsed = time.time() - start_time if elapsed < time_budget * 0.6: try: from endgame.anomaly.isolation_forest import IsolationForestDetector detector = IsolationForestDetector( n_estimators=100, contamination="auto", ) outlier_preds = detector.fit_predict(X_arr) outlier_mask = outlier_preds == -1 outlier_frac = outlier_mask.sum() / n_samples # Safety guard: only remove if <10% flagged if outlier_frac < 0.10: clean_mask &= ~outlier_mask outliers_removed = int(outlier_mask.sum()) logger.info( f"DataCleaning: removed {outliers_removed} outliers " f"({outlier_frac:.1%})" ) else: logger.info( f"DataCleaning: skipping outlier removal " f"({outlier_frac:.1%} > 10% threshold)" ) except ImportError: logger.debug("IsolationForestDetector not available, skipping outlier removal") except Exception as e: logger.debug(f"Outlier detection failed: {e}") # Aggressive: Label noise detection if level == "aggressive": elapsed = time.time() - start_time if elapsed < time_budget * 0.9: try: from endgame.preprocessing.noise_detection import ConfidentLearningFilter clf = ConfidentLearningFilter(base_estimator="rf", cv=3) noisy_mask = clf.fit_predict(X_arr[clean_mask], y[clean_mask]) noisy_frac = noisy_mask.sum() / clean_mask.sum() # Safety guard: only remove if <5% flagged if noisy_frac < 0.05: # Map back to original indices clean_indices = np.where(clean_mask)[0] noisy_indices = clean_indices[noisy_mask] clean_mask[noisy_indices] = False noisy_removed = int(noisy_mask.sum()) logger.info( f"DataCleaning: removed {noisy_removed} noisy labels " f"({noisy_frac:.1%})" ) else: logger.info( f"DataCleaning: skipping noise removal " f"({noisy_frac:.1%} > 5% threshold)" ) except ImportError: logger.debug("ConfidentLearningFilter not available, skipping noise detection") except Exception as e: logger.debug(f"Noise detection failed: {e}") # Apply mask if isinstance(X, pd.DataFrame): X_cleaned = X.iloc[clean_mask].reset_index(drop=True) else: X_cleaned = X_arr[clean_mask] y_cleaned = y[clean_mask] duration = time.time() - start_time return StageResult( stage_name="data_cleaning", success=True, duration=duration, output={ "X_cleaned": X_cleaned, "y_cleaned": y_cleaned, "clean_mask": clean_mask, }, metadata={ "level": level, "outliers_removed": outliers_removed, "noisy_removed": noisy_removed, "samples_remaining": int(clean_mask.sum()), }, ) except Exception as e: duration = time.time() - start_time logger.error(f"Data cleaning failed: {e}") return StageResult( stage_name="data_cleaning", success=False, duration=duration, output={"X_cleaned": X, "y_cleaned": y, "clean_mask": None}, error=str(e), ) class PreprocessingExecutor(BaseStageExecutor): """Executes the preprocessing stage. Uses endgame's advanced preprocessing modules when the preset's feature_engineering level is 'light', 'moderate', or 'aggressive'. Falls back to basic sklearn for 'none'/'fast' levels. """ def __init__(self, feature_engineering: str = "none"): """Initialize preprocessing executor. Parameters ---------- feature_engineering : str, default="none" Feature engineering level: "none", "light", "moderate", "aggressive". """ self.feature_engineering = feature_engineering def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Build and fit preprocessing pipeline. Parameters ---------- context : dict Must contain 'X', 'y', and 'meta_features'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains preprocessor and X_processed in output. """ start_time = time.time() try: X = context["X"] y = context["y"] meta_features = context.get("meta_features", {}) task_type = context.get("task_type", "classification") # Build preprocessing pipeline preprocessor = self._build_preprocessor( X, y, meta_features, task_type, ) # Fit and transform X_processed = preprocessor.fit_transform(X, y) duration = time.time() - start_time return StageResult( stage_name="preprocessing", success=True, duration=duration, output={ "preprocessor": preprocessor, "X_processed": X_processed, }, metadata={ "n_steps": len(preprocessor.steps) if hasattr(preprocessor, "steps") else 1, "feature_engineering": self.feature_engineering, }, ) except Exception as e: logger.error(f"Preprocessing failed: {e}") # Fallback: try basic preprocessor before giving up if self.feature_engineering not in ("none", "fast"): try: X = context["X"] y = context["y"] if isinstance(X, pd.DataFrame): numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist() categorical_cols = X.select_dtypes( include=["object", "category"] ).columns.tolist() else: numeric_cols = list(range(X.shape[1])) categorical_cols = [] preprocessor = self._build_basic_preprocessor( numeric_cols, categorical_cols, X, ) X_processed = preprocessor.fit_transform(X, y) duration = time.time() - start_time logger.info("Fell back to basic preprocessor successfully") return StageResult( stage_name="preprocessing", success=True, duration=duration, output={ "preprocessor": preprocessor, "X_processed": X_processed, }, metadata={ "fallback": True, "original_error": str(e), }, ) except Exception as fallback_e: logger.error(f"Fallback preprocessing also failed: {fallback_e}") duration = time.time() - start_time return StageResult( stage_name="preprocessing", success=False, duration=duration, output={ "preprocessor": None, "X_processed": context["X"], }, error=str(e), ) def _build_preprocessor( self, X: pd.DataFrame | np.ndarray, y: np.ndarray, meta_features: dict[str, float], task_type: str = "classification", ): """Build preprocessing pipeline based on data characteristics and preset. The pipeline varies by feature_engineering level: - none/fast: SimpleImputer(median) + StandardScaler + OneHotEncoder - light: KNNImputer for numeric, SafeTargetEncoder for high-cardinality categoricals (>20 unique), OneHotEncoder for low-cardinality - moderate: MICEImputer, SafeTargetEncoder, AutoBalancer when minority class ratio < 0.2 - aggressive: All of moderate + CorrelationSelector(threshold=0.95) Parameters ---------- X : array-like Feature matrix. y : array-like Target vector. meta_features : dict Dataset meta-features. task_type : str, default="classification" Task type. Returns ------- Pipeline Preprocessing pipeline. """ level = self.feature_engineering # Identify column types if isinstance(X, pd.DataFrame): numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist() categorical_cols = X.select_dtypes(include=["object", "category"]).columns.tolist() else: numeric_cols = list(range(X.shape[1])) categorical_cols = [] # Split categoricals into high/low cardinality for target encoding high_card_cols = [] low_card_cols = [] if isinstance(X, pd.DataFrame) and categorical_cols: for col in categorical_cols: if X[col].nunique() > 20: high_card_cols.append(col) else: low_card_cols.append(col) else: low_card_cols = categorical_cols # === Build the ColumnTransformer === if level in ("none", "fast"): return self._build_basic_preprocessor( numeric_cols, categorical_cols, X, ) if level == "light": return self._build_light_preprocessor( numeric_cols, low_card_cols, high_card_cols, X, ) if level == "moderate": return self._build_moderate_preprocessor( numeric_cols, low_card_cols, high_card_cols, X, y, meta_features, task_type, ) if level == "aggressive": return self._build_aggressive_preprocessor( numeric_cols, low_card_cols, high_card_cols, X, y, meta_features, task_type, ) # Default fallback return self._build_basic_preprocessor(numeric_cols, categorical_cols, X) def _build_basic_preprocessor(self, numeric_cols, categorical_cols, X): """Build basic sklearn preprocessor (none/fast level).""" from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, StandardScaler transformers = [] if numeric_cols: numeric_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) transformers.append(("numeric", numeric_pipeline, numeric_cols)) if categorical_cols: categorical_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), ]) transformers.append(("categorical", categorical_pipeline, categorical_cols)) if transformers: return ColumnTransformer(transformers, remainder="passthrough") else: return Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) def _build_light_preprocessor(self, numeric_cols, low_card_cols, high_card_cols, X): """Build light preprocessor with KNNImputer and SafeTargetEncoder.""" from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, StandardScaler transformers = [] if numeric_cols: try: from endgame.preprocessing.imputation import KNNImputer as EgKNNImputer numeric_pipeline = Pipeline([ ("imputer", EgKNNImputer(n_neighbors=5)), ("scaler", StandardScaler()), ]) except ImportError: numeric_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) transformers.append(("numeric", numeric_pipeline, numeric_cols)) if low_card_cols: categorical_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), ]) transformers.append(("cat_low", categorical_pipeline, low_card_cols)) if high_card_cols: try: from endgame.preprocessing.encoding import SafeTargetEncoder transformers.append(("cat_high", SafeTargetEncoder(cols=high_card_cols), high_card_cols)) except ImportError: categorical_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False, max_categories=20)), ]) transformers.append(("cat_high", categorical_pipeline, high_card_cols)) if transformers: return ColumnTransformer(transformers, remainder="passthrough") else: return Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) def _build_moderate_preprocessor( self, numeric_cols, low_card_cols, high_card_cols, X, y, meta_features, task_type, ): """Build moderate preprocessor with MICEImputer, SafeTargetEncoder, AutoBalancer.""" from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, StandardScaler transformers = [] if numeric_cols: try: from endgame.preprocessing.imputation import MICEImputer numeric_pipeline = Pipeline([ ("imputer", MICEImputer(max_iter=10)), ("scaler", StandardScaler()), ]) except ImportError: numeric_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) transformers.append(("numeric", numeric_pipeline, numeric_cols)) if low_card_cols: categorical_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)), ]) transformers.append(("cat_low", categorical_pipeline, low_card_cols)) if high_card_cols: try: from endgame.preprocessing.encoding import SafeTargetEncoder transformers.append(("cat_high", SafeTargetEncoder(cols=high_card_cols), high_card_cols)) except ImportError: categorical_pipeline = Pipeline([ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False, max_categories=20)), ]) transformers.append(("cat_high", categorical_pipeline, high_card_cols)) if transformers: return ColumnTransformer(transformers, remainder="passthrough") return Pipeline([ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ]) def _build_aggressive_preprocessor( self, numeric_cols, low_card_cols, high_card_cols, X, y, meta_features, task_type, ): """Build aggressive preprocessor: moderate + CorrelationSelector.""" moderate_preprocessor = self._build_moderate_preprocessor( numeric_cols, low_card_cols, high_card_cols, X, y, meta_features, task_type, ) # Wrap with CorrelationSelector from sklearn.pipeline import Pipeline try: from endgame.feature_selection.filter.correlation import CorrelationSelector return Pipeline([ ("preprocessing", moderate_preprocessor), ("feature_selection", CorrelationSelector(threshold=0.95)), ]) except ImportError: return moderate_preprocessor class AdvancedFeatureEngineeringExecutor(BaseStageExecutor): """Executes the advanced feature engineering stage. Applies PCA, MRMR, Boruta, DAE, and AutoCluster features based on the feature engineering level. """ class _FeatureAugmenter: """Wraps a transformer so that transform() concatenates new features with input.""" def __init__(self, transformer, method: str = "transform"): self.transformer = transformer self.method = method def transform(self, X): func = getattr(self.transformer, self.method) new_features = func(X) if new_features.ndim == 1: new_features = new_features.reshape(-1, 1) return np.hstack([X, new_features]) def __init__(self, feature_engineering: str = "none"): self.feature_engineering = feature_engineering def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Apply advanced feature engineering. Parameters ---------- context : dict Must contain 'X_processed' and 'y'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains X_engineered and feature_transformers in output. """ start_time = time.time() level = self.feature_engineering # Pick the most-processed data available X = context.get("X_processed", context.get("X_cleaned", context.get("X"))) y = context.get("y_cleaned", context.get("y")) if level in ("none", "light"): duration = time.time() - start_time return StageResult( stage_name="feature_engineering", success=True, duration=duration, output={"X_engineered": X, "feature_transformers": []}, metadata={"skipped": True, "level": level}, ) try: X_arr = X.values if isinstance(X, pd.DataFrame) else np.asarray(X) n_samples, n_features = X_arr.shape feature_transformers = [] # PCA for high-dimensional data (d > 500) if n_features > 500: elapsed = time.time() - start_time if elapsed < time_budget * 0.3: try: from endgame.dimensionality_reduction.linear import PCAReducer pca = PCAReducer(n_components=0.95) X_arr = pca.fit_transform(X_arr) feature_transformers.append(("pca", pca)) logger.info( f"FeatureEngineering: PCA reduced {n_features} -> " f"{X_arr.shape[1]} features" ) except ImportError: logger.debug("PCAReducer not available, skipping PCA") except Exception as e: logger.debug(f"PCA failed: {e}") if level == "moderate": # MRMR feature selection elapsed = time.time() - start_time if elapsed < time_budget * 0.8: try: from endgame.feature_selection.filter.mrmr import MRMRSelector n_select = max(10, X_arr.shape[1] // 2) mrmr = MRMRSelector(n_features=n_select) X_arr = mrmr.fit_transform(X_arr, y) feature_transformers.append(("mrmr", mrmr)) logger.info(f"FeatureEngineering: MRMR selected {X_arr.shape[1]} features") except ImportError: logger.debug("MRMRSelector not available, skipping MRMR") except Exception as e: logger.debug(f"MRMR failed: {e}") elif level == "aggressive": # Boruta feature selection elapsed = time.time() - start_time if elapsed < time_budget * 0.5: try: from endgame.feature_selection.wrapper.boruta import BorutaSelector boruta = BorutaSelector(max_iter=50) X_arr = boruta.fit_transform(X_arr, y) feature_transformers.append(("boruta", boruta)) logger.info(f"FeatureEngineering: Boruta selected {X_arr.shape[1]} features") except ImportError: logger.debug("BorutaSelector not available, skipping Boruta") except Exception as e: logger.debug(f"Boruta failed: {e}") # Denoising autoencoder features elapsed = time.time() - start_time if elapsed < time_budget * 0.75: try: from endgame.preprocessing.dae import DenoisingAutoEncoder dae = DenoisingAutoEncoder(hidden_dims=[128, 64]) dae_features = dae.fit_transform(X_arr) X_arr = np.hstack([X_arr, dae_features]) feature_transformers.append(("dae", self._FeatureAugmenter(dae))) logger.info(f"FeatureEngineering: DAE added {dae_features.shape[1]} features") except ImportError: logger.debug("DenoisingAutoEncoder not available, skipping DAE") except Exception as e: logger.debug(f"DAE failed: {e}") # AutoCluster features elapsed = time.time() - start_time if elapsed < time_budget * 0.9: try: from endgame.clustering.auto import AutoCluster clusterer = AutoCluster() cluster_labels = clusterer.fit_predict(X_arr).reshape(-1, 1) X_arr = np.hstack([X_arr, cluster_labels]) feature_transformers.append(("cluster", self._FeatureAugmenter(clusterer, method="predict"))) logger.info("FeatureEngineering: added cluster label feature") except ImportError: logger.debug("AutoCluster not available, skipping clustering") except Exception as e: logger.debug(f"AutoCluster failed: {e}") duration = time.time() - start_time return StageResult( stage_name="feature_engineering", success=True, duration=duration, output={ "X_engineered": X_arr, "feature_transformers": feature_transformers, }, metadata={ "level": level, "n_transformers": len(feature_transformers), "output_features": X_arr.shape[1], }, ) except Exception as e: duration = time.time() - start_time logger.error(f"Feature engineering failed: {e}") return StageResult( stage_name="feature_engineering", success=False, duration=duration, output={"X_engineered": X, "feature_transformers": []}, error=str(e), ) class DataAugmentationExecutor(BaseStageExecutor): """Executes the data augmentation stage. Uses triage-informed SMOTE and sample weighting for imbalanced classification tasks. """ def __init__(self, feature_engineering: str = "none"): self.feature_engineering = feature_engineering def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Apply data augmentation for imbalanced classification. Parameters ---------- context : dict Must contain feature data and 'y'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains X_augmented, y_augmented, sample_weights in output. """ start_time = time.time() level = self.feature_engineering task_type = context.get("task_type", "classification") # Pick the most-processed data X = context.get("X_engineered", context.get("X_processed", context.get("X_cleaned", context.get("X")))) y = context.get("y_cleaned", context.get("y")) # Skip for regression or none/light level if task_type == "regression" or level in ("none", "light"): duration = time.time() - start_time return StageResult( stage_name="data_augmentation", success=True, duration=duration, output={ "X_augmented": X, "y_augmented": y, "sample_weights": None, }, metadata={"skipped": True, "reason": "regression or light level"}, ) # Check class imbalance meta_features = context.get("meta_features", {}) imbalance_ratio = meta_features.get("class_imbalance_ratio", 1.0) if imbalance_ratio >= 0.3: duration = time.time() - start_time return StageResult( stage_name="data_augmentation", success=True, duration=duration, output={ "X_augmented": X, "y_augmented": y, "sample_weights": None, }, metadata={"skipped": True, "reason": "balanced data"}, ) try: X_arr = X.values if isinstance(X, pd.DataFrame) else np.asarray(X) sample_weights = None if level == "moderate": try: from endgame.preprocessing.imbalance import AutoBalancer balancer = AutoBalancer(strategy="auto") X_arr, y = balancer.fit_resample(X_arr, y) logger.info(f"DataAugmentation: AutoBalancer -> {len(y)} samples") except ImportError: logger.debug("No augmentation modules available") elif level == "aggressive": # Aggressive augmentation: apply class weighting logger.debug("Aggressive augmentation: using class weights") duration = time.time() - start_time return StageResult( stage_name="data_augmentation", success=True, duration=duration, output={ "X_augmented": X_arr, "y_augmented": y, "sample_weights": sample_weights, }, metadata={"level": level, "n_samples": len(y)}, ) except Exception as e: duration = time.time() - start_time logger.error(f"Data augmentation failed: {e}") return StageResult( stage_name="data_augmentation", success=False, duration=duration, output={ "X_augmented": X, "y_augmented": y, "sample_weights": None, }, error=str(e), ) class ModelSelectionExecutor(BaseStageExecutor): """Executes the model selection stage.""" def __init__(self, search_strategy=None): """Initialize model selection executor. Parameters ---------- search_strategy : BaseSearchStrategy, optional Search strategy to use. If None, uses PortfolioSearch. """ self.search_strategy = search_strategy def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Select model configurations to train. Parameters ---------- context : dict Must contain 'meta_features' and 'max_models'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains model_configs in output. """ start_time = time.time() try: meta_features = context.get("meta_features", {}) max_models = context.get("max_models", 5) task_type = context.get("task_type", "classification") # Initialize search strategy if needed if self.search_strategy is None: from endgame.automl.search.portfolio import PortfolioSearch self.search_strategy = PortfolioSearch( task_type=task_type, preset=context.get("preset", "medium_quality"), ) # Get model configurations configs = self.search_strategy.suggest( meta_features=meta_features, n_suggestions=max_models, ) duration = time.time() - start_time return StageResult( stage_name="model_selection", success=True, duration=duration, output={"model_configs": configs}, metadata={"n_models_selected": len(configs)}, ) except Exception as e: duration = time.time() - start_time logger.error(f"Model selection failed: {e}") # Fallback to LGBM fallback_config = PipelineConfig( model_name="lgbm", model_params={"n_estimators": 1000, "learning_rate": 0.05}, ) return StageResult( stage_name="model_selection", success=False, duration=duration, output={"model_configs": [fallback_config]}, error=str(e), ) class ModelTrainingExecutor(BaseStageExecutor): """Executes the model training stage.""" def __init__( self, cv_folds: int = 5, parallel: bool = True, feature_engineering: str = "none", verbose: int = 1, min_model_time: float = 300.0, max_model_time: float = 600.0, eval_metric: str = "auto", early_stopping_rounds: int = 50, use_gpu: bool = False, ): """Initialize model training executor. Parameters ---------- cv_folds : int, default=5 Number of cross-validation folds. parallel : bool, default=True Whether to train models in parallel. feature_engineering : str, default="none" Feature engineering level (used to decide target transform). verbose : int, default=1 Verbosity level. min_model_time : float, default=300.0 Minimum time budget (seconds) guaranteed to each model. Models are trained in priority order; once the remaining stage budget drops below this floor, training stops. max_model_time : float, default=600.0 Hard ceiling (seconds) for any single model. If a model exceeds this, its training thread is abandoned and the pipeline moves on to the next model. eval_metric : str, default="auto" Evaluation metric for scoring models. early_stopping_rounds : int, default=50 Early stopping patience for GBDT models during CV. use_gpu : bool, default=False Whether to enable GPU acceleration. """ self.cv_folds = cv_folds self.parallel = parallel self.feature_engineering = feature_engineering self.verbose = verbose self.min_model_time = min_model_time self.max_model_time = max_model_time self.eval_metric = eval_metric self.early_stopping_rounds = early_stopping_rounds self.use_gpu = use_gpu def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Train models with cross-validation. Parameters ---------- context : dict Must contain 'X_processed', 'y', 'model_configs'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains trained_models and oof_predictions in output. """ # Prevent CUDA from being initialized in the parent process. # _refit_model runs here and PyTorch models can lazily init CUDA # which would poison all subsequent fork()ed children with # "Cannot re-initialize CUDA in forked subprocess". os.environ["CUDA_VISIBLE_DEVICES"] = "" # Auto-select CV strategy based on data characteristics self._cv_strategy = self._select_cv_strategy(context) start_time = time.time() # Initialise outside try so partial results survive exceptions trained_models: dict[str, Any] = {} oof_predictions: dict[str, np.ndarray] = {} results: list = [] try: # Use most-processed data available X = context.get("X_augmented", context.get("X_engineered", context.get("X_processed", context.get("X_cleaned", context.get("X"))))) y = context.get("y_augmented", context.get("y_cleaned", context.get("y"))) # Keep a reference to raw (pre-preprocessing) data so that # configs with their own imputer/encoder can start from # unprocessed features instead of the global pipeline output. X_raw = context.get("X_cleaned", context.get("X")) configs = context["model_configs"] task_type = context.get("task_type", "classification") sample_weights = context.get("sample_weights") # --- Budget planning --- # Each model is guaranteed at least min_model_time. If the # stage budget can't accommodate every model at that rate, # we train as many as we can in priority order (configs are # already sorted by the search strategy). Remaining models # will be tried in the continuous loop. if time_budget == float("inf") or time_budget <= 0: max_trainable = len(configs) else: max_trainable = max(1, int(time_budget / self.min_model_time)) if max_trainable < len(configs): if self.verbose > 0: print( f" Budget ({time_budget:.0f}s) allows " f"{max_trainable}/{len(configs)} models " f"(≥{self.min_model_time:.0f}s each); " f"rest deferred to continuous loop" ) configs = configs[:max_trainable] for i, config in enumerate(configs): model_start = time.time() # Stop if remaining budget can't give the next model # a meaningful amount of time if time_budget == float("inf"): remaining = float("inf") else: remaining = time_budget - (time.time() - start_time) if remaining < self.min_model_time and trained_models: if self.verbose > 0: print( f" Remaining budget ({remaining:.0f}s) < " f"min_model_time ({self.min_model_time:.0f}s) — " f"stopping after {i}/{len(configs)} models" ) break # Per-model budget: fair share of what's left, clamped # between min_model_time and max_model_time. if remaining == float("inf"): model_budget = self.max_model_time else: models_left = max(len(configs) - i, 1) model_budget = max(remaining / models_left, self.min_model_time) model_budget = min(model_budget, remaining, self.max_model_time) if self.verbose > 0: print( f" [{i+1}/{len(configs)}] Training {config.model_name} " f"(budget {model_budget:.0f}s)...", end="", flush=True, ) try: # If the config has its own imputer or encoder, use raw # data so the per-config preprocessing can apply its own # strategy. Otherwise use globally-preprocessed data. _has_preproc = any( s[0] in ("imputer", "encoder") for s in (config.preprocessing or []) ) X_for_config = X_raw if (_has_preproc and X_raw is not None) else X oof_pred, score = _train_with_timeout( self._cv_score_model, config, X_for_config, y, task_type, model_budget, sample_weights=sample_weights, use_gpu=self.use_gpu, ) oof_predictions[config.model_name] = oof_pred # Skip refit for partial-data bandit rungs — these # configs are just being scored for selection, not # used for final prediction. frac = config.metadata.get("data_fraction", 1.0) if frac < 1.0: if self.verbose > 0: print( f" score={score:.4f} " f"({time.time()-model_start:.1f}s, " f"{frac:.0%} data, skip refit)", ) else: # Refit on all data in the parent process — no # pickle boundary, so PyTorch/C-extension models # that segfault during serialization work fine. refit_start = time.time() try: model = self._refit_model( config, X_for_config, y, task_type, sample_weights=sample_weights, ) trained_models[config.model_name] = model refit_time = time.time() - refit_start print( f" refit {refit_time:.0f}s", end="", flush=True, ) except Exception as refit_err: logger.warning( f"Refit failed for {config.model_name}: {refit_err}" ) result = SearchResult( config=config, score=score, scores={"primary": score}, fit_time=time.time() - model_start, oof_predictions=oof_pred, success=True, ) results.append(result) if self.verbose > 0 and frac >= 1.0: print(f" score={score:.4f} ({time.time()-model_start:.1f}s)") logger.debug(f"Trained {config.model_name}: score={score:.4f}") except TimeoutError: elapsed = time.time() - model_start if self.verbose > 0: print(f" KILLED after {elapsed:.0f}s (budget was {model_budget:.0f}s)") logger.warning( f"Model {config.model_name} exceeded time budget " f"({elapsed:.0f}s > {model_budget:.0f}s), killed" ) result = SearchResult( config=config, score=float("-inf"), fit_time=elapsed, success=False, error=f"Killed after {elapsed:.0f}s", ) results.append(result) except RuntimeError as e: err_str = str(e) is_cuda_oom = "CUDA out of memory" in err_str if is_cuda_oom and self.use_gpu: if self.verbose > 0: print(" CUDA OOM — falling back to CPU") logger.warning( f"CUDA OOM for {config.model_name}, " f"falling back to CPU retraining" ) # Retry with GPU disabled for this model try: _prev = os.environ.get("CUDA_VISIBLE_DEVICES") os.environ["CUDA_VISIBLE_DEVICES"] = "" oof_pred, score = self._cv_score_model( config, X_for_config, y, task_type, model_budget, sample_weights=sample_weights, ) oof_predictions[config.model_name] = oof_pred model = self._refit_model( config, X_for_config, y, task_type, sample_weights=sample_weights, ) trained_models[config.model_name] = model result = SearchResult( config=config, score=score, scores={"primary": score}, fit_time=time.time() - model_start, oof_predictions=oof_pred, success=True, ) results.append(result) if self.verbose > 0: print(f" score={score:.4f} (CPU fallback, {time.time()-model_start:.1f}s)") except Exception as cpu_err: if self.verbose > 0: print(f" CPU fallback FAILED ({cpu_err})") logger.warning(f"CPU fallback failed for {config.model_name}: {cpu_err}") result = SearchResult( config=config, score=float("-inf"), fit_time=time.time() - model_start, success=False, error=str(cpu_err), ) results.append(result) finally: if _prev is None: os.environ.pop("CUDA_VISIBLE_DEVICES", None) else: os.environ["CUDA_VISIBLE_DEVICES"] = _prev continue # Not a CUDA OOM — fall through to generic handler if self.verbose > 0: print(f" FAILED ({e})") logger.warning(f"Failed to train {config.model_name}: {e}") result = SearchResult( config=config, score=float("-inf"), fit_time=time.time() - model_start, success=False, error=str(e), ) results.append(result) except Exception as e: if self.verbose > 0: print(f" FAILED ({e})") logger.warning(f"Failed to train {config.model_name}: {e}") result = SearchResult( config=config, score=float("-inf"), fit_time=time.time() - model_start, success=False, error=str(e), ) results.append(result) except Exception as e: logger.error(f"Model training failed: {e}") # --- Fallback: if all models failed/killed, train a fast model --- if not trained_models and results: logger.warning( "All models failed or were killed. " "Training fast fallback model (HistGradientBoosting)." ) if self.verbose > 0: print(" All models killed/failed — training fallback model...", end="", flush=True) try: X = context.get("X_augmented", context.get("X_engineered", context.get("X_processed", context.get("X_cleaned", context.get("X"))))) y = context.get("y_augmented", context.get("y_cleaned", context.get("y"))) task_type = context.get("task_type", "classification") if task_type == "regression": from sklearn.ensemble import HistGradientBoostingRegressor fallback = HistGradientBoostingRegressor( max_iter=100, max_depth=6, random_state=42, ) else: from sklearn.ensemble import HistGradientBoostingClassifier fallback = HistGradientBoostingClassifier( max_iter=100, max_depth=6, random_state=42, ) fallback.fit(X, y) trained_models["fallback_hgb"] = fallback if self.verbose > 0: print(" OK (fallback)") logger.info("Fallback model trained successfully.") except Exception as fallback_err: if self.verbose > 0: print(f" FAILED ({fallback_err})") logger.error(f"Fallback model also failed: {fallback_err}") duration = time.time() - start_time return StageResult( stage_name="model_training", success=len(trained_models) > 0, duration=duration, output={ "trained_models": trained_models, "oof_predictions": oof_predictions, "results": results, }, metadata={ "n_models_trained": len(trained_models), "n_models_failed": max(0, len(results) - len(trained_models)), }, ) def _cv_score_model( self, config: PipelineConfig, X: np.ndarray, y: np.ndarray, task_type: str, time_budget: float, sample_weights: np.ndarray | None = None, ) -> tuple[np.ndarray, float]: """Score a model via cross-validation (no final fit). Runs in a forked child process. Returns only numpy/float data so there are never pickling issues. The parent calls ``_refit_model`` afterwards in-process. If the config has ``metadata["data_fraction"] < 1.0`` (set by BanditSearch), only that fraction of data is used for CV. """ import inspect from sklearn.base import clone from sklearn.model_selection import KFold, StratifiedKFold # ── Data fraction subsampling (BanditSearch support) ──────── data_fraction = config.metadata.get("data_fraction", 1.0) if data_fraction < 1.0: n = len(X) n_sub = max(20, int(n * data_fraction)) if n_sub < n: rng = np.random.RandomState(42) if task_type in ("classification", "binary", "multiclass"): # Stratified subsample to preserve class balance from sklearn.model_selection import StratifiedShuffleSplit sss = StratifiedShuffleSplit( n_splits=1, train_size=n_sub, random_state=42, ) idx, _ = next(sss.split(X, y)) else: idx = rng.choice(n, size=n_sub, replace=False) idx.sort() X = X[idx] y = y[idx] if sample_weights is not None: sample_weights = sample_weights[idx] model = self._instantiate_model(config, task_type) if task_type == "regression" and self.feature_engineering in ("moderate", "aggressive"): try: from scipy.stats import skew target_skewness = skew(y) if abs(target_skewness) > 1.0: from endgame.preprocessing.target_transform import TargetTransformer model = TargetTransformer(regressor=model, method="auto") except ImportError: pass model = self._wrap_with_preprocessing(model, config) supports_sw = False if sample_weights is not None: try: fit_sig = inspect.signature(model.fit) supports_sw = "sample_weight" in fit_sig.parameters except (ValueError, TypeError): pass # Use the intelligently-selected CV strategy (set by execute()) cv = getattr(self, "_cv_strategy", None) if cv is None: if task_type == "classification": cv = StratifiedKFold(n_splits=self.cv_folds, shuffle=True, random_state=42) else: cv = KFold(n_splits=self.cv_folds, shuffle=True, random_state=42) use_proba = task_type == "classification" and hasattr(model, "predict_proba") # ── Detect GBDT models for early stopping ──────────────────── _GBDT_NAMES = {"lgbm", "xgb", "catboost", "ngboost"} is_gbdt = config.model_name in _GBDT_NAMES es_rounds = getattr(self, "early_stopping_rounds", 50) if is_gbdt else 0 oof_pred = None cv_start = time.time() for fold_idx, (train_idx, val_idx) in enumerate(cv.split(X, y)): fold_start = time.time() X_tr, X_val = X[train_idx], X[val_idx] y_tr, y_val = y[train_idx], y[val_idx] fold_model = clone(model) fit_kwargs: dict[str, Any] = {} if supports_sw and sample_weights is not None: fit_kwargs["sample_weight"] = sample_weights[train_idx] # ── Early stopping for GBDTs ───────────────────────────── if es_rounds > 0: try: fit_kwargs.update( self._get_early_stopping_kwargs( fold_model, config.model_name, X_val, y_val, es_rounds, ) ) except Exception: pass # Fall back to training without early stopping fold_model.fit(X_tr, y_tr, **fit_kwargs) if use_proba: preds = fold_model.predict_proba(X_val) else: preds = fold_model.predict(X_val) if oof_pred is None: if preds.ndim > 1: oof_pred = np.zeros((len(y), preds.shape[1]), dtype=preds.dtype) else: oof_pred = np.zeros(len(y), dtype=preds.dtype) oof_pred[val_idx] = preds fold_time = time.time() - fold_start n_total_folds = cv.get_n_splits(X, y) if hasattr(cv, "get_n_splits") else self.cv_folds print( f" fold {fold_idx + 1}/{n_total_folds} {fold_time:.0f}s", end="", flush=True, ) score = self._score_oof(oof_pred, y, task_type) return oof_pred, score def _refit_model( self, config: PipelineConfig, X: np.ndarray, y: np.ndarray, task_type: str, sample_weights: np.ndarray | None = None, ) -> Any: """Refit a model on all data in the parent process (no pickle).""" import inspect model = self._instantiate_model(config, task_type) if task_type == "regression" and self.feature_engineering in ("moderate", "aggressive"): try: from scipy.stats import skew target_skewness = skew(y) if abs(target_skewness) > 1.0: from endgame.preprocessing.target_transform import TargetTransformer model = TargetTransformer(regressor=model, method="auto") except ImportError: pass model = self._wrap_with_preprocessing(model, config) supports_sw = False if sample_weights is not None: try: fit_sig = inspect.signature(model.fit) supports_sw = "sample_weight" in fit_sig.parameters except (ValueError, TypeError): pass if supports_sw and sample_weights is not None: model.fit(X, y, sample_weight=sample_weights) else: model.fit(X, y) return model def _select_cv_strategy(self, context: dict[str, Any]) -> Any: """Auto-select a CV splitter based on data characteristics. Examines meta-features and context to choose the most appropriate cross-validation strategy: - Time series data -> PurgedTimeSeriesSplit (if time column detected) - Grouped data -> StratifiedGroupKFold (if groups present) - Small datasets (< 500 samples) -> RepeatedStratifiedKFold / RepeatedKFold - Imbalanced classification (minority < 10%) -> StratifiedKFold - Default classification -> StratifiedKFold - Default regression -> KFold """ from sklearn.model_selection import ( KFold, RepeatedKFold, RepeatedStratifiedKFold, StratifiedKFold, ) task_type = context.get("task_type", "classification") meta_features = context.get("meta_features", {}) y = context.get("y_augmented", context.get("y_cleaned", context.get("y"))) n_folds = self.cv_folds is_clf = task_type in ("classification", "binary", "multiclass") n_samples = len(y) if y is not None else meta_features.get("nr_inst", 10_000) # ── Time series: use purged time series split ─────────────── if meta_features.get("is_timeseries") or context.get("time_column"): try: from endgame.validation import PurgedTimeSeriesSplit cv = PurgedTimeSeriesSplit( n_splits=n_folds, embargo_pct=0.01, ) if self.verbose > 0: print(f" CV strategy: PurgedTimeSeriesSplit (n_splits={n_folds})") return cv except ImportError: pass # ── Grouped data: use group-aware splitting ───────────────── groups = context.get("groups") if groups is not None: try: from endgame.validation import StratifiedGroupKFold as SGKFold cv = SGKFold(n_splits=min(n_folds, len(np.unique(groups)))) if self.verbose > 0: print(f" CV strategy: StratifiedGroupKFold (n_splits={cv.n_splits})") return cv except ImportError: from sklearn.model_selection import GroupKFold cv = GroupKFold(n_splits=min(n_folds, len(np.unique(groups)))) if self.verbose > 0: print(f" CV strategy: GroupKFold (n_splits={cv.n_splits})") return cv # ── Small datasets: use repeated k-fold for more stable estimates if n_samples < 500: n_repeats = 3 if is_clf: cv = RepeatedStratifiedKFold( n_splits=n_folds, n_repeats=n_repeats, random_state=42, ) if self.verbose > 0: print( f" CV strategy: RepeatedStratifiedKFold " f"({n_folds}x{n_repeats}, small dataset: {n_samples} samples)" ) else: cv = RepeatedKFold( n_splits=n_folds, n_repeats=n_repeats, random_state=42, ) if self.verbose > 0: print( f" CV strategy: RepeatedKFold " f"({n_folds}x{n_repeats}, small dataset: {n_samples} samples)" ) return cv # ── Default: stratified for classification, plain for regression if is_clf: cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42) if self.verbose > 1: print(f" CV strategy: StratifiedKFold (n_splits={n_folds})") else: cv = KFold(n_splits=n_folds, shuffle=True, random_state=42) if self.verbose > 1: print(f" CV strategy: KFold (n_splits={n_folds})") return cv def _score_oof( self, oof_pred: np.ndarray, y: np.ndarray, task_type: str, ) -> float: """Score OOF predictions using the configured eval_metric.""" metric = self.eval_metric is_clf = task_type in ("classification", "binary", "multiclass") if metric == "auto": metric = "roc_auc" if is_clf else "rmse" if is_clf and oof_pred.ndim > 1: # For binary: use probability of positive class if oof_pred.shape[1] == 2: oof_proba = oof_pred[:, 1] else: oof_proba = oof_pred pred_labels = np.argmax(oof_pred, axis=1) elif is_clf: oof_proba = oof_pred pred_labels = (oof_pred > 0.5).astype(int) else: oof_proba = None pred_labels = None if metric == "roc_auc": from sklearn.metrics import roc_auc_score try: if oof_proba is not None and oof_proba.ndim > 1: return roc_auc_score(y, oof_proba, multi_class="ovr") return roc_auc_score(y, oof_proba) except ValueError: from sklearn.metrics import accuracy_score return accuracy_score(y, pred_labels) elif metric == "log_loss": from sklearn.metrics import log_loss return -log_loss(y, oof_proba if oof_proba is not None else oof_pred) elif metric == "accuracy": from sklearn.metrics import accuracy_score return accuracy_score(y, pred_labels) elif metric == "f1": from sklearn.metrics import f1_score return f1_score(y, pred_labels, average="binary" if len(np.unique(y)) == 2 else "macro") elif metric in ("rmse", "neg_rmse"): from sklearn.metrics import root_mean_squared_error return -root_mean_squared_error(y, oof_pred) elif metric in ("r2", "r_squared"): from sklearn.metrics import r2_score return r2_score(y, oof_pred) elif metric == "mae": from sklearn.metrics import mean_absolute_error return -mean_absolute_error(y, oof_pred) else: if is_clf: from sklearn.metrics import accuracy_score return accuracy_score(y, pred_labels) from sklearn.metrics import r2_score return r2_score(y, oof_pred) @staticmethod def _get_early_stopping_kwargs( model: Any, model_name: str, X_val: np.ndarray, y_val: np.ndarray, early_stopping_rounds: int, ) -> dict[str, Any]: """Build early-stopping fit kwargs for GBDT models. Handles both bare estimators and sklearn Pipelines (where preprocessing steps must transform X_val before it reaches the final estimator). Returns a dict of keyword arguments to pass to ``model.fit()``. """ from sklearn.pipeline import Pipeline # If the model is wrapped in a Pipeline, we need to transform # X_val through the preprocessing steps and prefix the kwarg # names with the final step name. if isinstance(model, Pipeline): # Transform X_val through all steps except the final estimator X_val_transformed = X_val for step_name, step_transformer in model.steps[:-1]: if hasattr(step_transformer, "transform"): X_val_transformed = step_transformer.transform(X_val_transformed) final_step_name = model.steps[-1][0] prefix = f"{final_step_name}__" else: X_val_transformed = X_val prefix = "" kwargs: dict[str, Any] = {} if model_name == "lgbm": kwargs[f"{prefix}eval_set"] = [(X_val_transformed, y_val)] try: import lightgbm as lgb kwargs[f"{prefix}callbacks"] = [ lgb.early_stopping(early_stopping_rounds, verbose=False), lgb.log_evaluation(period=0), ] except ImportError: # LightGBM not available — skip early stopping return {} elif model_name == "xgb": kwargs[f"{prefix}eval_set"] = [(X_val_transformed, y_val)] kwargs[f"{prefix}verbose"] = False elif model_name == "catboost": kwargs[f"{prefix}eval_set"] = (X_val_transformed, y_val) kwargs[f"{prefix}early_stopping_rounds"] = early_stopping_rounds kwargs[f"{prefix}verbose"] = 0 elif model_name == "ngboost": kwargs[f"{prefix}X_val"] = X_val_transformed kwargs[f"{prefix}Y_val"] = y_val kwargs[f"{prefix}early_stopping_rounds"] = early_stopping_rounds else: return {} return kwargs @staticmethod def _wrap_with_preprocessing( model: Any, config: PipelineConfig, ) -> Any: """Wrap *model* in a sklearn Pipeline if the config specifies per-model preprocessing or feature-selection steps. The global preprocessing (run by ``PreprocessingExecutor``) has already been applied to X. These per-config steps add *extra* transformations so that different pipeline configs can explore different scaling, encoding, or feature-selection strategies. """ steps_spec = config.preprocessing or [] if not steps_spec: return model from sklearn.pipeline import Pipeline pipeline_steps: list[tuple[str, Any]] = [] for step_name, params in steps_spec: try: transformer = _build_preprocessing_step(step_name, params) if transformer is not None: pipeline_steps.append((step_name, transformer)) except Exception as e: logger.debug(f"Could not build preprocessing step {step_name}: {e}") if not pipeline_steps: return model pipeline_steps.append(("model", model)) return Pipeline(pipeline_steps) def _instantiate_model(self, config: PipelineConfig, task_type: str): """Instantiate a model from configuration. Parameters ---------- config : PipelineConfig Model configuration. task_type : str Task type. Returns ------- model Instantiated model. """ from endgame.automl.model_registry import get_model_class model_class = get_model_class(config.model_name) if model_class is None: # Fallback to sklearn if config.model_name == "lgbm": if task_type == "classification": from lightgbm import LGBMClassifier model_class = LGBMClassifier else: from lightgbm import LGBMRegressor model_class = LGBMRegressor else: raise ValueError(f"Unknown model: {config.model_name}") return model_class(**config.model_params) class _ImportanceMaskSelector: """Sklearn-compatible feature selector using a precomputed boolean mask. Used by iterative feature selection: the mask is derived from aggregate feature importances of previously trained models. """ def __init__(self, mask: list[bool]): self._mask = np.asarray(mask, dtype=bool) def fit(self, X, y=None): return self def transform(self, X): if hasattr(X, "iloc"): return X.iloc[:, self._mask[:X.shape[1]]] return X[:, self._mask[:X.shape[1]]] def fit_transform(self, X, y=None): return self.fit(X, y).transform(X) def get_support(self, indices=False): if indices: return np.where(self._mask)[0] return self._mask def _build_preprocessing_step(step_name: str, params: dict) -> Any: """Build a single sklearn-compatible preprocessing transformer. Called by ``ModelTrainingExecutor._wrap_with_preprocessing`` to construct per-config pipeline steps. Returns ``None`` if the step cannot be built (missing dependency, unknown name, etc.). """ p = params or {} if step_name == "scaler": method = p.get("method", "standard") if method == "standard": from sklearn.preprocessing import StandardScaler return StandardScaler() elif method == "robust": from sklearn.preprocessing import RobustScaler return RobustScaler() elif method == "quantile": from sklearn.preprocessing import QuantileTransformer return QuantileTransformer(output_distribution="normal", random_state=42) elif method == "minmax": from sklearn.preprocessing import MinMaxScaler return MinMaxScaler() elif step_name == "feature_selection": method = p.get("method", "variance_threshold") if method == "variance_threshold": from sklearn.feature_selection import VarianceThreshold return VarianceThreshold(threshold=p.get("threshold", 0.01)) elif method == "mutual_info": from sklearn.feature_selection import SelectKBest, mutual_info_classif return SelectKBest(mutual_info_classif, k=min(p.get("k", 20), 50)) elif method == "boruta": try: from endgame.preprocessing.feature_selection import BorutaSelector return BorutaSelector(max_iter=30) except ImportError: return None elif method == "importance_mask": # Importance-based mask from iterative feature selection feedback mask = p.get("mask") if mask is not None: return _ImportanceMaskSelector(mask) elif step_name == "imputer": strategy = p.get("strategy", "median") if strategy == "knn": from sklearn.impute import KNNImputer return KNNImputer(n_neighbors=p.get("n_neighbors", 5)) else: from sklearn.impute import SimpleImputer return SimpleImputer(strategy=strategy) elif step_name == "encoder": method = p.get("method", "ordinal") if method == "ordinal": from sklearn.preprocessing import OrdinalEncoder return OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1) elif method == "onehot": from sklearn.preprocessing import OneHotEncoder return OneHotEncoder(handle_unknown="ignore", sparse_output=False, max_categories=20) elif method == "target": try: from endgame.preprocessing.encoding import SafeTargetEncoder return SafeTargetEncoder() except ImportError: from sklearn.preprocessing import OrdinalEncoder return OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1) elif step_name == "dim_reduction": method = p.get("method", "pca") n_components = p.get("n_components", 0.95) if method == "pca": from sklearn.decomposition import PCA if isinstance(n_components, float) and 0 < n_components < 1: return PCA(n_components=n_components, random_state=42) # Integer n_components — wrap to avoid exceeding feature count from endgame.automl._safe_dim_reduce import SafePCA return SafePCA(n_components=int(n_components), random_state=42) elif method == "truncated_svd": from endgame.automl._safe_dim_reduce import SafeTruncatedSVD n_comp = int(n_components) if isinstance(n_components, (int, float)) and n_components >= 1 else 10 return SafeTruncatedSVD(n_components=n_comp, random_state=42) return None class EnsemblingExecutor(BaseStageExecutor): """Executes the ensembling stage. When ``method="auto"``, tries all available strategies (hill climbing, stacking, averaging) and picks the one with the best OOF score. """ def __init__(self, method: str = "hill_climbing"): """Initialize ensembling executor. Parameters ---------- method : str, default="hill_climbing" Ensemble method: "none", "hill_climbing", "stacking", "auto". "auto" tries all and picks the best. """ self.method = method def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Build ensemble from trained models.""" start_time = time.time() try: trained_models = context["trained_models"] oof_predictions = context["oof_predictions"] y = context.get("y_augmented", context.get("y_cleaned", context["y"])) task_type = context.get("task_type", "classification") if not trained_models: duration = time.time() - start_time return StageResult( stage_name="ensembling", success=False, duration=duration, output={"ensemble": None, "weights": {}}, error="No trained models available for ensembling", ) if len(trained_models) < 2 or self.method == "none": results_list = context.get("results", []) score_map = {} if isinstance(results_list, list): for r in results_list: if hasattr(r, "config") and hasattr(r, "score") and r.success: score_map[r.config.model_name] = r.score best_model_name = max( trained_models.keys(), key=lambda k: score_map.get(k, 0), ) ensemble = trained_models.get(best_model_name) duration = time.time() - start_time return StageResult( stage_name="ensembling", success=True, duration=duration, output={"ensemble": ensemble, "weights": {best_model_name: 1.0}}, metadata={"method": "single_best"}, ) # Try multiple methods and pick the best if self.method == "auto": ensemble, weights, chosen_method = self._auto_ensemble( trained_models, oof_predictions, y, task_type, ) elif self.method == "hill_climbing": ensemble, weights = self._hill_climbing_ensemble( trained_models, oof_predictions, y, task_type, ) chosen_method = "hill_climbing" elif self.method == "stacking": ensemble, weights = self._stacking_ensemble( trained_models, oof_predictions, y, task_type, ) chosen_method = "stacking" else: ensemble, weights = self._average_ensemble(trained_models) chosen_method = "averaging" duration = time.time() - start_time return StageResult( stage_name="ensembling", success=True, duration=duration, output={"ensemble": ensemble, "weights": weights}, metadata={"method": chosen_method, "n_models": len(weights)}, ) except Exception as e: duration = time.time() - start_time logger.error(f"Ensembling failed: {e}") if "trained_models" in context and context["trained_models"]: best_model = list(context["trained_models"].values())[0] else: best_model = None return StageResult( stage_name="ensembling", success=False, duration=duration, output={"ensemble": best_model, "weights": {}}, error=str(e), ) def _auto_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float], str]: """Try all ensemble methods and return the best by OOF score.""" from sklearn.metrics import r2_score, roc_auc_score is_clf = task_type in ("classification", "binary", "multiclass") def _safe_roc_auc(y_true, y_pred): try: return roc_auc_score(y_true, y_pred) except ValueError: from sklearn.metrics import accuracy_score return accuracy_score(y_true, y_pred) score_fn = _safe_roc_auc if is_clf else r2_score candidates: list[tuple[Any, dict[str, float], str, float]] = [] valid_oof = { k: v for k, v in oof_predictions.items() if isinstance(v, np.ndarray) and len(v) == len(y) } # Hill climbing try: ens, wts = self._hill_climbing_ensemble( trained_models, valid_oof, y, task_type, ) candidates.append((ens, wts, "hill_climbing", 0.0)) except Exception as e: logger.debug(f"Hill climbing ensemble failed: {e}") # Stacking try: ens, wts = self._stacking_ensemble( trained_models, valid_oof, y, task_type, ) candidates.append((ens, wts, "stacking", 0.0)) except Exception as e: logger.debug(f"Stacking ensemble failed: {e}") # Optuna-optimized blending try: ens, wts = self._optimized_blend_ensemble( trained_models, valid_oof, y, task_type, ) candidates.append((ens, wts, "optimized_blend", 0.0)) except Exception as e: logger.debug(f"Optimized blend ensemble failed: {e}") # Power-weighted blending try: ens, wts = self._power_blend_ensemble( trained_models, valid_oof, y, task_type, ) candidates.append((ens, wts, "power_blend", 0.0)) except Exception as e: logger.debug(f"Power blend ensemble failed: {e}") # Rank averaging try: ens, wts = self._rank_average_ensemble( trained_models, valid_oof, y, task_type, ) candidates.append((ens, wts, "rank_average", 0.0)) except Exception as e: logger.debug(f"Rank average ensemble failed: {e}") # Uniform averaging (always available as a baseline) try: ens, wts = self._average_ensemble(trained_models) candidates.append((ens, wts, "averaging", 0.0)) except Exception as e: logger.debug(f"Averaging ensemble failed: {e}") # Score each candidate using OOF predictions for i, (ens, wts, method, _) in enumerate(candidates): try: model_names = list(valid_oof.keys()) oof_pred = None # Stacking: use the meta-estimator on the OOF stack if hasattr(ens, "meta_estimator") and hasattr(ens, "model_order"): meta_X = np.column_stack( [valid_oof[n] for n in ens.model_order if n in valid_oof] ) oof_pred = ens.meta_estimator.predict(meta_X) # Rank averaging: use the blender's rank transform elif hasattr(ens, "blender") and hasattr(ens.blender, "blend"): oof_preds_list = [ valid_oof[n] for n in ens.model_order if n in valid_oof ] if oof_preds_list: blended = ens.blender.blend(oof_preds_list) if is_clf: oof_pred = (blended > 0.5).astype(int) else: oof_pred = blended # Weighted ensembles (hill climbing, optimized blend, power blend, averaging) elif hasattr(ens, "weights"): total_w = sum(wts.get(n, 0) for n in model_names) if total_w > 0 and is_clf: proba = None for name in model_names: w = wts.get(name, 0) / total_w if w > 0 and name in valid_oof: p = valid_oof[name] if p.ndim == 1: p = np.column_stack([1 - p, p]) proba = p * w if proba is None else proba + p * w oof_pred = ( np.argmax(proba, axis=1) if proba is not None else np.zeros(len(y)) ) elif total_w > 0: oof_pred = sum( wts.get(n, 0) / total_w * valid_oof[n] for n in model_names if n in valid_oof and wts.get(n, 0) > 0 ) if oof_pred is not None: score = score_fn(y, oof_pred) candidates[i] = (ens, wts, method, score) except Exception as e: logger.debug(f"Ensemble scoring failed for {method}: {e}") if not candidates: return self._average_ensemble(trained_models) + ("averaging",) # Pick the best best = max(candidates, key=lambda x: x[3]) logger.info( f"Auto-ensemble: tried {[c[2] for c in candidates]}, " f"scores={[f'{c[3]:.4f}' for c in candidates]}, " f"chose {best[2]}" ) return best[0], best[1], best[2] def _hill_climbing_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float]]: """Build hill climbing ensemble. Parameters ---------- trained_models : dict Trained models. oof_predictions : dict OOF predictions for each model. y : array-like Target vector. task_type : str Task type. Returns ------- tuple (ensemble, weights) """ try: from endgame.ensemble.hill_climbing import HillClimbingEnsemble preds_list = list(oof_predictions.values()) model_names = list(oof_predictions.keys()) hc = HillClimbingEnsemble( metric="roc_auc" if task_type == "classification" else "r2", n_iterations=100, ) hc.fit(preds_list, y) # hc.weights_ is {int_index: float_weight} — map to model names weights = { model_names[idx]: w for idx, w in hc.weights_.items() if idx < len(model_names) } ensemble = _WeightedEnsemble( models=trained_models, weights=weights, task_type=task_type, ) return ensemble, weights except ImportError: return self._average_ensemble(trained_models) def _stacking_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float]]: """Build stacking ensemble using OOF predictions as meta-features. Parameters ---------- trained_models : dict Trained models. oof_predictions : dict OOF predictions for each model. y : array-like Target vector. task_type : str Task type. Returns ------- tuple (ensemble, weights) """ try: from sklearn.linear_model import LogisticRegression, Ridge model_names = list(oof_predictions.keys()) meta_X = np.column_stack(list(oof_predictions.values())) is_clf = task_type in ("classification", "binary", "multiclass") if is_clf: meta_est = LogisticRegression(max_iter=1000, solver="lbfgs") else: meta_est = Ridge(alpha=1.0) meta_est.fit(meta_X, y) weights = {name: 1.0 / len(model_names) for name in model_names} ensemble = _StackingEnsembleWrapper( models=trained_models, model_order=model_names, meta_estimator=meta_est, task_type=task_type, ) return ensemble, weights except Exception: return self._average_ensemble(trained_models) def _average_ensemble( self, trained_models: dict[str, Any], ) -> tuple[Any, dict[str, float]]: """Build simple averaging ensemble. Parameters ---------- trained_models : dict Trained models. Returns ------- tuple (ensemble, weights) """ weights = {name: 1.0 / len(trained_models) for name in trained_models} ensemble = _WeightedEnsemble( models=trained_models, weights=weights, task_type="classification", ) return ensemble, weights def _optimized_blend_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float]]: """Optuna-optimized blend weights.""" from endgame.ensemble.blending import OptimizedBlender preds_list = list(oof_predictions.values()) model_names = list(oof_predictions.keys()) is_clf = task_type in ("classification", "binary", "multiclass") blender = OptimizedBlender( metric="roc_auc" if is_clf else "rmse", n_trials=30, maximize=is_clf, verbose=False, ) blender.fit(preds_list, y) weights = { model_names[idx]: w for idx, w in blender.weights_.items() if idx < len(model_names) } ensemble = _WeightedEnsemble( models=trained_models, weights=weights, task_type=task_type, ) return ensemble, weights def _power_blend_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float]]: """Power-weighted blending based on individual OOF scores.""" from sklearn.metrics import r2_score, roc_auc_score from endgame.ensemble.blending import PowerBlender preds_list = list(oof_predictions.values()) model_names = list(oof_predictions.keys()) is_clf = task_type in ("classification", "binary", "multiclass") score_fn = roc_auc_score if is_clf else r2_score scores = [] for pred in preds_list: try: scores.append(score_fn(y, pred)) except Exception: scores.append(0.5 if is_clf else 0.0) blender = PowerBlender(scores=scores, power=3.0, higher_is_better=True) blender.fit() weights = { model_names[idx]: w for idx, w in blender.weights_.items() if idx < len(model_names) } ensemble = _WeightedEnsemble( models=trained_models, weights=weights, task_type=task_type, ) return ensemble, weights def _rank_average_ensemble( self, trained_models: dict[str, Any], oof_predictions: dict[str, np.ndarray], y: np.ndarray, task_type: str, ) -> tuple[Any, dict[str, float]]: """Rank-based averaging — robust to different prediction scales.""" from endgame.ensemble.blending import RankAverageBlender preds_list = list(oof_predictions.values()) model_names = list(oof_predictions.keys()) blender = RankAverageBlender() blender.fit() weights = {name: 1.0 / len(model_names) for name in model_names} ensemble = _RankAverageEnsembleWrapper( models=trained_models, model_order=model_names, blender=blender, task_type=task_type, ) return ensemble, weights class _RankAverageEnsembleWrapper: """Wrapper for rank-average ensemble that applies rank transform at predict time.""" def __init__( self, models: dict[str, Any], model_order: list[str], blender: Any, task_type: str = "classification", ): self.models = models self.model_order = model_order self.blender = blender self.task_type = task_type self.weights = {name: 1.0 / len(model_order) for name in model_order} def predict(self, X: np.ndarray) -> np.ndarray: preds = [] for name in self.model_order: if name in self.models: try: if self.task_type == "classification" and hasattr(self.models[name], "predict_proba"): p = self.models[name].predict_proba(X)[:, 1] else: p = self.models[name].predict(X) preds.append(p) except Exception: continue if not preds: return np.zeros(X.shape[0]) blended = self.blender.blend(preds) if self.task_type == "classification": return (blended > 0.5).astype(int) return blended def predict_proba(self, X: np.ndarray) -> np.ndarray: preds = [] for name in self.model_order: if name in self.models: try: if hasattr(self.models[name], "predict_proba"): p = self.models[name].predict_proba(X)[:, 1] else: p = self.models[name].predict(X) preds.append(p) except Exception: continue if not preds: p1 = np.full(X.shape[0], 0.5) return np.column_stack([1 - p1, p1]) blended = self.blender.blend(preds) return np.column_stack([1 - blended, blended]) class _WeightedEnsemble: """Simple weighted ensemble wrapper.""" def __init__( self, models: dict[str, Any], weights: dict[str, float], task_type: str = "classification", ): self.models = models self.weights = weights self.task_type = task_type def predict(self, X: np.ndarray) -> np.ndarray: """Make predictions. Parameters ---------- X : array-like Feature matrix. Returns ------- array-like Predictions. """ if self.task_type == "classification": proba = self.predict_proba(X) return np.argmax(proba, axis=1) else: preds = [] total_weight = sum(self.weights.values()) if total_weight == 0: total_weight = len(self.models) self.weights = {n: 1.0 for n in self.models} for name, model in self.models.items(): weight = self.weights.get(name, 0) / total_weight if weight > 0: preds.append(weight * model.predict(X)) return sum(preds) if preds else list(self.models.values())[0].predict(X) def predict_proba(self, X: np.ndarray) -> np.ndarray: """Predict class probabilities. Parameters ---------- X : array-like Feature matrix. Returns ------- array-like Class probabilities. """ proba = None total_weight = sum(self.weights.values()) # Fall back to equal weights if all weights are zero if total_weight == 0: total_weight = len(self.models) self.weights = {n: 1.0 for n in self.models} for name, model in self.models.items(): weight = self.weights.get(name, 0) / total_weight if weight > 0 and hasattr(model, "predict_proba"): model_proba = model.predict_proba(X) if proba is None: proba = weight * model_proba else: proba += weight * model_proba return proba if proba is not None else np.zeros((X.shape[0], 2)) class _StackingEnsembleWrapper: """Stacking ensemble: base model predictions -> meta-estimator.""" def __init__( self, models: dict[str, Any], model_order: list[str], meta_estimator: Any, task_type: str = "classification", ): self.models = models self.model_order = model_order self.meta_estimator = meta_estimator self.task_type = task_type def _meta_features(self, X: np.ndarray) -> np.ndarray: cols = [] for name in self.model_order: model = self.models[name] if hasattr(model, "predict_proba"): p = model.predict_proba(X) cols.append(p[:, 1] if p.ndim == 2 and p.shape[1] == 2 else p) else: cols.append(model.predict(X)) return np.column_stack(cols) def predict(self, X: np.ndarray) -> np.ndarray: meta_X = self._meta_features(X) return self.meta_estimator.predict(meta_X) def predict_proba(self, X: np.ndarray) -> np.ndarray: meta_X = self._meta_features(X) if hasattr(self.meta_estimator, "predict_proba"): return self.meta_estimator.predict_proba(meta_X) decision = self.meta_estimator.predict(meta_X) proba = np.column_stack([1 - decision, decision]) return proba class CalibrationExecutor(BaseStageExecutor): """Executes the calibration stage.""" def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Calibrate model probabilities. Parameters ---------- context : dict Must contain 'ensemble', 'X_val', 'y_val'. time_budget : float Time budget in seconds. Returns ------- StageResult Contains calibrator in output. """ start_time = time.time() try: ensemble = context.get("ensemble") X_val = context.get("X_val") y_val = context.get("y_val") task_type = context.get("task_type", "classification") if ensemble is None or task_type != "classification" or X_val is None: duration = time.time() - start_time return StageResult( stage_name="calibration", success=True, duration=duration, output={"calibrator": None}, metadata={"skipped": True}, ) # Get uncalibrated probabilities if hasattr(ensemble, "predict_proba"): uncalibrated_proba = ensemble.predict_proba(X_val) else: duration = time.time() - start_time return StageResult( stage_name="calibration", success=True, duration=duration, output={"calibrator": None}, metadata={"skipped": True, "reason": "no predict_proba"}, ) # Try different calibration methods calibrator = self._select_best_calibrator(uncalibrated_proba, y_val) duration = time.time() - start_time return StageResult( stage_name="calibration", success=True, duration=duration, output={"calibrator": calibrator}, metadata={"method": type(calibrator).__name__ if calibrator else None}, ) except Exception as e: duration = time.time() - start_time logger.error(f"Calibration failed: {e}") return StageResult( stage_name="calibration", success=False, duration=duration, output={"calibrator": None}, error=str(e), ) def _select_best_calibrator( self, proba: np.ndarray, y: np.ndarray, ): """Select best calibration method using ECE-based 3-fold CV. Tries PlattScaling, IsotonicCalibration, and BetaCalibration from endgame.calibration, selecting the one with lowest ECE. Falls back to sklearn LogisticRegression if endgame calibration is unavailable. Parameters ---------- proba : array-like Uncalibrated probabilities. y : array-like True labels. Returns ------- calibrator Best calibrator or None. """ # Extract 1D probabilities for binary case if proba.ndim > 1: proba_1d = proba[:, 1] if proba.shape[1] == 2 else proba else: proba_1d = proba try: from sklearn.model_selection import KFold from endgame.calibration.analysis import expected_calibration_error from endgame.calibration.scaling import ( BetaCalibration, IsotonicCalibration, PlattScaling, ) candidates = [ ("PlattScaling", PlattScaling()), ("IsotonicCalibration", IsotonicCalibration()), ("BetaCalibration", BetaCalibration()), ] # Add additional calibrators if available try: from endgame.calibration.venn_abers import VennABERS candidates.append(("VennABERS", VennABERS())) except ImportError: pass try: from endgame.calibration.scaling import TemperatureScaling candidates.append(("TemperatureScaling", TemperatureScaling())) except ImportError: pass try: from endgame.calibration.scaling import HistogramBinning candidates.append(("HistogramBinning", HistogramBinning())) except ImportError: pass best_calibrator = None best_ece = float("inf") best_name = None for name, calibrator_template in candidates: try: # 3-fold CV to estimate ECE ece_scores = [] kf = KFold(n_splits=3, shuffle=True, random_state=42) for train_idx, val_idx in kf.split(proba_1d): p_train, p_val = proba_1d[train_idx], proba_1d[val_idx] y_train, y_val = y[train_idx], y[val_idx] # Clone calibrator for each fold from sklearn.base import clone cal = clone(calibrator_template) cal.fit(p_train, y_train) p_cal = cal.transform(p_val) # Ensure 1D for ECE computation if p_cal.ndim > 1: p_cal = p_cal[:, 1] if p_cal.shape[1] == 2 else p_cal.ravel() ece = expected_calibration_error(y_val, p_cal) ece_scores.append(ece) mean_ece = np.mean(ece_scores) if mean_ece < best_ece: best_ece = mean_ece best_name = name best_calibrator = clone(calibrator_template) except Exception as e: logger.debug(f"Calibrator {name} failed during CV: {e}") continue if best_calibrator is not None: # Fit on full data best_calibrator.fit(proba_1d, y) logger.info(f"Selected calibrator: {best_name} (ECE={best_ece:.4f})") return best_calibrator except ImportError: logger.debug("endgame.calibration not available, using sklearn fallback") # Fallback to sklearn LogisticRegression try: from sklearn.linear_model import LogisticRegression calibrator = LogisticRegression() calibrator.fit(proba_1d.reshape(-1, 1), y) return calibrator except Exception: return None class PostTrainingExecutor(BaseStageExecutor): """Executes the post-training stage. Handles knowledge distillation and conformal prediction. """ def __init__(self, feature_engineering: str = "none"): self.feature_engineering = feature_engineering def execute( self, context: dict[str, Any], time_budget: float, ) -> StageResult: """Apply post-training optimizations. Parameters ---------- context : dict Must contain 'ensemble' and optionally validation data. time_budget : float Time budget in seconds. Returns ------- StageResult Contains distilled_model and conformal_predictor in output. """ start_time = time.time() level = self.feature_engineering task_type = context.get("task_type", "classification") if level in ("none", "light"): duration = time.time() - start_time return StageResult( stage_name="post_training", success=True, duration=duration, output={"distilled_model": None, "conformal_predictor": None}, metadata={"skipped": True, "level": level}, ) try: ensemble = context.get("ensemble") X_val = context.get("X_val") y_val = context.get("y_val") distilled_model = None conformal_predictor = None # Aggressive: Knowledge distillation if level == "aggressive" and ensemble is not None: elapsed = time.time() - start_time if elapsed < time_budget * 0.5: try: from endgame.ensemble.distillation import KnowledgeDistiller distiller = KnowledgeDistiller( teacher=ensemble, student=None, # Use default (LGBMClassifier) temperature=3.0, alpha=0.7, ) # Use training data for distillation X_train = context.get("X_augmented", context.get("X_engineered", context.get("X_processed", context.get("X_cleaned", context.get("X"))))) y_train = context.get("y_augmented", context.get("y_cleaned", context.get("y"))) distilled_model = distiller.fit(X_train, y_train) logger.info("PostTraining: knowledge distillation complete") except ImportError: logger.debug("KnowledgeDistiller not available") except Exception as e: logger.debug(f"Knowledge distillation failed: {e}") # Moderate+Aggressive: Conformal prediction (if validation data available) if level in ("moderate", "aggressive") and X_val is not None and y_val is not None: elapsed = time.time() - start_time if elapsed < time_budget * 0.9: try: if task_type == "classification": from endgame.calibration.conformal import ConformalClassifier conformal_predictor = ConformalClassifier( method="lac", alpha=0.1, ) else: from endgame.calibration.conformal import ConformalRegressor conformal_predictor = ConformalRegressor( method="adaptive", alpha=0.1, ) if ensemble is not None: conformal_predictor.fit(ensemble, X_val, y_val) logger.info("PostTraining: conformal prediction calibrated") except ImportError: logger.debug("Conformal prediction not available") conformal_predictor = None except Exception as e: logger.debug(f"Conformal prediction failed: {e}") conformal_predictor = None duration = time.time() - start_time return StageResult( stage_name="post_training", success=True, duration=duration, output={ "distilled_model": distilled_model, "conformal_predictor": conformal_predictor, }, metadata={ "level": level, "has_distilled": distilled_model is not None, "has_conformal": conformal_predictor is not None, }, ) except Exception as e: duration = time.time() - start_time logger.error(f"Post-training failed: {e}") return StageResult( stage_name="post_training", success=False, duration=duration, output={"distilled_model": None, "conformal_predictor": None}, error=str(e), )
[docs] class PipelineOrchestrator: """Coordinates AutoML pipeline stages with time budget management. The orchestrator manages the execution of all pipeline stages, handles time allocation, and provides graceful degradation when stages fail or time runs out. Parameters ---------- preset : str or PresetConfig, default="medium_quality" Preset configuration to use. time_limit : int, optional Total time budget in seconds. Overrides preset default. search_strategy : BaseSearchStrategy, optional Search strategy for model selection. verbose : int, default=1 Verbosity level. Attributes ---------- stage_results_ : dict Results from each executed stage. time_manager_ : TimeBudgetManager Time budget manager. Examples -------- >>> orchestrator = PipelineOrchestrator(preset="medium_quality") >>> result = orchestrator.run(X_train, y_train, X_val, y_val) >>> print(result.score) """ # Default stage order and time allocations DEFAULT_STAGES = [ ("profiling", 0.01), ("quality_guardrails", 0.02), ("data_cleaning", 0.02), ("preprocessing", 0.05), ("feature_engineering", 0.03), ("data_augmentation", 0.02), ("model_selection", 0.04), ("model_training", 0.40), ("constraint_check", 0.01), ("hyperparameter_tuning", 0.20), ("ensembling", 0.06), ("threshold_opt", 0.02), ("calibration", 0.03), ("post_training", 0.02), ("explainability", 0.02), ("persistence", 0.01), ] def __init__( self, preset: str | PresetConfig = "medium_quality", time_limit: int | None = None, search_strategy=None, verbose: int = 1, checkpoint_callback=None, keep_training: bool = False, patience: int = 5, min_improvement: float = 1e-4, min_model_time: float = 300.0, max_model_time: float = 600.0, eval_metric: str = "auto", excluded_models: list[str] | None = None, early_stopping_rounds: int = 50, use_gpu: bool = False, ): if isinstance(preset, str): self.preset = PRESETS.get(preset, PRESETS["medium_quality"]) else: self.preset = preset self.eval_metric = eval_metric self.excluded_models = set(excluded_models or []) # time_limit semantics: # positive int → hard budget in seconds # 0 → unlimited (sentinel large value) # None → fall back to preset default, then 900s _UNLIMITED = 10 ** 9 # ~31 years if time_limit is not None and time_limit > 0: self.time_limit = time_limit elif time_limit == 0: self.time_limit = _UNLIMITED else: self.time_limit = self.preset.default_time_limit or 900 self.search_strategy = search_strategy self.verbose = verbose self.checkpoint_callback = checkpoint_callback self.keep_training = keep_training self.patience = patience self.min_improvement = min_improvement self.min_model_time = min_model_time self.max_model_time = max_model_time self.early_stopping_rounds = early_stopping_rounds self.use_gpu = use_gpu # Configure logging based on verbosity if verbose >= 3: logger.setLevel(logging.DEBUG) elif verbose >= 2: logger.setLevel(logging.INFO) elif verbose >= 1: logger.setLevel(logging.WARNING) else: logger.setLevel(logging.ERROR) # Ensure at least one handler exists so messages are visible if not logger.handlers and not logging.getLogger().handlers: handler = logging.StreamHandler() handler.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S" )) logger.addHandler(handler) # Initialize executors feature_eng = getattr(self.preset, "feature_engineering", "none") self._executors = { "profiling": ProfilingExecutor(), "data_cleaning": DataCleaningExecutor(feature_engineering=feature_eng), "preprocessing": PreprocessingExecutor(feature_engineering=feature_eng), "feature_engineering": AdvancedFeatureEngineeringExecutor(feature_engineering=feature_eng), "data_augmentation": DataAugmentationExecutor(feature_engineering=feature_eng), "model_selection": ModelSelectionExecutor(search_strategy), "model_training": ModelTrainingExecutor( cv_folds=self.preset.cv_folds, feature_engineering=feature_eng, verbose=self.verbose, min_model_time=self.min_model_time, max_model_time=self.max_model_time, eval_metric=self.eval_metric, early_stopping_rounds=self.early_stopping_rounds, use_gpu=self.use_gpu, ), "ensembling": EnsemblingExecutor(method=self.preset.ensemble_method), "calibration": CalibrationExecutor(), "post_training": PostTrainingExecutor(feature_engineering=feature_eng), } # Register new stage executors from endgame.automl.executors import ( ConstraintCheckExecutor, ExplainabilityExecutor, HyperparameterTuningExecutor, PersistenceExecutor, ThresholdOptimizationExecutor, ) from endgame.automl.guardrails import QualityGuardrailsExecutor guardrails_strict = getattr(self.preset, "guardrails_strict", False) self._executors["quality_guardrails"] = QualityGuardrailsExecutor( strict=guardrails_strict, ) self._executors["hyperparameter_tuning"] = HyperparameterTuningExecutor( top_n=3, cv_folds=self.preset.cv_folds, ) self._executors["threshold_opt"] = ThresholdOptimizationExecutor() self._executors["explainability"] = ExplainabilityExecutor() # Persistence: auto-save when output_path is configured # The output_path is passed via preset or constructor kwargs. output_path = getattr(self.preset, "output_path", None) self._executors["persistence"] = PersistenceExecutor( output_dir=output_path, ) constraints = getattr(self.preset, "constraints", None) self._executors["constraint_check"] = ConstraintCheckExecutor( constraints=constraints, ) # State self.stage_results_: dict[str, StageResult] = {} self.time_manager_: TimeBudgetManager | None = None
[docs] def run( self, X: pd.DataFrame | np.ndarray, y: np.ndarray, X_val: np.ndarray | None = None, y_val: np.ndarray | None = None, task_type: str = "classification", ) -> PipelineResult: """Execute the full AutoML pipeline. Parameters ---------- X : array-like Training feature matrix. y : array-like Training target vector. X_val : array-like, optional Validation feature matrix. y_val : array-like, optional Validation target vector. task_type : str, default="classification" Task type. Returns ------- PipelineResult Complete pipeline results. """ start_time = time.time() # Initialize time manager allocations = dict(self.preset.time_allocations or self.DEFAULT_STAGES) self.time_manager_ = TimeBudgetManager( total_budget=self.time_limit, allocations=allocations, ) self.time_manager_.start() # Initialize context context = { "X": X, "y": y, "X_val": X_val, "y_val": y_val, "task_type": task_type, "preset": self.preset.name, "max_models": len(self.preset.model_pool), } # Execute stages fail_fast = False for stage_name, _ in self.DEFAULT_STAGES: if stage_name not in self._executors: continue # Abort early on critical guardrail failures if fail_fast: logger.warning( f"Skipping {stage_name}: pipeline aborted due to critical " "quality guardrail failure (guardrails_strict=True)" ) break # Skip calibration if not enabled if stage_name == "calibration" and not self.preset.calibrate: continue # Skip HPO if not enabled in preset if stage_name == "hyperparameter_tuning" and not self.preset.hyperparameter_tune: continue # Skip stages with 0.0 time allocation stage_alloc = allocations.get(stage_name, 0.0) if stage_alloc <= 0.0 and stage_name not in ("profiling",): continue # Begin stage stage_budget = self.time_manager_.begin_stage(stage_name) budget_str = "unlimited" if stage_budget >= 10**7 else f"{stage_budget:.1f}s" if self.verbose > 0: print(f" [AutoML] Starting stage: {stage_name} (budget: {budget_str})") logger.debug(f"Starting stage: {stage_name} (budget: {budget_str})") try: # Execute stage result = self._executors[stage_name].execute(context, stage_budget) self.stage_results_[stage_name] = result # Update context with stage output if result.output: context.update(result.output) # Propagate preprocessing / feature-engineering to X_val # so that downstream stages (calibration, etc.) get # correctly transformed validation data. if stage_name == "preprocessing" and result.success: preprocessor = result.output.get("preprocessor") if result.output else None if preprocessor is not None and context.get("X_val") is not None: try: context["X_val"] = preprocessor.transform(context["X_val"]) except Exception as e: logger.debug(f"Could not preprocess X_val: {e}") if stage_name == "feature_engineering" and result.success: fe_transforms = result.output.get("feature_transformers", []) if result.output else [] if fe_transforms and context.get("X_val") is not None: try: X_val_arr = ( context["X_val"].values if isinstance(context["X_val"], pd.DataFrame) else np.asarray(context["X_val"]) ) for _name, transformer in fe_transforms: X_val_arr = transformer.transform(X_val_arr) context["X_val"] = X_val_arr except Exception as e: logger.debug(f"Could not feature-engineer X_val: {e}") # Check for fail_fast signal from guardrails if result.output and result.output.get("fail_fast"): fail_fast = True status = "OK" if result.success else "FAILED" if self.verbose > 0: print(f" [AutoML] Completed stage: {stage_name} [{status}] ({result.duration:.1f}s)") logger.debug(f"Completed stage: {stage_name} [{status}] ({result.duration:.1f}s)") # Checkpoint after heavyweight stages if stage_name in ( "model_training", "hyperparameter_tuning", "ensembling", "calibration", ): self._checkpoint(context, f"post_{stage_name}") except Exception as e: if self.verbose > 0: print(f" [AutoML] Stage {stage_name} FAILED: {e}") logger.error(f"Stage {stage_name} failed with exception: {e}") self.stage_results_[stage_name] = StageResult( stage_name=stage_name, success=False, duration=0, error=str(e), ) finally: # End stage and redistribute unused time self.time_manager_.end_stage() # Checkpoint after initial pipeline self._checkpoint(context, "post_pipeline") # --- Feedback loop: iterative search if time permits --- if ( not fail_fast and self.search_strategy is not None and self.time_manager_ is not None and self.time_manager_.remaining_budget() > 60 ): self._run_feedback_loop(context, allocations) self._checkpoint(context, "post_feedback") # --- HPO stage on top models (if enabled and time permits) --- if ( not fail_fast and self.preset.hyperparameter_tune and "hyperparameter_tuning" in self._executors and self.time_manager_ is not None and self.time_manager_.remaining_budget() > 30 ): stage_budget = self.time_manager_.begin_stage("hyperparameter_tuning") if self.verbose > 0: print(f" [AutoML] Starting stage: hyperparameter_tuning (budget: {stage_budget:.1f}s)") try: result = self._executors["hyperparameter_tuning"].execute(context, stage_budget) self.stage_results_["hyperparameter_tuning"] = result if result.output: context.update(result.output) if self.verbose > 0: status = "OK" if result.success else "FAILED" print(f" [AutoML] Completed stage: hyperparameter_tuning [{status}] ({result.duration:.1f}s)") except Exception as e: logger.error(f"HPO stage failed: {e}") finally: self.time_manager_.end_stage() self._checkpoint(context, "post_hpo") # --- Continuous optimization loop (keep_training mode) --- if self.keep_training and not fail_fast: self._run_continuous_loop(context) # Store context so the result builder can access continuous-loop # models/results that were not in the initial training stage. self._final_context = context # Build final result total_time = time.time() - start_time # Get best score training_result = self.stage_results_.get("model_training") # Collect ALL results including from the continuous loop all_results: list = [] if training_result and training_result.output: all_results = training_result.output.get("results", []) # Also include results stored in context by the continuous loop ctx = getattr(self, "_final_context", {}) ctx_results = ctx.get("results", []) seen_ids = {id(r) for r in all_results} for r in ctx_results: if id(r) not in seen_ids: all_results.append(r) successful_results = [r for r in all_results if r.success] best_score = max((r.score for r in successful_results), default=0.0) # Get ensemble ensemble_result = self.stage_results_.get("ensembling") ensemble = ensemble_result.output.get("ensemble") if ensemble_result and ensemble_result.output else None # Get best model from ALL trained models (initial + continuous) trained_models: dict = {} if training_result and training_result.output: trained_models.update(training_result.output.get("trained_models", {})) ctx_models = ctx.get("trained_models", {}) trained_models.update(ctx_models) best_model = None if trained_models and successful_results: score_map: dict[str, float] = {} for r in successful_results: name = r.config.model_name if name in trained_models: score_map[name] = max(score_map.get(name, -float("inf")), r.score) if score_map: best_name = max(score_map, key=score_map.get) best_model = trained_models[best_name] elif trained_models: best_model = next(iter(trained_models.values())) return PipelineResult( best_model=best_model, ensemble=ensemble, score=best_score, scores={"primary": best_score}, stage_results=self.stage_results_, total_time=total_time, metadata={ "preset": self.preset.name, "time_limit": self.time_limit, "task_type": task_type, }, )
def _run_feedback_loop( self, context: dict[str, Any], allocations: dict[str, float], ) -> None: """Run iterative feedback loop with remaining time budget. Suggests and trains new model configurations (including HPO variants once the initial sweep is complete). Re-runs ensembling if new models are added. This loop is no longer gated on the HPO preset flag — it runs whenever time permits. """ max_iterations = 5 strategy = self.search_strategy results = context.get("results", []) trained_models = context.get("trained_models", {}) meta_features = context.get("meta_features", {}) if self.verbose > 0: remaining = self.time_manager_.remaining_budget() print(f" [AutoML] Starting feedback loop ({remaining:.0f}s remaining)") # Sync strategy with all results so far already_synced = {r.config.config_id for r in strategy.results_} if strategy.results_ else set() for r in results: if r.config.config_id not in already_synced: try: strategy.update(r) except Exception: pass new_models_added = False for iteration in range(max_iterations): remaining = self.time_manager_.remaining_budget() if remaining < 30: break try: new_configs = strategy.suggest(meta_features, n_suggestions=2) except Exception as e: logger.debug(f"Feedback loop suggest failed: {e}") break if not new_configs: break # For non-variant configs, skip models already trained. # For HPO variants, always allow (they have unique config_ids). new_configs = [ c for c in new_configs if c.metadata.get("source") == "portfolio_hpo_variant" or c.model_name not in trained_models ] if not new_configs: break train_budget = min( remaining * 0.5, self.min_model_time * max(len(new_configs), 1), ) if self.verbose > 0: names = [c.model_name for c in new_configs] print( f" [AutoML] Feedback iteration {iteration + 1}: " f"training {names} ({train_budget:.0f}s)" ) trainer = self._executors.get("model_training") if trainer is None: break context["model_configs"] = new_configs try: train_result = trainer.execute(context, train_budget) if train_result.success and train_result.output: new_trained = train_result.output.get("trained_models", {}) new_results = train_result.output.get("results", []) for name, model in new_trained.items(): trained_models[name] = model new_models_added = True results.extend(new_results) for r in new_results: try: strategy.update(r) except Exception: pass except Exception as e: logger.warning(f"Feedback loop training failed: {e}") break if new_models_added: context["trained_models"] = trained_models context["results"] = results ensembler = self._executors.get("ensembling") if ensembler is not None: remaining = self.time_manager_.remaining_budget() if remaining > 10: try: ensemble_result = ensembler.execute(context, remaining * 0.5) if ensemble_result.success: self.stage_results_["ensembling"] = ensemble_result if ensemble_result.output: context.update(ensemble_result.output) except Exception as e: logger.warning(f"Feedback loop ensembling failed: {e}") def _update_feature_selection_feedback( self, context: dict[str, Any], trained_models: dict[str, Any], results: list, ) -> None: """Compute aggregate feature importances from trained models and store an informed feature mask in context. This enables iterative feature selection: future pipeline configs can reference ``context["important_feature_mask"]`` to focus on features that matter, dropping noise columns. """ try: importances: list[np.ndarray] = [] for r in results: if not r.success: continue model = trained_models.get(r.config.model_name) if model is None: continue # Try to get feature importances from model fi = getattr(model, "feature_importances_", None) if fi is None and hasattr(model, "named_steps"): # Pipeline wrapper — get from final estimator final = model.named_steps.get("model", model) fi = getattr(final, "feature_importances_", None) if fi is not None and len(fi) > 0: # Normalise to sum=1 so different models are comparable total = np.sum(np.abs(fi)) if total > 0: importances.append(np.abs(fi) / total) if len(importances) < 2: return # Stack and average across models min_len = min(len(fi) for fi in importances) stacked = np.stack([fi[:min_len] for fi in importances]) avg_importance = np.mean(stacked, axis=0) # Mark features with >1% average importance as "important" threshold = 0.01 / min_len if min_len > 0 else 0.01 mask = avg_importance > threshold n_kept = int(np.sum(mask)) if 0 < n_kept < min_len: context["important_feature_mask"] = mask context["feature_importances_aggregated"] = avg_importance if self.verbose > 1: print( f" [AutoML] Feature selection feedback: " f"keeping {n_kept}/{min_len} features " f"(threshold={threshold:.6f})" ) except Exception as e: logger.debug(f"Feature selection feedback failed: {e}") def _checkpoint(self, context: dict[str, Any], label: str = "") -> None: """Invoke the checkpoint callback if one is registered.""" if self.checkpoint_callback is not None: try: self.checkpoint_callback( stage_results=self.stage_results_, context=context, label=label, ) except Exception as e: logger.warning(f"Checkpoint callback failed ({label}): {e}") def _run_continuous_loop(self, context: dict[str, Any]) -> None: """Run continuous optimization until convergence or interruption. This is the core AutoML loop. It alternates between: 1. **Model search** — ask the search strategy for new configs (new model types during the initial sweep, then HPO variants of the top performers). 2. **Training** — fit the suggested configurations. 3. **Optional HPO** — run Optuna on the best models if the HPO executor is available and time permits. 4. **Re-ensembling** — rebuild the ensemble with the expanded model pool. Stopping criteria: - ``patience`` consecutive rounds without improvement exceeding ``min_improvement``. - ``time_limit`` reached (if set). - ``KeyboardInterrupt`` (saves checkpoint and exits). """ strategy = self.search_strategy if strategy is None: return trained_models = context.get("trained_models", {}) oof_predictions = context.get("oof_predictions", {}) results: list = context.get("results", []) meta_features = context.get("meta_features", {}) # Sync strategy with all results collected so far already_synced = {r.config.config_id for r in strategy.results_} if strategy.results_ else set() for r in results: if r.config.config_id not in already_synced: try: strategy.update(r) except Exception: pass best_score = max((r.score for r in results if r.success), default=0.0) rounds_without_improvement = 0 iteration = 0 total_new_models = 0 is_genetic = hasattr(strategy, "_evolve") # duck-type GeneticSearch is_bandit = hasattr(strategy, "current_rung") # duck-type BanditSearch is_adaptive = hasattr(strategy, "current_phase") # duck-type AdaptiveSearch if self.verbose > 0: if is_adaptive: strategy_label = f"adaptive ({getattr(strategy, 'phase_name', '?')})" elif is_bandit: strategy_label = "bandit (successive halving)" elif is_genetic: strategy_label = "evolutionary" else: phase = "model sweep" if not getattr(strategy, "initial_sweep_done", True) else "HPO variants" strategy_label = f"portfolio ({phase})" print( f"\n [AutoML] Entering continuous optimization " f"(patience={self.patience}, strategy={strategy_label})" ) trainer = self._executors.get("model_training") ensembler = self._executors.get("ensembling") hpo_executor = self._executors.get("hyperparameter_tuning") try: while self.patience == 0 or rounds_without_improvement < self.patience: iteration += 1 # ── Time budget check ──────────────────────────────── remaining = ( self.time_manager_.remaining_budget() if self.time_manager_ is not None else float("inf") ) if remaining < 30: if self.verbose > 0: print(" [AutoML] Time budget exhausted, stopping") break # ── Step 1: Get new configs from strategy ──────────── n_suggest = 5 if (is_genetic or is_bandit) else 3 new_configs = None for _suggest_attempt in range(3): try: new_configs = strategy.suggest( meta_features, n_suggestions=n_suggest, ) break except Exception as e: logger.warning( f"Strategy suggest failed (attempt " f"{_suggest_attempt + 1}/3): {e}" ) if self.verbose > 0: print( f" [AutoML] ⚠ suggest() error: {e} " f"(retry {_suggest_attempt + 1}/3)" ) import traceback traceback.print_exc() if not new_configs: # Bandit search returns empty when all rungs are complete if is_bandit and hasattr(strategy, "should_stop") and strategy.should_stop(): if self.verbose > 0: print(" [AutoML] Bandit search completed all rungs, stopping") break # Genetic search returns empty when should_stop is True if is_genetic and hasattr(strategy, "should_stop") and strategy.should_stop(): if self.verbose > 0: print(" [AutoML] Evolutionary search converged, stopping") break if self.verbose > 0: print(" [AutoML] No new candidates from strategy, stopping") break source = new_configs[0].metadata.get("source", "") if is_bandit: rung = new_configs[0].metadata.get("rung", "?") frac = new_configs[0].metadata.get("data_fraction", 1.0) phase_label = f"rung {rung} ({frac:.0%} data)" elif is_genetic: gen = new_configs[0].metadata.get("generation", "?") phase_label = f"gen {gen}" elif source == "portfolio_hpo_variant": phase_label = "HPO variant" else: phase_label = "new model" if self.verbose > 0: names = [] for c in new_configs: v = c.metadata.get("variant_num") names.append(f"{c.model_name}(v{v})" if v else c.model_name) rem = ( self.time_manager_.remaining_budget() if self.time_manager_ else float("inf") ) remaining = "unlimited" if rem == float("inf") else f"{rem:.0f}s left" print( f" [AutoML] Iter {iteration} [{phase_label}]: " f"training {names} ({remaining})" ) # ── Step 2: Train ──────────────────────────────────── if trainer is None: break n_new = max(len(new_configs), 1) remaining_budget = ( self.time_manager_.remaining_budget() if self.time_manager_ is not None else float("inf") ) if remaining_budget == float("inf"): iter_budget = self.max_model_time * n_new else: iter_budget = max( self.min_model_time * n_new, min(remaining_budget * 0.4, self.max_model_time * n_new), ) context["model_configs"] = new_configs try: train_result = trainer.execute(context, iter_budget) except Exception as e: logger.warning(f"Continuous loop training failed: {e}") rounds_without_improvement += 1 continue if not train_result.success or not train_result.output: rounds_without_improvement += 1 continue new_trained = train_result.output.get("trained_models", {}) new_results = train_result.output.get("results", []) new_oof = train_result.output.get("oof_predictions", {}) # For HPO variants, use a unique key so they don't overwrite # the original model. for name, model in new_trained.items(): if source == "portfolio_hpo_variant" and name in trained_models: # Only replace if the variant is better old_score = next( (r.score for r in results if r.success and r.config.model_name == name), -float("inf"), ) new_score = next( (r.score for r in new_results if r.success and r.config.model_name == name), -float("inf"), ) if new_score > old_score: trained_models[name] = model if self.verbose > 0: print( f" ↑ {name} improved: " f"{old_score:.4f}{new_score:.4f}" ) else: trained_models[name] = model oof_predictions.update(new_oof) results.extend(new_results) total_new_models += len(new_trained) for r in new_results: try: strategy.update(r) except Exception: pass # ── Step 3: Check improvement ──────────────────────── round_best = max( (r.score for r in new_results if r.success), default=0.0, ) if round_best > best_score + self.min_improvement: improvement = round_best - best_score best_score = round_best rounds_without_improvement = 0 if self.verbose > 0: print( f" [AutoML] ★ New best score: {best_score:.4f} " f"(+{improvement:.4f})" ) else: rounds_without_improvement += 1 if self.verbose > 0: print( f" [AutoML] No improvement " f"({rounds_without_improvement}/{self.patience})" ) # ── Step 3b: Iterative feature selection feedback ──── # Every 3 iterations, compute aggregate feature importances # from successful models and inject an informed feature # mask into context so the next search iteration can use it. if iteration % 3 == 0 and any(r.success for r in results): self._update_feature_selection_feedback( context, trained_models, results, ) # Pass feedback to the search strategy if it supports it mask = context.get("important_feature_mask") scores = context.get("feature_importances_aggregated") if mask is not None and hasattr(strategy, "set_feature_importance_feedback"): strategy.set_feature_importance_feedback(mask, scores) # ── Step 4: Re-ensemble ────────────────────────────── context["trained_models"] = trained_models context["oof_predictions"] = oof_predictions context["results"] = results if ensembler is not None and len(trained_models) >= 2: try: ens_result = ensembler.execute(context, 60) if ens_result.success: self.stage_results_["ensembling"] = ens_result if ens_result.output: context.update(ens_result.output) except Exception: pass # ── Step 5: Periodic HPO on top models ─────────────── # Skip for genetic search — it evolves HPs through mutation if ( not is_genetic and hpo_executor is not None and self.preset.hyperparameter_tune and iteration % 3 == 0 # every 3rd iteration and (self.time_manager_ is None or self.time_manager_.remaining_budget() > 120) ): hpo_budget = min( 120.0, self.time_manager_.remaining_budget() * 0.2 if self.time_manager_ else 120.0, ) if self.verbose > 0: print(f" [AutoML] Running HPO stage ({hpo_budget:.0f}s budget)") try: hpo_result = hpo_executor.execute(context, hpo_budget) if hpo_result.success and hpo_result.output: context.update(hpo_result.output) tuning_results = hpo_result.output.get("tuning_results", []) improved = [t for t in tuning_results if t.get("improved")] if improved and self.verbose > 0: for t in improved: print( f" ↑ HPO improved {t['model']}: " f"{t['original_score']:.4f}{t['tuned_score']:.4f}" ) except Exception as e: logger.debug(f"HPO in continuous loop failed: {e}") self._checkpoint(context, f"continuous_iter_{iteration}") except KeyboardInterrupt: if self.verbose > 0: print( f"\n [AutoML] Interrupted by user after {iteration} iterations" ) print( f" [AutoML] Best score so far: {best_score:.4f} " f"({total_new_models} models trained)" ) print(" [AutoML] Saving best pipelines to checkpoint…") self._checkpoint(context, "interrupted") if self.verbose > 0: print(" [AutoML] Done — checkpoint saved.") else: if self.verbose > 0: print( f" [AutoML] Continuous optimization complete: " f"{iteration} iters, {total_new_models} models trained, " f"best={best_score:.4f}" )
[docs] def get_stage_summary(self) -> pd.DataFrame: """Get summary of stage execution. Returns ------- pd.DataFrame Summary table with stage statistics. """ rows = [] for stage_name, result in self.stage_results_.items(): rows.append({ "stage": stage_name, "success": result.success, "duration": result.duration, "error": result.error, }) return pd.DataFrame(rows)