"""Multi-Layer Perceptron implementations for tabular data.

This module provides PyTorch-based MLP classifiers and regressors with
modern techniques like batch normalization, dropout, and learning rate scheduling.
"""
# The docstring must be the first statement to become the module's ``__doc__``;
# a ``from __future__`` import is still allowed directly after it.
from __future__ import annotations
from typing import Any
import numpy as np
from sklearn.base import ClassifierMixin, RegressorMixin
from sklearn.preprocessing import LabelEncoder, StandardScaler
from endgame.core.base import EndgameEstimator
# PyTorch imports (optional dependency; HAS_TORCH records whether the import succeeded)
try:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
HAS_TORCH = True
except ImportError:
HAS_TORCH = False
def _check_torch():
    """Raise ``ImportError`` unless the module-level ``HAS_TORCH`` flag is set.

    Raises
    ------
    ImportError
        If ``HAS_TORCH`` is falsy, i.e. the guarded ``import torch`` failed.
    """
    if HAS_TORCH:
        return
    raise ImportError(
        "PyTorch is required for neural network models. "
        "Install with: pip install torch"
    )
# Only define PyTorch module if torch is available
_MLPModule = None
if HAS_TORCH:
class _MLPModule(nn.Module):
    """Feed-forward network: stacked hidden blocks followed by a linear head.

    Every hidden block is ``Linear`` -> optional ``BatchNorm1d`` ->
    activation -> optional ``Dropout``. The same activation module instance
    is reused in every block.

    Parameters
    ----------
    input_dim : int
        Number of input features.
    hidden_dims : List[int]
        Width of each hidden layer, in order. May be empty, in which case
        the network is just the linear output layer.
    output_dim : int
        Number of output units.
    dropout : float
        Dropout rate; a ``Dropout`` layer is added only when this is > 0.
    batch_norm : bool
        Whether to add ``BatchNorm1d`` after each hidden ``Linear``.
    activation : str
        Name of the activation function (see ``_get_activation``).
    """

    def __init__(
        self,
        input_dim: int,
        hidden_dims: list[int],
        output_dim: int,
        dropout: float = 0.3,
        batch_norm: bool = True,
        activation: str = "relu",
    ):
        super().__init__()
        self.activation_fn = self._get_activation(activation)

        use_dropout = dropout > 0
        # widths[i] -> widths[i + 1] describes the i-th hidden Linear layer.
        widths = [input_dim, *hidden_dims]
        blocks = []
        for in_width, out_width in zip(widths[:-1], widths[1:]):
            blocks.append(nn.Linear(in_width, out_width))
            if batch_norm:
                blocks.append(nn.BatchNorm1d(out_width))
            blocks.append(self.activation_fn)
            if use_dropout:
                blocks.append(nn.Dropout(dropout))

        self.hidden_layers = nn.Sequential(*blocks)
        # With no hidden layers the head consumes the raw input width.
        self.output_layer = nn.Linear(widths[-1], output_dim)

    def _get_activation(self, activation: str) -> nn.Module:
        """Return the activation module registered under ``activation``.

        Raises
        ------
        ValueError
            If ``activation`` is not one of the supported names.
        """
        table = {
            "relu": nn.ReLU(),
            "leaky_relu": nn.LeakyReLU(0.1),
            "elu": nn.ELU(),
            "selu": nn.SELU(),
            "gelu": nn.GELU(),
            "swish": nn.SiLU(),
            "mish": nn.Mish(),
            "tanh": nn.Tanh(),
        }
        if activation in table:
            return table[activation]
        raise ValueError(
            f"Unknown activation: {activation}. "
            f"Choose from: {list(table.keys())}"
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the hidden stack and then the output layer."""
        hidden = self.hidden_layers(x)
        return self.output_layer(hidden)
class _BaseMLPEstimator(EndgameEstimator):
    """Base class for MLP estimators.

    Owns feature scaling, model construction, the training loop (with
    optional validation-based early stopping) and raw-output prediction.
    Subclasses provide ``_prepare_target_tensor`` and pass the loss
    criterion and output width to ``_fit_impl``.

    Parameters
    ----------
    hidden_dims : List[int], default=[256, 128]
        Hidden layer dimensions.
    dropout : float, default=0.3
        Dropout rate for regularization.
    batch_norm : bool, default=True
        Whether to use batch normalization.
    activation : str, default='relu'
        Activation function: 'relu', 'leaky_relu', 'elu', 'selu',
        'gelu', 'swish', 'mish', 'tanh'.
    learning_rate : float, default=1e-3
        Initial learning rate.
    weight_decay : float, default=1e-5
        L2 regularization strength.
    n_epochs : int, default=100
        Maximum number of training epochs.
    batch_size : int, default=256
        Training batch size.
    early_stopping : int, default=10
        Number of epochs without improvement to stop training.
    scheduler : str, default='cosine'
        Learning rate scheduler: 'cosine', 'step', 'plateau', 'none'.
    device : str, default='auto'
        Device: 'cuda', 'cpu', or 'auto' (auto-detect).
    random_state : int, optional
        Random seed for reproducibility.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    model_ : _MLPModule or None
        The network built by the most recent fit (``None`` before fitting).
    scaler_ : StandardScaler or None
        Feature scaler fitted on the training features.
    history_ : dict
        Per-epoch ``'train_loss'`` and ``'val_loss'`` of the most recent fit.
    """

    def __init__(
        self,
        hidden_dims: list[int] | None = None,
        dropout: float = 0.3,
        batch_norm: bool = True,
        activation: str = "relu",
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-5,
        n_epochs: int = 100,
        batch_size: int = 256,
        early_stopping: int = 10,
        scheduler: str = "cosine",
        device: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        _check_torch()
        super().__init__(random_state=random_state, verbose=verbose)
        self.hidden_dims = hidden_dims or [256, 128]
        self.dropout = dropout
        self.batch_norm = batch_norm
        self.activation = activation
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.early_stopping = early_stopping
        self.scheduler = scheduler
        self.device = device

        # Model components (set during fit)
        self.model_: _MLPModule | None = None
        self.scaler_: StandardScaler | None = None
        self._device: torch.device | None = None
        self.history_: dict[str, list[float]] = {"train_loss": [], "val_loss": []}

    def _get_device(self) -> torch.device:
        """Return the computation device, resolving ``'auto'`` to CUDA if available."""
        if self.device == "auto":
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")
        return torch.device(self.device)

    def _set_seed(self):
        """Seed torch, NumPy and (if available) CUDA when ``random_state`` is set."""
        if self.random_state is not None:
            torch.manual_seed(self.random_state)
            np.random.seed(self.random_state)
            if torch.cuda.is_available():
                torch.cuda.manual_seed_all(self.random_state)

    def _get_scheduler(
        self,
        optimizer: optim.Optimizer,
        n_epochs: int,
    ) -> Any | None:
        """Create the learning rate scheduler selected by ``self.scheduler``.

        Returns
        -------
        scheduler or None
            ``None`` when ``self.scheduler == 'none'``.

        Raises
        ------
        ValueError
            If ``self.scheduler`` is not a supported name.
        """
        if self.scheduler == "none":
            return None
        elif self.scheduler == "cosine":
            return optim.lr_scheduler.CosineAnnealingLR(
                optimizer, T_max=n_epochs, eta_min=1e-6
            )
        elif self.scheduler == "step":
            # n_epochs // 3 is 0 for n_epochs < 3, and StepLR cannot work
            # with a zero step size, so clamp to at least one epoch.
            return optim.lr_scheduler.StepLR(
                optimizer, step_size=max(1, n_epochs // 3), gamma=0.1
            )
        elif self.scheduler == "plateau":
            return optim.lr_scheduler.ReduceLROnPlateau(
                optimizer, mode="min", factor=0.5, patience=5
            )
        else:
            raise ValueError(f"Unknown scheduler: {self.scheduler}")

    def _create_dataloader(
        self,
        X: np.ndarray,
        y: np.ndarray,
        shuffle: bool = True,
    ) -> DataLoader:
        """Create a DataLoader from numpy arrays.

        Parameters
        ----------
        X : ndarray
            Features, converted with ``torch.FloatTensor``.
        y : ndarray
            Targets, converted by the subclass's ``_prepare_target_tensor``.
        shuffle : bool, default=True
            Whether to shuffle each epoch. Within this class only the
            training loader is created with ``shuffle=True``.
        """
        X_tensor = torch.FloatTensor(X)
        y_tensor = self._prepare_target_tensor(y)
        dataset = TensorDataset(X_tensor, y_tensor)

        # BatchNorm1d cannot compute batch statistics from a single sample in
        # training mode. If the shuffled (training) loader would end with a
        # one-sample batch, drop it; a different sample is left out each
        # epoch because of shuffling. Validation runs in eval mode, so
        # non-shuffled loaders keep every sample.
        n_samples = len(dataset)
        drop_last = bool(
            shuffle
            and self.batch_norm
            and n_samples > self.batch_size
            and n_samples % self.batch_size == 1
        )

        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=0,
            pin_memory=self._device.type == "cuda",
            drop_last=drop_last,
        )

    def _prepare_target_tensor(self, y: np.ndarray) -> torch.Tensor:
        """Prepare target tensor (override in subclasses)."""
        raise NotImplementedError

    def _compute_loss(
        self,
        outputs: torch.Tensor,
        targets: torch.Tensor,
        criterion: nn.Module,
    ) -> torch.Tensor:
        """Compute loss (override in subclasses if needed)."""
        return criterion(outputs, targets)

    def _train_epoch(
        self,
        dataloader: DataLoader,
        optimizer: optim.Optimizer,
        criterion: nn.Module,
    ) -> float:
        """Train for one epoch and return the mean of the per-batch losses."""
        self.model_.train()
        total_loss = 0.0
        n_batches = 0

        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(self._device)
            y_batch = y_batch.to(self._device)

            optimizer.zero_grad()
            outputs = self.model_(X_batch)
            loss = self._compute_loss(outputs, y_batch, criterion)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            n_batches += 1

        return total_loss / n_batches

    def _validate_epoch(
        self,
        dataloader: DataLoader,
        criterion: nn.Module,
    ) -> float:
        """Evaluate without gradients and return the mean of the per-batch losses."""
        self.model_.eval()
        total_loss = 0.0
        n_batches = 0

        with torch.no_grad():
            for X_batch, y_batch in dataloader:
                X_batch = X_batch.to(self._device)
                y_batch = y_batch.to(self._device)
                outputs = self.model_(X_batch)
                loss = self._compute_loss(outputs, y_batch, criterion)
                total_loss += loss.item()
                n_batches += 1

        return total_loss / n_batches

    def _fit_impl(
        self,
        X: np.ndarray,
        y: np.ndarray,
        output_dim: int,
        criterion: nn.Module,
        val_data: tuple[np.ndarray, np.ndarray] | None = None,
    ) -> EndgameEstimator:
        """Internal fit implementation.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Unscaled training features.
        y : ndarray
            Targets already encoded/scaled by the subclass.
        output_dim : int
            Width of the network's output layer.
        criterion : nn.Module
            Loss module applied to (outputs, targets).
        val_data : tuple of (X_val, y_val), optional
            Validation data. When given, it drives early stopping and the
            weights with the lowest validation loss are restored at the end.

        Returns
        -------
        self
        """
        self._set_seed()
        self._device = self._get_device()
        # Start a fresh history so refitting does not append to the curves
        # recorded by a previous fit.
        self.history_ = {"train_loss": [], "val_loss": []}

        # Scale features
        self.scaler_ = StandardScaler()
        X_scaled = self.scaler_.fit_transform(X)

        # Create model
        self.model_ = _MLPModule(
            input_dim=X.shape[1],
            hidden_dims=self.hidden_dims,
            output_dim=output_dim,
            dropout=self.dropout,
            batch_norm=self.batch_norm,
            activation=self.activation,
        ).to(self._device)

        # Create optimizer and scheduler
        optimizer = optim.AdamW(
            self.model_.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay,
        )
        scheduler = self._get_scheduler(optimizer, self.n_epochs)

        # Create dataloaders
        train_loader = self._create_dataloader(X_scaled, y, shuffle=True)
        val_loader = None
        if val_data is not None:
            X_val, y_val = val_data
            X_val_scaled = self.scaler_.transform(X_val)
            val_loader = self._create_dataloader(X_val_scaled, y_val, shuffle=False)

        # Training loop
        best_val_loss = float("inf")
        best_state = None
        patience_counter = 0

        self._log(f"Training MLP on {self._device} for up to {self.n_epochs} epochs...")

        for epoch in range(self.n_epochs):
            train_loss = self._train_epoch(train_loader, optimizer, criterion)
            self.history_["train_loss"].append(train_loss)

            # Validation
            if val_loader is not None:
                val_loss = self._validate_epoch(val_loader, criterion)
                self.history_["val_loss"].append(val_loss)

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    # Snapshot on CPU so the copy is independent of later updates.
                    best_state = {k: v.cpu().clone() for k, v in self.model_.state_dict().items()}
                    patience_counter = 0
                else:
                    patience_counter += 1

                if self.verbose and (epoch + 1) % 10 == 0:
                    self._log(
                        f"Epoch {epoch + 1}/{self.n_epochs}: "
                        f"train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
                    )

                if patience_counter >= self.early_stopping:
                    self._log(f"Early stopping at epoch {epoch + 1}")
                    break
            else:
                if self.verbose and (epoch + 1) % 10 == 0:
                    self._log(f"Epoch {epoch + 1}/{self.n_epochs}: train_loss={train_loss:.4f}")

            # Update scheduler
            if scheduler is not None:
                if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                    # The plateau scheduler needs a metric; use the validation
                    # loss when available, otherwise the training loss.
                    scheduler.step(train_loss if val_loader is None else val_loss)
                else:
                    scheduler.step()

        # Restore best model
        if best_state is not None:
            self.model_.load_state_dict(best_state)

        self._is_fitted = True
        return self

    def _predict_impl(self, X: np.ndarray) -> np.ndarray:
        """Return the network's raw outputs for ``X`` as a NumPy array.

        ``X`` is scaled with the fitted ``scaler_`` and evaluated in a
        single forward pass in eval mode without gradients.
        """
        self._check_is_fitted()
        X_scaled = self.scaler_.transform(X)
        X_tensor = torch.FloatTensor(X_scaled).to(self._device)

        self.model_.eval()
        with torch.no_grad():
            outputs = self.model_(X_tensor)

        return outputs.cpu().numpy()
class MLPClassifier(ClassifierMixin, _BaseMLPEstimator):
    """Multi-Layer Perceptron classifier.

    PyTorch-based MLP with modern techniques for tabular classification.

    Parameters
    ----------
    hidden_dims : List[int], default=[256, 128]
        Hidden layer dimensions.
    dropout : float, default=0.3
        Dropout rate for regularization.
    batch_norm : bool, default=True
        Whether to use batch normalization.
    activation : str, default='relu'
        Activation function.
    learning_rate : float, default=1e-3
        Initial learning rate.
    weight_decay : float, default=1e-5
        L2 regularization strength.
    n_epochs : int, default=100
        Maximum number of training epochs.
    batch_size : int, default=256
        Training batch size.
    early_stopping : int, default=10
        Patience for early stopping.
    class_weight : str or dict, optional
        Class weights: 'balanced' or dict mapping classes to weights.
        Classes missing from the dict get weight 1.0. For backward
        compatibility, a dict in which no key equals a class label is
        interpreted as mapping encoded class indices (0..n_classes-1)
        to weights.
    scheduler : str, default='cosine'
        Learning rate scheduler.
    device : str, default='auto'
        Device: 'cuda', 'cpu', or 'auto'.
    random_state : int, optional
        Random seed.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    classes_ : ndarray
        Unique class labels.
    n_classes_ : int
        Number of classes.
    model_ : _MLPModule
        Fitted PyTorch model.
    history_ : dict
        Training history with 'train_loss' and 'val_loss'.

    Examples
    --------
    >>> from endgame.models.neural import MLPClassifier
    >>> clf = MLPClassifier(hidden_dims=[128, 64], n_epochs=50)
    >>> clf.fit(X_train, y_train, val_data=(X_val, y_val))
    >>> predictions = clf.predict(X_test)
    >>> probabilities = clf.predict_proba(X_test)
    """

    _estimator_type = "classifier"

    def __init__(
        self,
        hidden_dims: list[int] | None = None,
        dropout: float = 0.3,
        batch_norm: bool = True,
        activation: str = "relu",
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-5,
        n_epochs: int = 100,
        batch_size: int = 256,
        early_stopping: int = 10,
        class_weight: str | dict | None = None,
        scheduler: str = "cosine",
        device: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(
            hidden_dims=hidden_dims,
            dropout=dropout,
            batch_norm=batch_norm,
            activation=activation,
            learning_rate=learning_rate,
            weight_decay=weight_decay,
            n_epochs=n_epochs,
            batch_size=batch_size,
            early_stopping=early_stopping,
            scheduler=scheduler,
            device=device,
            random_state=random_state,
            verbose=verbose,
        )
        self.class_weight = class_weight

        self.classes_: np.ndarray | None = None
        self.n_classes_: int | None = None
        self._label_encoder: LabelEncoder | None = None
        self._class_weights: torch.Tensor | None = None

    def _prepare_target_tensor(self, y: np.ndarray) -> torch.Tensor:
        """Prepare target tensor for classification (integer class indices)."""
        return torch.LongTensor(y)

    def _compute_class_weights(self, y: np.ndarray) -> torch.Tensor | None:
        """Compute per-class loss weights, ordered like ``classes_``.

        Parameters
        ----------
        y : ndarray
            Encoded training labels (output of the label encoder).

        Returns
        -------
        torch.Tensor or None
            One weight per class, or ``None`` when ``class_weight`` is None.

        Raises
        ------
        ValueError
            If ``class_weight`` is neither None, 'balanced', nor a dict.
        """
        if self.class_weight is None:
            return None

        if isinstance(self.class_weight, dict):
            labels = self.classes_.tolist()
            if any(label in self.class_weight for label in labels):
                # Documented form: keys are the original class labels.
                weights = [self.class_weight.get(label, 1.0) for label in labels]
            else:
                # Legacy form: keys are encoded class indices.
                weights = [
                    self.class_weight.get(index, 1.0) for index in range(len(labels))
                ]
            return torch.FloatTensor(np.array(weights))

        if isinstance(self.class_weight, str) and self.class_weight == "balanced":
            from sklearn.utils.class_weight import compute_class_weight

            weights = compute_class_weight(
                "balanced", classes=np.unique(y), y=y
            )
            return torch.FloatTensor(weights)

        # Fail loudly instead of silently training without weights.
        raise ValueError(f"Unknown class_weight: {self.class_weight!r}")

    def fit(
        self,
        X,
        y,
        val_data: tuple[Any, Any] | None = None,
    ) -> MLPClassifier:
        """Fit the classifier.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features.
        y : array-like of shape (n_samples,)
            Target labels.
        val_data : tuple of (X_val, y_val), optional
            Validation data for early stopping. ``y_val`` is encoded with
            the label encoder fitted on ``y``.

        Returns
        -------
        self
            Fitted classifier.
        """
        X_arr, y_arr = self._validate_data(X, y)

        # Encode labels
        self._label_encoder = LabelEncoder()
        y_encoded = self._label_encoder.fit_transform(y_arr)
        self.classes_ = self._label_encoder.classes_
        self.n_classes_ = len(self.classes_)

        # Compute class weights
        self._class_weights = self._compute_class_weights(y_encoded)

        # Prepare validation data
        if val_data is not None:
            X_val, y_val = val_data
            X_val = self._to_numpy(X_val)
            y_val = self._label_encoder.transform(np.asarray(y_val))
            val_data = (X_val, y_val)

        # Create criterion; the weight tensor must live on the training device.
        if self._class_weights is not None:
            criterion = nn.CrossEntropyLoss(weight=self._class_weights.to(self._get_device()))
        else:
            criterion = nn.CrossEntropyLoss()

        return self._fit_impl(
            X_arr, y_encoded, self.n_classes_, criterion, val_data
        )

    def predict(self, X) -> np.ndarray:
        """Predict class labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input samples.

        Returns
        -------
        ndarray of shape (n_samples,)
            Predicted class labels.
        """
        proba = self.predict_proba(X)
        indices = np.argmax(proba, axis=1)
        return self._label_encoder.inverse_transform(indices)

    def predict_proba(self, X) -> np.ndarray:
        """Predict class probabilities.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input samples.

        Returns
        -------
        ndarray of shape (n_samples, n_classes)
            Class probabilities.
        """
        X_arr = self._to_numpy(X)
        logits = self._predict_impl(X_arr)

        # Apply softmax; subtracting the row maximum avoids overflow in exp.
        exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        return exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
# NOTE(review): the mixin is listed first, matching MLPClassifier and
# scikit-learn's convention that mixins precede the base estimator in the
# MRO. This assumes EndgameEstimator derives from sklearn's BaseEstimator
# -- confirm.
class MLPRegressor(RegressorMixin, _BaseMLPEstimator):
    """Multi-Layer Perceptron regressor.

    PyTorch-based MLP with modern techniques for tabular regression.
    Targets are standardized during fitting and predictions are mapped
    back to the original scale.

    Parameters
    ----------
    hidden_dims : List[int], default=[256, 128]
        Hidden layer dimensions.
    dropout : float, default=0.3
        Dropout rate for regularization.
    batch_norm : bool, default=True
        Whether to use batch normalization.
    activation : str, default='relu'
        Activation function.
    learning_rate : float, default=1e-3
        Initial learning rate.
    weight_decay : float, default=1e-5
        L2 regularization strength.
    n_epochs : int, default=100
        Maximum number of training epochs.
    batch_size : int, default=256
        Training batch size.
    early_stopping : int, default=10
        Patience for early stopping.
    loss : str, default='mse'
        Loss function: 'mse', 'mae', 'huber'.
    scheduler : str, default='cosine'
        Learning rate scheduler.
    device : str, default='auto'
        Device: 'cuda', 'cpu', or 'auto'.
    random_state : int, optional
        Random seed.
    verbose : bool, default=False
        Enable verbose output.

    Attributes
    ----------
    model_ : _MLPModule
        Fitted PyTorch model.
    history_ : dict
        Training history with 'train_loss' and 'val_loss'.

    Examples
    --------
    >>> from endgame.models.neural import MLPRegressor
    >>> reg = MLPRegressor(hidden_dims=[128, 64], n_epochs=50)
    >>> reg.fit(X_train, y_train, val_data=(X_val, y_val))
    >>> predictions = reg.predict(X_test)
    """

    _estimator_type = "regressor"

    def __init__(
        self,
        hidden_dims: list[int] | None = None,
        dropout: float = 0.3,
        batch_norm: bool = True,
        activation: str = "relu",
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-5,
        n_epochs: int = 100,
        batch_size: int = 256,
        early_stopping: int = 10,
        loss: str = "mse",
        scheduler: str = "cosine",
        device: str = "auto",
        random_state: int | None = None,
        verbose: bool = False,
    ):
        super().__init__(
            hidden_dims=hidden_dims,
            dropout=dropout,
            batch_norm=batch_norm,
            activation=activation,
            learning_rate=learning_rate,
            weight_decay=weight_decay,
            n_epochs=n_epochs,
            batch_size=batch_size,
            early_stopping=early_stopping,
            scheduler=scheduler,
            device=device,
            random_state=random_state,
            verbose=verbose,
        )
        self.loss = loss
        self._target_scaler: StandardScaler | None = None

    def _prepare_target_tensor(self, y: np.ndarray) -> torch.Tensor:
        """Prepare target tensor for regression (always 2-D, float)."""
        if y.ndim == 1:
            y = y.reshape(-1, 1)
        return torch.FloatTensor(y)

    def _get_criterion(self) -> nn.Module:
        """Return the loss module selected by ``self.loss``.

        Raises
        ------
        ValueError
            If ``self.loss`` is not 'mse', 'mae' or 'huber'.
        """
        criteria = {
            "mse": nn.MSELoss,
            "mae": nn.L1Loss,
            "huber": nn.HuberLoss,
        }
        if self.loss not in criteria:
            raise ValueError(f"Unknown loss: {self.loss}")
        return criteria[self.loss]()

    def fit(
        self,
        X,
        y,
        val_data: tuple[Any, Any] | None = None,
    ) -> MLPRegressor:
        """Fit the regressor.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training features.
        y : array-like of shape (n_samples,) or (n_samples, n_targets)
            Target values.
        val_data : tuple of (X_val, y_val), optional
            Validation data for early stopping. ``y_val`` is scaled with
            the target scaler fitted on ``y``.

        Returns
        -------
        self
            Fitted regressor.

        Raises
        ------
        ValueError
            If ``loss`` is not a supported name.
        """
        X_arr, y_arr = self._validate_data(X, y)

        # Resolve the loss first so an invalid name fails before any
        # fitted state is replaced.
        criterion = self._get_criterion()

        # Scale targets
        self._target_scaler = StandardScaler()
        if y_arr.ndim == 1:
            y_arr = y_arr.reshape(-1, 1)
        y_scaled = self._target_scaler.fit_transform(y_arr)

        # Determine output dimension
        output_dim = y_scaled.shape[1]

        # Prepare validation data
        if val_data is not None:
            X_val, y_val = val_data
            X_val = self._to_numpy(X_val)
            y_val = np.asarray(y_val)
            if y_val.ndim == 1:
                y_val = y_val.reshape(-1, 1)
            y_val = self._target_scaler.transform(y_val)
            val_data = (X_val, y_val)

        return self._fit_impl(X_arr, y_scaled, output_dim, criterion, val_data)

    def predict(self, X) -> np.ndarray:
        """Predict target values.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input samples.

        Returns
        -------
        ndarray of shape (n_samples,) or (n_samples, n_targets)
            Predicted values on the original target scale.
        """
        X_arr = self._to_numpy(X)
        predictions = self._predict_impl(X_arr)

        # Inverse transform
        predictions = self._target_scaler.inverse_transform(predictions)

        # Squeeze if single target
        if predictions.shape[1] == 1:
            predictions = predictions.ravel()

        return predictions