Source code for endgame.models.symbolic.symbolic_regressor

"""Symbolic Regression — pure-Python GP engine with sklearn interface.

Discovers interpretable symbolic equations that best fit the data while
maintaining parsimony through multi-population evolutionary search
and Pareto-frontier tracking.

No Julia or PySR dependency — uses numpy for evaluation and optionally
PyTorch/scipy for constant optimization.
"""

from __future__ import annotations

import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import pandas as pd

import numpy as np
from numpy.typing import NDArray
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y

from endgame.models.symbolic._constant_optimizer import optimize_constants
from endgame.models.symbolic._expression import (
    Node,
    evaluate,
    to_sympy,
)
from endgame.models.symbolic._operators import (
    OPERATOR_SETS,
    validate_operators,
)
from endgame.models.symbolic._pareto import ParetoFrontier
from endgame.models.symbolic._population import (
    compute_fitness,
    evolve_population,
    migrate,
    ramped_half_and_half,
)

# ============================================================
# Preset configurations
# ============================================================

PRESETS = {
    "fast": {
        "niterations": 20,
        "populations": 15,
        "population_size": 33,
        "maxsize": 20,
        "ncycles_per_iteration": 100,
        "parsimony": 0.0032,
    },
    "default": {
        "niterations": 40,
        "populations": 31,
        "population_size": 27,
        "maxsize": 25,
        "ncycles_per_iteration": 380,
        "parsimony": 0.0,
    },
    "competition": {
        "niterations": 100,
        "populations": 50,
        "population_size": 50,
        "maxsize": 35,
        "ncycles_per_iteration": 550,
        "parsimony": 0.0,
        "weight_optimize": 0.02,
        "should_optimize_constants": True,
        "optimizer_nrestarts": 4,
    },
    "interpretable": {
        "niterations": 60,
        "populations": 31,
        "population_size": 27,
        "maxsize": 15,
        "maxdepth": 5,
        "ncycles_per_iteration": 380,
        "parsimony": 0.01,
        "constraints": {"/": (-1, 9)},
    },
}

# Julia loss name → Python equivalent mapping
_LOSS_MAP = {
    "L2DistLoss()": "mse",
    "L1DistLoss()": "mae",
    "HuberLoss()": "huber",
    "LogitDistLoss()": "logcosh",
}


def _get_loss_fn(loss: str) -> Callable[[np.ndarray, np.ndarray], float]:
    """Return a numpy loss function from a loss name."""
    key = _LOSS_MAP.get(loss, loss)
    if key == "mse":
        return lambda y, yp: float(np.mean((y - yp) ** 2))
    elif key == "mae":
        return lambda y, yp: float(np.mean(np.abs(y - yp)))
    elif key == "huber":
        def huber(y, yp, delta=1.0):
            r = np.abs(y - yp)
            return float(np.mean(np.where(r < delta, 0.5 * r ** 2, delta * (r - 0.5 * delta))))
        return huber
    elif key == "logcosh":
        return lambda y, yp: float(np.mean(np.log(np.cosh(np.clip(y - yp, -500, 500)))))
    else:
        # Treat unknown as MSE with a deprecation warning
        warnings.warn(
            f"Unknown loss {loss!r}, falling back to MSE. "
            f"Supported: 'mse', 'mae', 'huber', 'logcosh', or Julia-style names "
            f"like 'L2DistLoss()'.",
            DeprecationWarning,
            stacklevel=3,
        )
        return lambda y, yp: float(np.mean((y - yp) ** 2))


[docs] class SymbolicRegressor(BaseEstimator, RegressorMixin): """Symbolic Regression for discovering interpretable equations. Uses multi-population genetic programming with Pareto-frontier tracking to find symbolic expressions balancing accuracy and complexity. Parameters ---------- preset : str, default="default" Preset configuration: "fast", "default", "competition", "interpretable". operators : str or dict, default="scientific" Operator set name or dict with "binary_operators"/"unary_operators". binary_operators : list of str, optional Explicit binary operators (overrides *operators*). unary_operators : list of str, optional Explicit unary operators (overrides *operators*). niterations : int, optional Number of GP iterations. maxsize : int, optional Max tree complexity (nodes). maxdepth : int, optional Max tree depth. populations : int, optional Number of sub-populations. population_size : int, optional Individuals per population. parsimony : float, optional Complexity penalty added to loss. model_selection : str, default="best" "best" (lowest loss) or "score" (loss-complexity trade-off). loss : str, default="L2DistLoss()" Loss function name. Accepts Julia-style names for backward compatibility (e.g. ``"L2DistLoss()"``) or Python names (``"mse"``, ``"mae"``, ``"huber"``). constraints : dict, optional Reserved for API compatibility (not enforced in GP engine). nested_constraints : dict, optional Reserved for API compatibility. denoise : bool, default=False Reserved for API compatibility. select_k_features : int, optional Reserved for API compatibility. turbo : bool, default=False Reserved for API compatibility. parallelism : str, default="multithreading" Reserved for API compatibility (GP runs single-threaded). procs : int, optional Reserved for API compatibility. random_state : int, optional Random seed. verbosity : int, default=0 0 = silent, 1 = progress, 2 = detailed. temp_equation_file : bool, default=True Reserved for API compatibility. output_directory : str, optional Reserved for API compatibility. Attributes ---------- equations_ : DataFrame All discovered equations with loss and complexity. best_equation_ : str String of the best equation. best_loss_ : float Loss of the best equation. best_complexity_ : int Complexity of the best equation. n_features_in_ : int Number of features seen during fit. feature_names_in_ : ndarray Feature names. """ _estimator_type = "regressor" def __init__( self, preset: str = "default", operators: str | dict[str, list[str]] = "scientific", binary_operators: list[str] | None = None, unary_operators: list[str] | None = None, niterations: int | None = None, maxsize: int | None = None, maxdepth: int | None = None, populations: int | None = None, population_size: int | None = None, parsimony: float | None = None, model_selection: str = "best", loss: str = "L2DistLoss()", constraints: dict | None = None, nested_constraints: dict | None = None, denoise: bool = False, select_k_features: int | None = None, turbo: bool = False, parallelism: str = "multithreading", procs: int | None = None, random_state: int | None = None, verbosity: int = 0, temp_equation_file: bool = True, output_directory: str | None = None, ): self.preset = preset self.operators = operators self.binary_operators = binary_operators self.unary_operators = unary_operators self.niterations = niterations self.maxsize = maxsize self.maxdepth = maxdepth self.populations = populations self.population_size = population_size self.parsimony = parsimony self.model_selection = model_selection self.loss = loss self.constraints = constraints self.nested_constraints = nested_constraints self.denoise = denoise self.select_k_features = select_k_features self.turbo = turbo self.parallelism = parallelism self.procs = procs self.random_state = random_state self.verbosity = verbosity self.temp_equation_file = temp_equation_file self.output_directory = output_directory # -------------------------------------------------------- # Parameter resolution # -------------------------------------------------------- def _get_operators(self) -> tuple[list[str], list[str]]: """Resolve binary/unary operators from settings.""" if self.binary_operators is not None or self.unary_operators is not None: return ( self.binary_operators or ["+", "-", "*", "/"], self.unary_operators or [], ) if isinstance(self.operators, str): if self.operators not in OPERATOR_SETS: raise ValueError( f"Unknown operator set: {self.operators}. " f"Choose from: {list(OPERATOR_SETS.keys())}" ) op_set = OPERATOR_SETS[self.operators] return op_set["binary_operators"], op_set["unary_operators"] if isinstance(self.operators, dict): return ( self.operators.get("binary_operators", ["+", "-", "*", "/"]), self.operators.get("unary_operators", []), ) raise ValueError(f"Invalid operators type: {type(self.operators)}") def _build_params(self) -> dict[str, Any]: """Resolve all hyperparameters (preset + overrides).""" if self.preset not in PRESETS: raise ValueError( f"Unknown preset: {self.preset}. " f"Choose from: {list(PRESETS.keys())}" ) params = PRESETS[self.preset].copy() if self.niterations is not None: params["niterations"] = self.niterations if self.maxsize is not None: params["maxsize"] = self.maxsize if self.maxdepth is not None: params["maxdepth"] = self.maxdepth if self.populations is not None: params["populations"] = self.populations if self.population_size is not None: params["population_size"] = self.population_size if self.parsimony is not None: params["parsimony"] = self.parsimony return params # -------------------------------------------------------- # Fit # --------------------------------------------------------
[docs] def fit(self, X, y, **fit_params) -> SymbolicRegressor: """Fit symbolic regression model. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. y : array-like of shape (n_samples,) Target values. Returns ------- self """ X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64) self.n_features_in_ = X.shape[1] if hasattr(X, "columns"): self.feature_names_in_ = np.array(X.columns) else: self.feature_names_in_ = np.array([f"x{i}" for i in range(X.shape[1])]) # Resolve parameters params = self._build_params() n_iter = params["niterations"] n_pops = params["populations"] pop_size = params["population_size"] maxsize = params["maxsize"] maxdepth = params.get("maxdepth", 10) parsimony_val = params.get("parsimony", 0.0) ncycles = params.get("ncycles_per_iteration", 1) should_optimize = params.get("should_optimize_constants", False) optimizer_restarts = params.get("optimizer_nrestarts", 2) binary_ops, unary_ops = self._get_operators() validate_operators(binary_ops, unary_ops) loss_fn = _get_loss_fn(self.loss) rng = np.random.default_rng(self.random_state) # Initialize populations init_depth = min(4, maxdepth) all_pops: list[list[Node]] = [] for _ in range(n_pops): pop = ramped_half_and_half(rng, pop_size, self.n_features_in_, binary_ops, unary_ops, 1, init_depth) all_pops.append(pop) frontier = ParetoFrontier() # Main loop for iteration in range(n_iter): all_fitnesses = [] for p_idx in range(n_pops): pop = all_pops[p_idx] fitnesses = np.array([ compute_fitness(t, X, y, loss_fn, parsimony_val) for t in pop ]) # Multiple cycles per iteration for _ in range(ncycles): pop = evolve_population( pop, fitnesses, rng, binary_ops, unary_ops, self.n_features_in_, tournament_size=min(5, pop_size), crossover_prob=0.5, mutation_prob=0.3, elite_frac=0.1, maxsize=maxsize, ) fitnesses = np.array([ compute_fitness(t, X, y, loss_fn, parsimony_val) for t in pop ]) all_pops[p_idx] = pop all_fitnesses.append(fitnesses) # Update frontier with best from this population for t, f in zip(pop, fitnesses): if f < 1e19: raw_loss = loss_fn(y, evaluate(t, X)) frontier.update(t, raw_loss, list(self.feature_names_in_)) # Migration every 5 iterations if iteration % 5 == 4 and n_pops > 1: migrate(all_pops, all_fitnesses, rng, n_migrants=2) # Optional constant optimization on best individual if should_optimize and iteration % 10 == 9: best_entry = frontier.get_best(self.model_selection) if best_entry is not None: optimized = optimize_constants( best_entry.tree, X, y, loss_fn, n_restarts=optimizer_restarts, ) opt_loss = loss_fn(y, evaluate(optimized, X)) frontier.update(optimized, opt_loss, list(self.feature_names_in_)) if self.verbosity >= 1 and (iteration + 1) % max(1, n_iter // 10) == 0: best = frontier.get_best(self.model_selection) loss_str = f"{best.loss:.6f}" if best else "N/A" print(f" Iteration {iteration + 1}/{n_iter} — best loss: {loss_str}") # Final constant optimization pass if should_optimize or self.preset == "competition": for entry in list(frontier._best.values()): optimized = optimize_constants( entry.tree, X, y, loss_fn, n_restarts=max(optimizer_restarts, 2), ) opt_loss = loss_fn(y, evaluate(optimized, X)) frontier.update(optimized, opt_loss, list(self.feature_names_in_)) # Store results self._frontier = frontier self.equations_ = frontier.to_dataframe() self._best_entry = frontier.get_best(self.model_selection) if self._best_entry is not None: self.best_equation_ = self._best_entry.equation self.best_loss_ = self._best_entry.loss self.best_complexity_ = self._best_entry.complexity self._best_tree = self._best_entry.tree else: self.best_equation_ = "None" self.best_loss_ = float("inf") self.best_complexity_ = 0 self._best_tree = None # Mark as fitted (for check_is_fitted compatibility with old API) self.model_ = self return self
# -------------------------------------------------------- # Predict # --------------------------------------------------------
[docs] def predict(self, X, index: int | None = None) -> NDArray: """Predict using the discovered equation. Parameters ---------- X : array-like of shape (n_samples, n_features) index : int, optional Complexity level to use. If None, uses best equation. Returns ------- y_pred : ndarray of shape (n_samples,) """ check_is_fitted(self, "model_") X = check_array(X, accept_sparse=False, dtype=np.float64) if index is not None: entry = self._frontier.get_at_complexity(index) if entry is None: raise ValueError(f"No equation found at complexity {index}") return evaluate(entry.tree, X) if self._best_tree is None: return np.zeros(X.shape[0]) return evaluate(self._best_tree, X)
# -------------------------------------------------------- # Symbolic export # --------------------------------------------------------
[docs] def sympy(self, index: int | None = None): """Return SymPy expression of the best (or indexed) equation.""" check_is_fitted(self, "model_") if index is not None: entry = self._frontier.get_at_complexity(index) if entry is None: raise ValueError(f"No equation at complexity {index}") return to_sympy(entry.tree, list(self.feature_names_in_)) if self._best_tree is None: import sympy return sympy.Integer(0) return to_sympy(self._best_tree, list(self.feature_names_in_))
[docs] def latex(self, index: int | None = None) -> str: """Return LaTeX string of the equation.""" check_is_fitted(self, "model_") try: from sympy import latex as sympy_latex return sympy_latex(self.sympy(index=index)) except ImportError: raise ImportError("SymPy is required for LaTeX export")
# -------------------------------------------------------- # Inspection # --------------------------------------------------------
[docs] def get_best_equation(self) -> str: check_is_fitted(self, "model_") return self.best_equation_
[docs] def get_pareto_frontier(self) -> pd.DataFrame: """Return Pareto-optimal equations as a DataFrame.""" check_is_fitted(self, "model_") return self._frontier.get_pareto_optimal()
[docs] def get_equation_at_complexity(self, complexity: int) -> str | None: check_is_fitted(self, "model_") entry = self._frontier.get_at_complexity(complexity) return entry.equation if entry else None
@property def feature_importances_(self) -> NDArray: """Feature importances from equation structure (occurrence count).""" check_is_fitted(self, "model_") importances = np.zeros(self.n_features_in_) if self._best_tree is None: return importances for i, name in enumerate(self.feature_names_in_): importances[i] = self.best_equation_.count(name) total = importances.sum() if total > 0: importances /= total return importances
[docs] def summary(self) -> str: check_is_fitted(self, "model_") if self.equations_ is None or len(self.equations_) == 0: return "No equations discovered." lines = [ "Symbolic Regression Results", "=" * 50, f"Best equation: {self.best_equation_}", f"Best loss: {self.best_loss_:.6f}", f"Best complexity: {self.best_complexity_}", "", "All equations (sorted by complexity):", "-" * 50, ] for _, row in self.equations_.sort_values("complexity").iterrows(): lines.append( f" [{row['complexity']:2.0f}] {row['equation']} " f"(loss: {row['loss']:.6f})" ) return "\n".join(lines)
def __repr__(self) -> str: if hasattr(self, "best_equation_"): return f"SymbolicRegressor(best={self.best_equation_}, loss={self.best_loss_:.4f})" return f"SymbolicRegressor(preset={self.preset!r}, operators={self.operators!r})"