Source code for endgame.automl.search.portfolio

from __future__ import annotations

"""Portfolio-based search strategy.

The default search strategy that trains a diverse portfolio of models
in parallel, ensuring coverage across different model families.

After the initial model sweep exhausts all model types, the strategy
switches to **hyperparameter variant generation**: it perturbs the
hyperparameters of the top-performing models so that the continuous
optimization loop always has new configurations to try.
"""

import importlib.util
import logging
import math
from functools import lru_cache
from typing import Any

import numpy as np


@lru_cache(maxsize=64)
def _is_package_available(name: str) -> bool:
    """Check if a Python package is installed (lightweight, no import)."""
    return importlib.util.find_spec(name) is not None

from endgame.automl.model_registry import (
    MODEL_REGISTRY,
    get_interpretable_portfolio,
)
from endgame.automl.presets import MODEL_POOLS
from endgame.automl.search.base import (
    BaseSearchStrategy,
    PipelineConfig,
    SearchResult,
)

logger = logging.getLogger(__name__)

# ── Hyperparameter perturbation tables ──────────────────────────────────────
# For each model family, define (param_name, type, range) triples.
# "log" means sample log-uniformly; "int" means integer uniform.
_GBDT_SPACE: list[tuple[str, str, float, float]] = [
    ("learning_rate", "log", 0.005, 0.3),
    ("n_estimators", "int", 200, 4000),
    ("max_depth", "int", 3, 12),
    ("subsample", "uniform", 0.5, 1.0),
    ("colsample_bytree", "uniform", 0.3, 1.0),
    ("reg_alpha", "log", 1e-8, 10.0),
    ("reg_lambda", "log", 1e-8, 10.0),
    ("min_child_weight", "log", 1e-3, 100.0),
]

_NEURAL_SPACE: list[tuple[str, str, float, float]] = [
    ("learning_rate", "log", 1e-5, 1e-2),
    ("n_epochs", "int", 20, 300),
    ("weight_decay", "log", 1e-7, 1e-2),
    ("dropout", "uniform", 0.0, 0.5),
    ("batch_size", "choice", 64, 512),
]

_TREE_SPACE: list[tuple[str, str, float, float]] = [
    ("n_estimators", "int", 50, 2000),
    ("max_depth", "int", 3, 30),
    ("min_samples_leaf", "int", 1, 50),
    ("max_features", "uniform", 0.3, 1.0),
]

_LINEAR_SPACE: list[tuple[str, str, float, float]] = [
    ("C", "log", 1e-4, 100.0),
]

_FAMILY_SPACES: dict[str, list] = {
    "gbdt": _GBDT_SPACE,
    "neural": _NEURAL_SPACE,
    "tree": _TREE_SPACE,
    "linear": _LINEAR_SPACE,
    "foundation": _NEURAL_SPACE,
}


def _perturb_params(
    base_params: dict[str, Any],
    family: str,
    rng: np.random.RandomState,
    magnitude: float = 1.0,
) -> dict[str, Any]:
    """Create a random perturbation of *base_params* using family-aware ranges.

    Only modifies parameters that already exist in *base_params* or that are
    in the family space.  Parameters not in the space are kept as-is.
    """
    space = _FAMILY_SPACES.get(family, [])
    if not space:
        return base_params

    params = base_params.copy()
    n_to_change = max(1, int(len(space) * 0.4 * magnitude))
    indices = rng.choice(len(space), size=min(n_to_change, len(space)), replace=False)

    for idx in indices:
        name, kind, lo, hi = space[idx]
        try:
            if kind == "log":
                params[name] = float(np.exp(rng.uniform(math.log(lo), math.log(hi))))
            elif kind == "int":
                params[name] = int(rng.randint(int(lo), int(hi) + 1))
            elif kind == "uniform":
                params[name] = float(rng.uniform(lo, hi))
            elif kind == "choice":
                params[name] = int(2 ** rng.randint(int(math.log2(lo)), int(math.log2(hi)) + 1))
        except Exception:
            pass

    return params


# ── Preprocessing option sampling ───────────────────────────────────────────
_IMPUTER_OPTIONS = [
    {"strategy": "median"},
    {"strategy": "mean"},
    {"strategy": "most_frequent"},
    {"strategy": "knn", "n_neighbors": 5},
]

_ENCODER_OPTIONS = [
    {"method": "target"},
    {"method": "onehot"},
    {"method": "ordinal"},
]

_SCALER_OPTIONS = [
    {"method": "standard"},
    {"method": "robust"},
    {"method": "quantile"},
    None,  # no scaling
]

_FEATURE_SELECTION_OPTIONS = [
    None,  # no selection
    {"method": "variance_threshold", "threshold": 0.01},
    {"method": "mutual_info", "k": 20},
    {"method": "boruta"},
]


def _sample_preprocessing(
    model_name: str,
    meta_features: dict[str, float] | None,
    rng: np.random.RandomState,
) -> list[tuple[str, dict]]:
    """Sample a random preprocessing pipeline for a model variant."""
    info = MODEL_REGISTRY.get(model_name)
    steps: list[tuple[str, dict]] = []

    mf = meta_features or {}

    # Imputer — only if dataset has missing values
    if mf.get("pct_missing", 0) > 0:
        if not (info and info.handles_missing):
            imp = _IMPUTER_OPTIONS[rng.randint(len(_IMPUTER_OPTIONS))]
            steps.append(("imputer", imp))

    # Encoder — only if dataset has categorical features
    if mf.get("nr_cat", 0) > 0:
        if not (info and info.handles_categorical):
            enc = _ENCODER_OPTIONS[rng.randint(len(_ENCODER_OPTIONS))]
            steps.append(("encoder", enc))

    # Scaler — randomly include or skip
    scaler = _SCALER_OPTIONS[rng.randint(len(_SCALER_OPTIONS))]
    if scaler is not None:
        steps.append(("scaler", scaler))

    # Feature selection — randomly include or skip
    fs = _FEATURE_SELECTION_OPTIONS[rng.randint(len(_FEATURE_SELECTION_OPTIONS))]
    if fs is not None:
        steps.append(("feature_selection", fs))

    return steps


[docs] class PortfolioSearch(BaseSearchStrategy): """Portfolio-based search strategy with iterative HPO. Phase 1 (initial sweep): suggests a diverse set of model types. Phase 2 (HPO variants): once all model types are trained, generates hyperparameter variants of the top performers so the continuous optimization loop never runs out of candidates. Parameters ---------- task_type : str Task type ("classification" or "regression"). eval_metric : str Evaluation metric to optimize. model_pool : list of str, optional Explicit list of models to consider. If None, uses preset. preset : str, default="medium_quality" Preset to use for model pool if model_pool not specified. ensure_diversity : bool, default=True Whether to ensure at least one model from each family. max_models : int, optional Maximum number of models to suggest. min_models : int, default=1 Minimum number of models to suggest. meta_learner : MetaLearner, optional Pre-trained meta-learner for model ranking. random_state : int, optional Random seed. verbose : int, default=0 Verbosity level. """ def __init__( self, task_type: str = "classification", eval_metric: str = "auto", model_pool: list[str] | None = None, preset: str = "medium_quality", ensure_diversity: bool = True, max_models: int | None = None, min_models: int = 1, meta_learner: Any | None = None, random_state: int | None = None, verbose: int = 0, interpretable_only: bool = False, ): super().__init__( task_type=task_type, eval_metric=eval_metric, random_state=random_state, verbose=verbose, ) self.preset = preset self.ensure_diversity = ensure_diversity self.max_models = max_models self.min_models = min_models self.meta_learner = meta_learner self.interpretable_only = interpretable_only self._rng = np.random.RandomState(random_state or 42) if interpretable_only: self.model_pool = get_interpretable_portfolio( task_type=task_type, time_budget="high", ) if verbose > 0: logger.info(f"Interpretable-only mode: {len(self.model_pool)} models available") elif model_pool is not None: self.model_pool = model_pool elif preset in MODEL_POOLS: self.model_pool = MODEL_POOLS[preset].copy() else: self.model_pool = MODEL_POOLS["medium_quality"].copy() self._trained: set[str] = set() self._family_scores: dict[str, list[float]] = {} self._variant_counter: int = 0 self._config_hashes: set[str] = set() # ── Phase detection ────────────────────────────────────────────────── @property def initial_sweep_done(self) -> bool: """True when all candidate model types have been trained once.""" return len(self._filter_available_models(None)) == 0 # ── Main suggest() ───────────────────────────────────────────────────
[docs] def suggest( self, meta_features: dict[str, float] | None = None, n_suggestions: int = 1, ) -> list[PipelineConfig]: """Suggest pipeline configurations to try. During the initial sweep, suggests new model types. After all model types have been tried, switches to HPO variant generation (perturbing hyperparameters of the best-performing models). """ # Phase 1: new model types available = self._filter_available_models(meta_features) if available: ranked = self._rank_models(available, meta_features) if self.ensure_diversity: selected = self._select_diverse(ranked, n_suggestions) else: selected = ranked[:n_suggestions] if self.max_models: selected = selected[: self.max_models] if len(selected) < self.min_models: remaining = [m for m in ranked if m not in selected] selected.extend(remaining[: self.min_models - len(selected)]) configs = [self._create_config(m, meta_features) for m in selected] if self.verbose > 0: print(f"Portfolio: {[c.model_name for c in configs]}") return configs # Phase 2: HPO variants of top models return self._suggest_variants(meta_features, n_suggestions)
def _filter_available_models( self, meta_features: dict[str, float] | None, ) -> list[str]: """Filter models based on constraints. Parameters ---------- meta_features : dict, optional Dataset meta-features. Returns ------- list of str Available model names. """ available = [] n_samples = meta_features.get("nr_inst", 10000) if meta_features else 10000 for model_name in self.model_pool: if model_name not in MODEL_REGISTRY: logger.warning(f"Model '{model_name}' not in registry, skipping") continue info = MODEL_REGISTRY[model_name] # Enforce interpretable_only constraint if self.interpretable_only and not info.interpretable: if self.verbose > 1: logger.debug(f"Skipping {model_name}: not interpretable") continue # Check task type compatibility if self.task_type == "classification": if "classification" not in info.task_types and "both" not in info.task_types: continue elif self.task_type == "regression": if "regression" not in info.task_types and "both" not in info.task_types: continue # Check sample size limits if info.max_samples and n_samples > info.max_samples: if self.verbose > 1: logger.debug( f"Skipping {model_name}: n_samples={n_samples} > max={info.max_samples}" ) continue if info.min_samples and n_samples < info.min_samples: if self.verbose > 1: logger.debug( f"Skipping {model_name}: n_samples={n_samples} < min={info.min_samples}" ) continue # Skip already trained models if model_name in self._trained: continue # Check that required external packages are installed if info.required_packages: missing = [p for p in info.required_packages if not _is_package_available(p)] if missing: if self.verbose > 1: logger.debug( f"Skipping {model_name}: missing packages {missing}" ) continue available.append(model_name) return available def _rank_models( self, models: list[str], meta_features: dict[str, float] | None, ) -> list[str]: """Rank models by expected performance. Parameters ---------- models : list of str Models to rank. meta_features : dict, optional Dataset meta-features. Returns ------- list of str Models ranked by expected performance (best first). """ # Use meta-learner if available if self.meta_learner is not None: try: from endgame.benchmark.profiler import MetaFeatureSet if isinstance(meta_features, dict): mf = MetaFeatureSet(features=meta_features) else: mf = meta_features recommendation = self.meta_learner.recommend_from_features(mf) # Build ranking from recommendation ranked = [] if recommendation.model_name in models: ranked.append(recommendation.model_name) for alt_model, _ in recommendation.alternatives: if alt_model in models and alt_model not in ranked: ranked.append(alt_model) # Add remaining models for m in models: if m not in ranked: ranked.append(m) return ranked except Exception as e: logger.warning(f"Meta-learner ranking failed: {e}") # Fallback: heuristic ranking return self._heuristic_rank(models, meta_features) def _heuristic_rank( self, models: list[str], meta_features: dict[str, float] | None, ) -> list[str]: """Heuristic model ranking based on meta-features. Parameters ---------- models : list of str Models to rank. meta_features : dict, optional Dataset meta-features. Returns ------- list of str Models ranked by heuristic score. """ if meta_features is None: meta_features = {} n_samples = meta_features.get("nr_inst", 10000) n_features = meta_features.get("nr_attr", 10) pct_cat = meta_features.get("cat_to_num", 0) imbalance = meta_features.get("class_imbalance", 1) scores = {} for model_name in models: info = MODEL_REGISTRY.get(model_name) if info is None: scores[model_name] = 0 continue score = 50 # Base score # GBDTs are generally strong if info.family == "gbdt": score += 30 # Neural networks need enough data if info.family == "neural": if n_samples < 1000: score -= 20 elif n_samples > 10000: score += 10 # Foundation models for small data if info.family == "foundation": if n_samples < 5000: score += 25 else: score -= 10 # Handle categorical features if pct_cat > 0.5 and info.handles_categorical: score += 10 # Prefer models that handle imbalance well if imbalance > 5: if model_name in ("catboost", "lgbm", "xgb"): score += 10 # Penalize slow models for large data if info.typical_fit_time == "very_slow": if n_samples > 10000: score -= 40 elif n_samples > 5000: score -= 20 elif info.typical_fit_time == "slow": if n_samples > 50000: score -= 15 elif n_samples > 20000: score -= 10 scores[model_name] = score # Sort by score descending return sorted(models, key=lambda m: scores.get(m, 0), reverse=True) def _select_diverse( self, ranked: list[str], n_select: int, ) -> list[str]: """Select diverse models ensuring family coverage. Parameters ---------- ranked : list of str Models ranked by expected performance. n_select : int Number of models to select. Returns ------- list of str Selected models. """ selected = [] families_included: set[str] = set() # First pass: include top model from each family for model_name in ranked: if len(selected) >= n_select: break info = MODEL_REGISTRY.get(model_name) if info is None: continue family = info.family # Always include if family not yet represented if family not in families_included: selected.append(model_name) families_included.add(family) # Second pass: fill remaining slots with best models for model_name in ranked: if len(selected) >= n_select: break if model_name not in selected: selected.append(model_name) return selected def _create_config( self, model_name: str, meta_features: dict[str, float] | None, ) -> PipelineConfig: """Create a pipeline configuration for a model. Parameters ---------- model_name : str Model name. meta_features : dict, optional Dataset meta-features. Returns ------- PipelineConfig Pipeline configuration. """ info = MODEL_REGISTRY.get(model_name) # Get default params params = info.default_params.copy() if info else {} # Adjust params based on meta-features if meta_features: params = self._adjust_params(model_name, params, meta_features) # Determine preprocessing preprocessing = self._suggest_preprocessing(model_name, meta_features) return PipelineConfig( model_name=model_name, model_params=params, preprocessing=preprocessing, metadata={"source": "portfolio_search"}, ) def _adjust_params( self, model_name: str, params: dict[str, Any], meta_features: dict[str, float], ) -> dict[str, Any]: """Adjust model parameters based on meta-features. Parameters ---------- model_name : str Model name. params : dict Base parameters. meta_features : dict Dataset meta-features. Returns ------- dict Adjusted parameters. """ params = params.copy() n_samples = meta_features.get("nr_inst", 10000) n_features = meta_features.get("nr_attr", 10) # GBDT adjustments if model_name in ("lgbm", "xgb", "catboost"): # Reduce estimators for small data if n_samples < 1000: params["n_estimators"] = min(params.get("n_estimators", 1000), 500) # Increase for large data elif n_samples > 50000: params["n_estimators"] = max(params.get("n_estimators", 1000), 2000) # Adjust depth for dimensionality if n_features > 100: params["max_depth"] = params.get("max_depth", 6) # Neural network adjustments elif model_name in ("ft_transformer", "saint", "tabnet", "mlp"): # Reduce epochs for small data to prevent overfitting if n_samples < 2000: params["n_epochs"] = min(params.get("n_epochs", 100), 50) return params def _suggest_preprocessing( self, model_name: str, meta_features: dict[str, float] | None, ) -> list[tuple]: """Suggest preprocessing steps for a model. Parameters ---------- model_name : str Model name. meta_features : dict, optional Dataset meta-features. Returns ------- list of tuple Preprocessing steps as (name, params) tuples. """ steps = [] if meta_features is None: return steps info = MODEL_REGISTRY.get(model_name) # Handle missing values pct_missing = meta_features.get("pct_missing", 0) if pct_missing > 0: if info and info.handles_missing: pass # Model handles it else: steps.append(("imputer", {"strategy": "median"})) # Handle categorical encoding n_cat = meta_features.get("nr_cat", 0) if n_cat > 0: if info and info.handles_categorical: pass # Model handles it else: # Target encoding for high cardinality steps.append(("encoder", {"method": "target"})) return steps def _suggest_variants( self, meta_features: dict[str, float] | None, n_suggestions: int, ) -> list[PipelineConfig]: """Generate HPO variants of the top-performing trained models. Picks the best models, perturbs their hyperparameters **and preprocessing options**, and returns configs with unique ``config_id``s so the training loop treats them as new. """ successful = [r for r in self.results_ if r.success] if not successful: return [] successful.sort(key=lambda r: r.score, reverse=True) top_n = max(3, n_suggestions) top_results = successful[:top_n] configs: list[PipelineConfig] = [] for result in top_results: if len(configs) >= n_suggestions: break model_name = result.config.model_name info = MODEL_REGISTRY.get(model_name) family = info.family if info else "unknown" base_params = result.config.model_params.copy() self._variant_counter += 1 magnitude = min(1.0 + self._variant_counter * 0.1, 3.0) new_params = _perturb_params(base_params, family, self._rng, magnitude) # Vary preprocessing every other variant if self._variant_counter % 2 == 0: preprocessing = _sample_preprocessing( model_name, meta_features, self._rng, ) else: preprocessing = result.config.preprocessing config = PipelineConfig( model_name=model_name, model_params=new_params, preprocessing=preprocessing, metadata={ "source": "portfolio_hpo_variant", "variant_of": model_name, "variant_num": self._variant_counter, "base_score": result.score, }, ) if config.config_id in self._config_hashes: continue self._config_hashes.add(config.config_id) configs.append(config) if self.verbose > 0 and configs: names = [f"{c.model_name}(v{c.metadata.get('variant_num', '?')})" for c in configs] print(f"HPO variants: {names}") return configs
[docs] def update(self, result: SearchResult) -> None: """Update strategy with a new result.""" super().update(result) # Mark model type as trained (for Phase 1 sweep) self._trained.add(result.config.model_name) # Track the config hash to avoid duplicates in variant generation if result.config.config_id: self._config_hashes.add(result.config.config_id) # Track model performance by family if result.success: info = MODEL_REGISTRY.get(result.config.model_name) if info: family = info.family if family not in self._family_scores: self._family_scores[family] = [] self._family_scores[family].append(result.score)