# Source code for endgame.benchmark.profiler
from __future__ import annotations
"""Dataset meta-feature extraction for meta-learning.
Extracts comprehensive meta-features that characterize datasets for predicting
optimal model/pipeline choices.
"""
import warnings
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import numpy as np
from scipy import stats
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
# Lazy imports for optional dependencies
HAS_PYMFE = False
try:
from pymfe.mfe import MFE
HAS_PYMFE = True
except ImportError:
pass
class MetaFeatureGroup(str, Enum):
    """Named families of meta-features.

    Members subclass ``str``, so each one compares equal to its raw string
    value (for example ``MetaFeatureGroup.SIMPLE == "simple"``).
    """

    # Basic dataset properties
    SIMPLE = "simple"
    # Statistical properties
    STATISTICAL = "statistical"
    # Information-theoretic measures
    INFO_THEORY = "info-theory"
    # Simple model performance
    LANDMARKING = "landmarking"
    # Tree-based meta-features
    MODEL_BASED = "model-based"
    # Data complexity measures
    COMPLEXITY = "complexity"
# Names of the meta-features in each group, in canonical order.

SIMPLE_META_FEATURES: list[str] = [
    # Number of instances
    "nr_inst",
    # Number of attributes
    "nr_attr",
    # Number of numeric attributes
    "nr_num",
    # Number of categorical attributes
    "nr_cat",
    # Number of classes
    "nr_class",
    # Number of missing values
    "nr_missing",
    # Percentage of missing values
    "pct_missing",
    # Instance-to-attribute ratio
    "inst_to_attr",
    # Categorical-to-numeric ratio
    "cat_to_num",
    # Class imbalance ratio
    "class_imbalance",
    # n_features / n_samples
    "dimensionality",
]

STATISTICAL_META_FEATURES: list[str] = [
    # Mean of the per-feature means
    "mean_mean",
    # Mean of the per-feature standard deviations
    "mean_std",
    # Mean skewness across features
    "mean_skewness",
    # Mean kurtosis across features
    "mean_kurtosis",
    # Mean absolute correlation
    "mean_cor",
    # Maximum absolute correlation
    "max_cor",
    # Ratio of top eigenvalues (PCA)
    "eigenvalue_ratio",
    # Ratio of outliers (IQR method)
    "outlier_ratio",
]

INFO_THEORY_META_FEATURES: list[str] = [
    # Entropy of the class distribution
    "class_entropy",
    # Mean entropy of the attributes
    "attr_entropy",
    # Mean joint entropy of (attribute, class)
    "joint_entropy",
    # Mean mutual information
    "mutual_info",
    # Noise-to-signal ratio
    "noise_signal",
]

LANDMARKING_META_FEATURES: list[str] = [
    # 1-nearest-neighbour accuracy
    "lm_1nn",
    # Naive Bayes accuracy
    "lm_nb",
    # Decision stump accuracy
    "lm_dt_stump",
    # Linear model accuracy
    "lm_linear",
    # Random baseline
    "lm_random",
]
[docs]
@dataclass
class MetaFeatureSet:
    """Result bundle of a meta-feature extraction run.

    Attributes
    ----------
    features : dict[str, float]
        Meta-feature values keyed by meta-feature name.
    groups : dict[str, list[str]]
        For each group name, the names of the features belonging to it.
    extraction_time : float
        Time taken to extract the features (seconds).
    errors : list[str]
        Messages for any errors encountered during extraction.
    """

    features: dict[str, float] = field(default_factory=dict)
    groups: dict[str, list[str]] = field(default_factory=dict)
    extraction_time: float = 0.0
    errors: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, float]:
        """Return a shallow copy of the feature mapping."""
        return dict(self.features)

    def to_array(self, feature_names: list[str] | None = None) -> np.ndarray:
        """Return the feature values as a numpy array.

        Parameters
        ----------
        feature_names : list[str], optional
            Features to include, in the order given. Names that were not
            extracted yield ``nan``. When omitted, every extracted feature is
            used, ordered by sorted name.
        """
        if feature_names is None:
            ordered_names = sorted(self.features)
        else:
            ordered_names = feature_names
        values = [self.features.get(name, np.nan) for name in ordered_names]
        return np.array(values)

    def get_group(self, group: str) -> dict[str, float]:
        """Return the extracted features of one group (empty if unknown)."""
        try:
            members = self.groups[group]
        except KeyError:
            return {}
        # A group may list names that have no extracted value; skip those.
        return {
            name: self.features[name]
            for name in members
            if name in self.features
        }
[docs]
class MetaProfiler:
    """Extract meta-features from datasets for meta-learning.

    Uses pymfe when available and, in addition, computes a set of built-in
    features with numpy/scipy/scikit-learn.

    Parameters
    ----------
    groups : list[str], optional
        Meta-feature groups to extract. Default: ["simple", "statistical", "info-theory"].
        Options: "simple", "statistical", "info-theory", "landmarking",
        "model-based", "complexity". "model-based" and "complexity" are only
        forwarded to pymfe; there is no built-in implementation for them.
        A single string is treated as a one-element list.
    use_pymfe : bool, default=True
        Use pymfe library when available (more comprehensive features).
    landmarking_cv : int, default=3
        Number of CV folds for landmarking meta-features.
    random_state : int, default=42
        Random seed for reproducibility.
    verbose : bool, default=False
        Enable verbose output.

    Examples
    --------
    >>> profiler = MetaProfiler(groups=["simple", "statistical"])
    >>> meta_features = profiler.profile(X, y)
    >>> print(meta_features.features)
    >>> # With landmarking
    >>> profiler = MetaProfiler(groups=["simple", "landmarking"])
    >>> meta_features = profiler.profile(X, y)
    """

    # Our group name -> pymfe group name, in the order they are requested.
    _PYMFE_GROUPS = {
        "simple": "general",
        "statistical": "statistical",
        "info-theory": "info-theory",
        "landmarking": "landmarking",
        "model-based": "model-based",
        "complexity": "complexity",
    }
    _DEFAULT_GROUPS = ("simple", "statistical", "info-theory")
    # Landmarking subsamples datasets larger than this many rows.
    _MAX_LANDMARKING_SAMPLES = 5000
    # Joint entropy / mutual information use only the first columns, for speed.
    _MAX_INFO_THEORY_COLUMNS = 20

    def __init__(
        self,
        groups: list[str] | None = None,
        use_pymfe: bool = True,
        landmarking_cv: int = 3,
        random_state: int = 42,
        verbose: bool = False,
    ):
        # A bare string would otherwise be matched by substring ("simple" in
        # "simple") and must not be split into characters by list().
        if isinstance(groups, str):
            groups = [groups]
        # Copy so later mutation of the caller's list cannot change behaviour.
        self.groups = list(groups) if groups else list(self._DEFAULT_GROUPS)
        self.use_pymfe = use_pymfe and HAS_PYMFE
        self.landmarking_cv = landmarking_cv
        self.random_state = random_state
        self.verbose = verbose

    def _log(self, message: str) -> None:
        """Print message if verbose."""
        if self.verbose:
            print(f"[MetaProfiler] {message}")

    def profile(
        self,
        X: np.ndarray,
        y: np.ndarray,
        categorical_indicator: list[bool] | None = None,
        task_type: str = "classification",
    ) -> MetaFeatureSet:
        """Extract meta-features from a dataset.

        Parameters
        ----------
        X : np.ndarray
            Feature matrix of shape (n_samples, n_features). A 1-D array is
            treated as a single feature column.
        y : np.ndarray
            Target variable of shape (n_samples,).
        categorical_indicator : List[bool], optional
            Boolean mask indicating categorical features. Defaults to all
            features being numeric.
        task_type : str, default="classification"
            Type of task: "classification" or "regression".

        Returns
        -------
        MetaFeatureSet
            Extracted meta-features. Failures of individual groups are
            recorded in ``errors`` instead of being raised.

        Notes
        -----
        The simple features are always computed, whatever ``groups`` says.
        Missing-value statistics are measured on the data as given; every
        other extractor sees a copy in which NaN and +/-inf are replaced by 0.
        Info-theory features are only computed for classification tasks.
        """
        import time

        started = time.perf_counter()

        X_raw = np.asarray(X, dtype=np.float64)
        if X_raw.ndim == 1:
            X_raw = X_raw.reshape(-1, 1)
        y = np.asarray(y)

        # Impute for the numeric extractors, but keep X_raw so that NaNs can
        # still be counted by _extract_simple.
        X_clean = np.nan_to_num(X_raw, nan=0.0, posinf=0.0, neginf=0.0)

        if categorical_indicator is None:
            categorical_indicator = [False] * X_clean.shape[1]

        features: dict[str, float] = {}
        group_features: dict[str, list[str]] = {}
        errors: list[str] = []

        def collect(group: str, extractor, *args) -> None:
            """Run one built-in extractor, recording any failure in errors."""
            try:
                extracted = extractor(*args)
            except Exception as exc:  # best effort: one group must not abort the rest
                errors.append(f"{group} extraction failed: {exc}")
                self._log(f"{group} extraction failed: {exc}")
                return
            features.update(extracted)
            group_features.setdefault(group, []).extend(extracted)

        # Try pymfe first; built-in values computed below override same-named keys.
        if self.use_pymfe:
            try:
                pymfe_features, pymfe_groups = self._extract_pymfe(
                    X_clean, y, categorical_indicator, task_type
                )
            except Exception as exc:
                errors.append(f"pymfe extraction failed: {exc}")
                self._log(f"pymfe failed, using fallback: {exc}")
            else:
                features.update(pymfe_features)
                for group, names in pymfe_groups.items():
                    group_features.setdefault(group, []).extend(names)

        # Always compute our own simple features (on the un-imputed data).
        simple_features = self._extract_simple(
            X_raw, y, categorical_indicator, task_type
        )
        features.update(simple_features)
        group_features.setdefault("simple", []).extend(simple_features)

        if "statistical" in self.groups:
            collect("statistical", self._extract_statistical, X_clean)
        if "info-theory" in self.groups and task_type == "classification":
            collect("info-theory", self._extract_info_theory, X_clean, y)
        if "landmarking" in self.groups:
            collect("landmarking", self._extract_landmarking, X_clean, y, task_type)

        return MetaFeatureSet(
            features=features,
            groups=group_features,
            extraction_time=time.perf_counter() - started,
            errors=errors,
        )

    def _extract_pymfe(
        self,
        X: np.ndarray,
        y: np.ndarray,
        categorical_indicator: list[bool],
        task_type: str,
    ) -> tuple[dict[str, float], dict[str, list[str]]]:
        """Extract meta-features using pymfe.

        Returns
        -------
        features : dict[str, float]
            Finite pymfe values keyed by pymfe feature name.
        group_map : dict[str, list[str]]
            pymfe group name -> names of the kept features in that group.
        """
        pymfe_groups = [
            pymfe_name
            for our_name, pymfe_name in self._PYMFE_GROUPS.items()
            if our_name in self.groups
        ]
        if not pymfe_groups:
            return {}, {}

        mfe = MFE(groups=pymfe_groups, random_state=self.random_state)
        cat_cols = [i for i, is_cat in enumerate(categorical_indicator) if is_cat]
        if cat_cols:
            mfe.fit(X, y, cat_cols=cat_cols)
        else:
            mfe.fit(X, y)
        names, values = mfe.extract()

        # Base meta-feature names per group, used to attribute each extracted
        # name ("<base>.<summary>") to the group(s) it belongs to.
        members = {
            group: set(MFE.valid_metafeatures(groups=group))
            for group in pymfe_groups
        }

        features: dict[str, float] = {}
        group_map: dict[str, list[str]] = {}
        for name, value in zip(names, values):
            if value is None or not np.isfinite(value):
                continue
            features[name] = float(value)
            base_name = name.split(".", 1)[0]
            owners = [g for g in pymfe_groups if base_name in members[g]]
            # If the name cannot be attributed, fall back to listing it under
            # every requested group rather than dropping it from the mapping.
            for group in owners or pymfe_groups:
                group_map.setdefault(group, []).append(name)
        return features, group_map

    def _extract_simple(
        self,
        X: np.ndarray,
        y: np.ndarray,
        categorical_indicator: list[bool],
        task_type: str,
    ) -> dict[str, float]:
        """Extract simple meta-features.

        ``X`` should still contain its NaNs: ``nr_missing`` counts them and
        ``pct_missing`` is their share of all cells, expressed as a fraction
        in [0, 1] (0.0 for an empty matrix). For non-classification tasks
        ``nr_class`` is 0.0 and ``class_imbalance`` is 1.0.
        """
        n_samples, n_features = X.shape
        n_cat = int(sum(categorical_indicator))
        n_num = n_features - n_cat

        n_missing = int(np.isnan(X).sum())
        n_cells = n_samples * n_features

        features = {
            "nr_inst": float(n_samples),
            "nr_attr": float(n_features),
            "nr_num": float(n_num),
            "nr_cat": float(n_cat),
            "inst_to_attr": n_samples / max(n_features, 1),
            "dimensionality": n_features / max(n_samples, 1),
            "cat_to_num": n_cat / max(n_num, 1),
            "nr_missing": float(n_missing),
            "pct_missing": n_missing / n_cells if n_cells else 0.0,
        }

        if task_type == "classification":
            _, counts = np.unique(y, return_counts=True)
            features["nr_class"] = float(counts.size)
            # Counts from np.unique are >= 1, so the ratio is well defined
            # whenever there is at least one label.
            features["class_imbalance"] = (
                float(counts.max() / counts.min()) if counts.size else 1.0
            )
        else:
            features["nr_class"] = 0.0
            features["class_imbalance"] = 1.0
        return features

    def _extract_statistical(self, X: np.ndarray) -> dict[str, float]:
        """Extract statistical meta-features from an imputed feature matrix."""
        features: dict[str, float] = {}

        # Feature-wise location and spread
        features["mean_mean"] = float(np.mean(np.mean(X, axis=0)))
        features["mean_std"] = float(np.mean(np.std(X, axis=0)))

        # Skewness and kurtosis (NaN, e.g. from constant columns, counts as 0)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            skewness = stats.skew(X, axis=0, nan_policy='omit')
            kurtosis = stats.kurtosis(X, axis=0, nan_policy='omit')
        skewness = np.nan_to_num(skewness, nan=0.0)
        kurtosis = np.nan_to_num(kurtosis, nan=0.0)
        features["mean_skewness"] = float(np.mean(np.abs(skewness)))
        features["mean_kurtosis"] = float(np.mean(np.abs(kurtosis)))

        # Pairwise correlations between features
        if X.shape[1] > 1:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                corr_matrix = np.corrcoef(X.T)
            corr_matrix = np.nan_to_num(corr_matrix, nan=0.0)
            # Upper triangle, excluding the diagonal of self-correlations
            upper_tri = np.abs(corr_matrix[np.triu_indices_from(corr_matrix, k=1)])
            features["mean_cor"] = float(np.mean(upper_tri))
            features["max_cor"] = float(np.max(upper_tri)) if len(upper_tri) > 0 else 0.0
        else:
            features["mean_cor"] = 0.0
            features["max_cor"] = 0.0

        # Share of total variance carried by the largest covariance eigenvalue
        # of the standardized data. Best effort: any failure yields 1.0.
        try:
            X_scaled = StandardScaler().fit_transform(X)
            X_scaled = np.nan_to_num(X_scaled, nan=0.0)
            cov = np.cov(X_scaled.T)
            if cov.ndim == 0:
                # Single feature: np.cov returns a scalar, eigvalsh needs 2-D.
                cov = np.array([[cov]])
            eigenvalues = np.sort(np.linalg.eigvalsh(cov))[::-1]
            eigenvalues = np.maximum(eigenvalues, 0)  # clip numerical negatives
            total = eigenvalues.sum()
            if len(eigenvalues) > 0 and total > 0:
                features["eigenvalue_ratio"] = float(eigenvalues[0] / total)
            else:
                features["eigenvalue_ratio"] = 1.0
        except Exception as exc:
            self._log(f"eigenvalue_ratio failed, defaulting to 1.0: {exc}")
            features["eigenvalue_ratio"] = 1.0

        # Outlier ratio (IQR method), computed for all columns at once.
        # Columns with zero IQR contribute no outliers.
        q1, q3 = np.percentile(X, [25, 75], axis=0)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        outlier_mask = ((X < lower) | (X > upper)) & (iqr > 0)
        features["outlier_ratio"] = float(outlier_mask.sum() / max(X.size, 1))
        return features

    @staticmethod
    def _entropy(counts: np.ndarray) -> float:
        """Return the base-2 entropy of a histogram of counts.

        A 1e-10 offset inside the logarithm avoids log(0); as a consequence
        the result can be marginally below 0 for a single-bin histogram.
        """
        probs = counts / counts.sum()
        return float(-np.sum(probs * np.log2(probs + 1e-10)))

    def _extract_info_theory(
        self,
        X: np.ndarray,
        y: np.ndarray,
    ) -> dict[str, float]:
        """Extract information-theoretic meta-features.

        Each feature is discretized with ``np.digitize`` over an equally
        spaced grid of ``min(10, int(sqrt(n_samples)))`` points between its
        minimum and maximum. ``attr_entropy`` averages over all features;
        ``joint_entropy`` and ``mutual_info`` only over the first 20.
        ``noise_signal`` is ``attr_entropy / mutual_info``, or ``inf`` when
        the mean mutual information is 0.
        """
        features: dict[str, float] = {}

        # Class entropy; y_codes are dense integer class labels for pairing.
        classes, y_codes, class_counts = np.unique(
            y, return_inverse=True, return_counts=True
        )
        y_codes = np.ravel(y_codes)
        n_classes = max(len(classes), 1)
        class_entropy = self._entropy(class_counts)
        features["class_entropy"] = class_entropy

        n_bins = min(10, int(np.sqrt(X.shape[0])))
        attr_entropies: list[float] = []
        joint_entropies: list[float] = []
        mutual_infos: list[float] = []

        for col in range(X.shape[1]):
            column = X[:, col]
            try:
                edges = np.linspace(column.min(), column.max(), n_bins)
                binned = np.digitize(column, edges)
                _, bin_counts = np.unique(binned, return_counts=True)
                attr_entropy = self._entropy(bin_counts)
                attr_entropies.append(attr_entropy)

                if col >= self._MAX_INFO_THEORY_COLUMNS:
                    continue  # limit the pairwise statistics for speed

                # Encode each (bin, class) pair as one integer and count.
                pair_codes = binned.astype(np.int64) * n_classes + y_codes
                _, pair_counts = np.unique(pair_codes, return_counts=True)
                joint_entropy = self._entropy(pair_counts)
                joint_entropies.append(joint_entropy)

                # Mutual information: I(X;Y) = H(X) + H(Y) - H(X,Y),
                # clipped because it cannot be negative.
                mi = attr_entropy + class_entropy - joint_entropy
                mutual_infos.append(max(0, mi))
            except ValueError:
                # e.g. an empty column (min of empty array) or a length
                # mismatch between X and y: skip this feature.
                continue

        features["attr_entropy"] = float(np.mean(attr_entropies)) if attr_entropies else 0.0
        features["joint_entropy"] = float(np.mean(joint_entropies)) if joint_entropies else 0.0
        features["mutual_info"] = float(np.mean(mutual_infos)) if mutual_infos else 0.0

        # Noise-to-signal ratio
        if features["mutual_info"] > 0:
            features["noise_signal"] = features["attr_entropy"] / features["mutual_info"]
        else:
            features["noise_signal"] = float('inf')
        return features

    def _extract_landmarking(
        self,
        X: np.ndarray,
        y: np.ndarray,
        task_type: str,
    ) -> dict[str, float]:
        """Extract landmarking meta-features (simple model performance).

        Each value is the mean cross-validated score (accuracy for
        classification, R^2 otherwise) of a cheap model on standardized data;
        a model that fails to evaluate yields NaN. Datasets with more than
        5000 rows are subsampled (without replacement, seeded) for speed.
        """
        from sklearn.dummy import DummyClassifier, DummyRegressor
        from sklearn.linear_model import LogisticRegression, Ridge
        from sklearn.naive_bayes import GaussianNB
        from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
        from sklearn.preprocessing import StandardScaler
        from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

        features: dict[str, float] = {}

        # Prepare data
        X_scaled = StandardScaler().fit_transform(X)
        X_scaled = np.nan_to_num(X_scaled, nan=0.0, posinf=0.0, neginf=0.0)

        # Limit size for speed
        limit = self._MAX_LANDMARKING_SAMPLES
        if X.shape[0] > limit:
            rng = np.random.RandomState(self.random_state)
            idx = rng.choice(X.shape[0], limit, replace=False)
            X_scaled = X_scaled[idx]
            y_subset = y[idx]
        else:
            y_subset = y

        models: list[tuple[str, Any]]
        if task_type == "classification":
            scoring = "accuracy"
            models = [
                ("lm_1nn", KNeighborsClassifier(n_neighbors=1)),
                ("lm_nb", GaussianNB()),
                ("lm_dt_stump", DecisionTreeClassifier(max_depth=1, random_state=self.random_state)),
                ("lm_linear", LogisticRegression(max_iter=100, random_state=self.random_state)),
                ("lm_random", DummyClassifier(strategy="stratified", random_state=self.random_state)),
            ]
        else:
            scoring = "r2"
            models = [
                ("lm_1nn", KNeighborsRegressor(n_neighbors=1)),
                ("lm_dt_stump", DecisionTreeRegressor(max_depth=1, random_state=self.random_state)),
                ("lm_linear", Ridge(random_state=self.random_state)),
                ("lm_random", DummyRegressor(strategy="mean")),
            ]

        for name, model in models:
            try:
                scores = cross_val_score(
                    model,
                    X_scaled,
                    y_subset,
                    cv=self.landmarking_cv,
                    scoring=scoring,
                )
                features[name] = float(np.mean(scores))
            except Exception as e:  # best effort: one landmark must not abort the rest
                self._log(f"Landmarking {name} failed: {e}")
                features[name] = np.nan
        return features

    def get_feature_names(self) -> list[str]:
        """Get the names of the built-in meta-features for the selected groups.

        Features contributed by pymfe are not included.
        """
        names: list[str] = []
        if "simple" in self.groups:
            names.extend(SIMPLE_META_FEATURES)
        if "statistical" in self.groups:
            names.extend(STATISTICAL_META_FEATURES)
        if "info-theory" in self.groups:
            names.extend(INFO_THEORY_META_FEATURES)
        if "landmarking" in self.groups:
            names.extend(LANDMARKING_META_FEATURES)
        return names
def profile_dataset(
    X: np.ndarray,
    y: np.ndarray,
    groups: list[str] | None = None,
    *,
    categorical_indicator: list[bool] | None = None,
    task_type: str = "classification",
    **kwargs,
) -> dict[str, float]:
    """Convenience function to profile a dataset.

    Builds a ``MetaProfiler`` and returns the extracted features as a plain
    dictionary; per-group extraction errors recorded by the profiler are not
    part of the return value.

    Parameters
    ----------
    X : np.ndarray
        Feature matrix.
    y : np.ndarray
        Target variable.
    groups : List[str], optional
        Meta-feature groups to extract.
    categorical_indicator : List[bool], optional
        Boolean mask indicating categorical features, forwarded to
        ``MetaProfiler.profile``.
    task_type : str, default="classification"
        Type of task ("classification" or "regression"), forwarded to
        ``MetaProfiler.profile``.
    **kwargs
        Additional arguments passed to the ``MetaProfiler`` constructor.

    Returns
    -------
    Dict[str, float]
        Dictionary of meta-features.
    """
    profiler = MetaProfiler(groups=groups, **kwargs)
    result = profiler.profile(
        X,
        y,
        categorical_indicator=categorical_indicator,
        task_type=task_type,
    )
    return result.to_dict()