from __future__ import annotations
"""Variational Autoencoder for Dimensionality Reduction.
This module implements a VAE-based approach to dimensionality reduction,
learning a smooth latent space that can be used for both reduction and
generation.
Classes
-------
VAEReducer : Variational Autoencoder for dimensionality reduction
"""
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.utils.validation import check_array, check_is_fitted
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
HAS_TORCH = True
except ImportError:
HAS_TORCH = False
torch = None
nn = None
def _check_torch_installed():
    """Fail fast with an installation hint when PyTorch could not be imported.

    Raises
    ------
    ImportError
        If the module-level ``HAS_TORCH`` flag is false.
    """
    if HAS_TORCH:
        return
    message = (
        "PyTorch is required for VAEReducer. "
        "Install it with: pip install torch"
    )
    raise ImportError(message)
# Base class for _VAEModule: the real ``nn.Module`` when torch was imported,
# otherwise plain ``object`` so the class statement can still be evaluated
# (``nn`` is None in that case).
if HAS_TORCH:
    _VAEBase = nn.Module
else:
    _VAEBase = object
class _VAEModule(_VAEBase):
    """Encoder/decoder network used by the variational autoencoder.

    Data flow:
    - Encoder: input -> hidden blocks -> (mu, log_var)
    - Reparameterization: z = mu + std * epsilon
    - Decoder: z -> hidden blocks -> linear projection back to ``input_dim``

    Every hidden block is ``Linear -> BatchNorm1d -> activation``, followed
    by ``Dropout`` when ``dropout > 0``. A single activation module instance
    is shared by all blocks.
    """

    def __init__(
        self,
        input_dim: int,
        latent_dim: int,
        encoder_layers: list[int],
        decoder_layers: list[int],
        activation: str = "relu",
        dropout: float = 0.0,
    ):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        # Name -> factory. Any unrecognised name falls back to ReLU.
        activation_factories = {
            "relu": nn.ReLU,
            "leaky_relu": lambda: nn.LeakyReLU(0.2),
            "elu": nn.ELU,
            "tanh": nn.Tanh,
        }
        self.activation = activation_factories.get(activation, nn.ReLU)()

        # Layers are created in the order encoder -> latent heads -> decoder.
        encoder_blocks, encoder_width = self._hidden_stack(
            input_dim, encoder_layers, dropout
        )
        self.encoder = nn.Sequential(*encoder_blocks)

        # Two parallel heads parameterise the latent Gaussian.
        self.fc_mu = nn.Linear(encoder_width, latent_dim)
        self.fc_log_var = nn.Linear(encoder_width, latent_dim)

        decoder_blocks, decoder_width = self._hidden_stack(
            latent_dim, decoder_layers, dropout
        )
        # Final projection has no normalisation or activation.
        decoder_blocks.append(nn.Linear(decoder_width, input_dim))
        self.decoder = nn.Sequential(*decoder_blocks)

    def _hidden_stack(self, in_features, layer_sizes, dropout):
        """Build the hidden blocks for one half of the network.

        Returns the list of modules and the width of the last block
        (``in_features`` when ``layer_sizes`` is empty).
        """
        modules = []
        width = in_features
        for size in layer_sizes:
            modules.append(nn.Linear(width, size))
            modules.append(nn.BatchNorm1d(size))
            modules.append(self.activation)
            if dropout > 0:
                modules.append(nn.Dropout(dropout))
            width = size
        return modules, width

    def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Map inputs to the parameters of the latent distribution.

        Parameters
        ----------
        x : Tensor of shape (batch, input_dim)

        Returns
        -------
        mu : Tensor of shape (batch, latent_dim)
            Mean of the latent distribution.
        log_var : Tensor of shape (batch, latent_dim)
            Log variance of the latent distribution.
        """
        hidden = self.encoder(x)
        return self.fc_mu(hidden), self.fc_log_var(hidden)

    def reparameterize(
        self, mu: torch.Tensor, log_var: torch.Tensor
    ) -> torch.Tensor:
        """Draw ``z = mu + std * eps`` with ``eps`` standard normal.

        Noise is drawn on every call, regardless of train/eval mode.

        Parameters
        ----------
        mu : Tensor of shape (batch, latent_dim)
        log_var : Tensor of shape (batch, latent_dim)

        Returns
        -------
        z : Tensor of shape (batch, latent_dim)
            Sampled latent vector.
        """
        sigma = torch.exp(0.5 * log_var)
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Map latent vectors back to data space.

        Parameters
        ----------
        z : Tensor of shape (batch, latent_dim)

        Returns
        -------
        x_recon : Tensor of shape (batch, input_dim)
        """
        return self.decoder(z)

    def forward(
        self, x: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Encode, sample a latent vector, and decode it.

        Parameters
        ----------
        x : Tensor of shape (batch, input_dim)

        Returns
        -------
        x_recon : Tensor of shape (batch, input_dim)
            Reconstructed input.
        mu : Tensor of shape (batch, latent_dim)
            Latent mean.
        log_var : Tensor of shape (batch, latent_dim)
            Latent log variance.
        """
        mu, log_var = self.encode(x)
        latent = self.reparameterize(mu, log_var)
        return self.decode(latent), mu, log_var
[docs]
class VAEReducer(TransformerMixin, BaseEstimator):
"""Variational Autoencoder for Dimensionality Reduction.
VAE learns a probabilistic mapping to a lower-dimensional latent space
with a smooth structure, useful for both dimensionality reduction and
generative modeling.
Parameters
----------
n_components : int, default=2
Dimension of the latent space.
encoder_layers : list of int, default=[256, 128]
Hidden layer sizes for the encoder.
decoder_layers : list of int, default=[128, 256]
Hidden layer sizes for the decoder.
activation : str, default='relu'
Activation function: 'relu', 'leaky_relu', 'elu', 'tanh'.
dropout : float, default=0.0
Dropout rate in hidden layers.
learning_rate : float, default=1e-3
Learning rate for Adam optimizer.
batch_size : int, default=128
Mini-batch size.
n_epochs : int, default=100
Number of training epochs.
beta : float, default=1.0
Weight of KL divergence term (beta-VAE parameter).
Higher values create more disentangled representations.
early_stopping : int, default=10
Stop if no improvement for this many epochs.
validation_fraction : float, default=0.1
Fraction of data for validation.
scale_data : bool, default=True
Whether to standardize input features.
device : str, default='auto'
Device: 'auto', 'cpu', or 'cuda'.
random_state : int, optional
Random seed.
verbose : bool, default=False
Whether to print training progress.
Attributes
----------
model_ : _VAEModule
Fitted VAE model.
reconstruction_loss_ : float
Final reconstruction loss.
kl_loss_ : float
Final KL divergence loss.
Example
-------
>>> from endgame.dimensionality_reduction import VAEReducer
>>> vae = VAEReducer(n_components=10, encoder_layers=[512, 256])
>>> X_latent = vae.fit_transform(X)
>>> # Reconstruct data
>>> X_recon = vae.inverse_transform(X_latent)
>>> # Generate new samples
>>> z_random = np.random.randn(100, 10)
>>> X_generated = vae.decode(z_random)
"""
def __init__(
self,
n_components: int = 2,
encoder_layers: list[int] | None = None,
decoder_layers: list[int] | None = None,
activation: str = "relu",
dropout: float = 0.0,
learning_rate: float = 1e-3,
batch_size: int = 128,
n_epochs: int = 100,
beta: float = 1.0,
early_stopping: int = 10,
validation_fraction: float = 0.1,
scale_data: bool = True,
device: str = "auto",
random_state: int | None = None,
verbose: bool = False,
):
self.n_components = n_components
self.encoder_layers = encoder_layers or [256, 128]
self.decoder_layers = decoder_layers or [128, 256]
self.activation = activation
self.dropout = dropout
self.learning_rate = learning_rate
self.batch_size = batch_size
self.n_epochs = n_epochs
self.beta = beta
self.early_stopping = early_stopping
self.validation_fraction = validation_fraction
self.scale_data = scale_data
self.device = device
self.random_state = random_state
self.verbose = verbose
def _get_device(self):
    """Resolve the ``device`` setting to a ``torch.device``.

    ``'auto'`` selects CUDA when ``torch.cuda.is_available()`` reports it,
    otherwise the CPU. Any other value is handed to ``torch.device`` as-is.
    """
    requested = self.device
    if requested != "auto":
        return torch.device(requested)
    chosen = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(chosen)
def _vae_loss(
    self,
    x: torch.Tensor,
    x_recon: torch.Tensor,
    mu: torch.Tensor,
    log_var: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """Compute the beta-weighted VAE objective.

    Both terms are averaged over every element of their tensors (not
    summed per sample).

    Parameters
    ----------
    x : Original input
    x_recon : Reconstructed input
    mu : Latent mean
    log_var : Latent log variance

    Returns
    -------
    total_loss : ``recon_loss + beta * kl_loss``
    recon_loss : Mean squared error between ``x_recon`` and ``x``
    kl_loss : KL divergence term
    """
    # Reconstruction term: element-wise MSE ("mean" is the default reduction).
    recon_loss = F.mse_loss(x_recon, x)

    # KL(q(z|x) || N(0, I)) for a diagonal Gaussian, averaged over all
    # elements: -0.5 * mean(1 + log_var - mu^2 - exp(log_var)).
    kl_elements = 1 + log_var - mu.pow(2) - log_var.exp()
    kl_loss = -0.5 * kl_elements.mean()

    total_loss = recon_loss + self.beta * kl_loss
    return total_loss, recon_loss, kl_loss
def _log(self, message: str):
    """Write *message* to stdout when ``verbose`` is set; otherwise no-op."""
    if not self.verbose:
        return
    print(message)
[docs]
def fit(self, X, y=None):
    """Fit the VAE model.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training data.
    y : Ignored

    Returns
    -------
    self : VAEReducer

    Raises
    ------
    ImportError
        If PyTorch is not installed.
    ValueError
        If the validation split leaves no samples to train on.

    Notes
    -----
    - When ``random_state`` is set, the *global* torch and NumPy random
      generators are seeded.
    - If ``validation_fraction`` rounds down to zero held-out samples, the
      model is trained on all data without validation or early stopping.
    - ``reconstruction_loss_`` and ``kl_loss_`` are the training averages
      of the last epoch that ran, even when the weights of an earlier
      (best-validation) epoch are restored. They are NaN if no epoch ran.
    """
    _check_torch_installed()
    if self.random_state is not None:
        torch.manual_seed(self.random_state)
        np.random.seed(self.random_state)

    X = check_array(X, dtype=np.float32)
    self.n_features_in_ = X.shape[1]

    # Scale data
    if self.scale_data:
        self._scaler = StandardScaler()
        X = self._scaler.fit_transform(X)
    else:
        self._scaler = None

    # Get device
    self._device = self._get_device()
    self._log(f"Training VAE on {self._device}")

    # Create model
    self.model_ = _VAEModule(
        input_dim=self.n_features_in_,
        latent_dim=self.n_components,
        encoder_layers=self.encoder_layers,
        decoder_layers=self.decoder_layers,
        activation=self.activation,
        dropout=self.dropout,
    ).to(self._device)

    # Prepare data
    X_tensor = torch.tensor(X, dtype=torch.float32)

    # Train/validation split. On small inputs the requested fraction can
    # round down to zero samples; an empty validation set would make the
    # validation average divide by zero, so validation is skipped instead.
    n_val = 0
    if self.validation_fraction > 0:
        n_val = int(len(X) * self.validation_fraction)
    if n_val > 0:
        indices = np.random.permutation(len(X))
        train_idx, val_idx = indices[n_val:], indices[:n_val]
        X_train = X_tensor[train_idx]
        X_val = X_tensor[val_idx]
    else:
        X_train = X_tensor
        X_val = None

    n_train = len(X_train)
    if n_train == 0:
        raise ValueError(
            f"validation_fraction={self.validation_fraction} leaves no "
            f"samples for training (n_samples={len(X)})."
        )

    # The hidden blocks contain BatchNorm1d, which raises in training mode
    # on a batch holding a single sample. Drop the trailing batch when it
    # would have size 1; shuffling changes which sample that is each epoch.
    drop_last = n_train > self.batch_size and n_train % self.batch_size == 1

    train_loader = DataLoader(
        TensorDataset(X_train),
        batch_size=self.batch_size,
        shuffle=True,
        drop_last=drop_last,
    )
    if X_val is not None:
        val_loader = DataLoader(
            TensorDataset(X_val),
            batch_size=self.batch_size,
        )
    else:
        val_loader = None

    # Optimizer
    optimizer = optim.Adam(
        self.model_.parameters(),
        lr=self.learning_rate,
    )

    # Training loop
    best_val_loss = float("inf")
    best_state = None
    patience_counter = 0
    # Defined up front so the attributes below exist even if n_epochs == 0.
    train_recon_loss = float("nan")
    train_kl_loss = float("nan")

    for epoch in range(self.n_epochs):
        # Training
        self.model_.train()
        train_recon_loss = 0.0
        train_kl_loss = 0.0
        n_batches = 0
        for (x_batch,) in train_loader:
            x_batch = x_batch.to(self._device)
            optimizer.zero_grad()
            x_recon, mu, log_var = self.model_(x_batch)
            loss, recon_loss, kl_loss = self._vae_loss(
                x_batch, x_recon, mu, log_var
            )
            loss.backward()
            optimizer.step()
            train_recon_loss += recon_loss.item()
            train_kl_loss += kl_loss.item()
            n_batches += 1
        train_recon_loss /= n_batches
        train_kl_loss /= n_batches

        # Validation
        if val_loader is not None:
            self.model_.eval()
            val_loss = 0.0
            n_val_batches = 0
            with torch.no_grad():
                for (x_batch,) in val_loader:
                    x_batch = x_batch.to(self._device)
                    x_recon, mu, log_var = self.model_(x_batch)
                    loss, _, _ = self._vae_loss(
                        x_batch, x_recon, mu, log_var
                    )
                    val_loss += loss.item()
                    n_val_batches += 1
            val_loss /= n_val_batches

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Snapshot on CPU so the best weights can be restored later.
                best_state = {
                    k: v.cpu().clone()
                    for k, v in self.model_.state_dict().items()
                }
                patience_counter = 0
            else:
                patience_counter += 1

            if self.verbose and (epoch + 1) % 10 == 0:
                self._log(
                    f"Epoch {epoch+1}: recon={train_recon_loss:.4f}, "
                    f"kl={train_kl_loss:.4f}, val={val_loss:.4f}"
                )

            if patience_counter >= self.early_stopping:
                self._log(f"Early stopping at epoch {epoch + 1}")
                break
        else:
            if self.verbose and (epoch + 1) % 10 == 0:
                self._log(
                    f"Epoch {epoch+1}: recon={train_recon_loss:.4f}, "
                    f"kl={train_kl_loss:.4f}"
                )

    # Restore best model (only available when validation was run)
    if best_state is not None:
        self.model_.load_state_dict(best_state)
    self.model_.eval()

    self.reconstruction_loss_ = train_recon_loss
    self.kl_loss_ = train_kl_loss
    return self
[docs]
def decode(self, z) -> np.ndarray:
    """Map latent vectors back to data space.

    Thin wrapper that forwards to ``inverse_transform``; provided under a
    name that reads naturally when generating data.

    Parameters
    ----------
    z : array-like of shape (n_samples, n_components)
        Latent vectors.

    Returns
    -------
    X : ndarray of shape (n_samples, n_features)
        Generated data.
    """
    generated = self.inverse_transform(z)
    return generated
[docs]
def sample(self, n_samples: int = 100) -> np.ndarray:
    """Generate new samples by decoding draws from the prior.

    Latent vectors are drawn from a standard normal using NumPy's global
    random generator, cast to float32, and passed to ``decode``.

    Parameters
    ----------
    n_samples : int, default=100
        Number of samples to generate.

    Returns
    -------
    X : ndarray of shape (n_samples, n_features)
        Generated samples.
    """
    check_is_fitted(self, "model_")
    prior_draws = np.random.randn(n_samples, self.n_components)
    latent = prior_draws.astype(np.float32)
    return self.decode(latent)
[docs]
def reconstruct(self, X) -> np.ndarray:
    """Round-trip data through the VAE.

    Applies ``transform`` followed by ``inverse_transform``.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Data to reconstruct.

    Returns
    -------
    X_recon : ndarray of shape (n_samples, n_features)
        Reconstructed data.
    """
    return self.inverse_transform(self.transform(X))
[docs]
def reconstruction_error(self, X) -> np.ndarray:
    """Compute the per-sample reconstruction error.

    The input is validated as a float32 array, reconstructed with
    ``reconstruct``, and compared with the validated input. Useful for
    anomaly detection.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Data to evaluate.

    Returns
    -------
    errors : ndarray of shape (n_samples,)
        Reconstruction error (MSE) per sample.
    """
    X = check_array(X, dtype=np.float32)
    residual = X - self.reconstruct(X)
    squared = residual ** 2
    # Average over the feature axis to get one score per row.
    return np.mean(squared, axis=1)