from __future__ import annotations
"""Results analysis and visualization for benchmarks.
Provides tools for analyzing, ranking, and visualizing benchmark results.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Any
import numpy as np
from scipy import stats
try:
import polars as pl
HAS_POLARS = True
except ImportError:
HAS_POLARS = False
try:
import pandas as pd
HAS_PANDAS = True
except ImportError:
HAS_PANDAS = False
from endgame.benchmark.tracker import ExperimentTracker
[docs]
class RankingMethod(str, Enum):
    """Strategies for aggregating per-dataset results into a model ranking.

    Every member is also a plain ``str``, so a member compares equal to its
    raw value and can be built from it, e.g. ``RankingMethod("mean_rank")``.
    """

    # Mean metric value across datasets.
    MEAN_SCORE = "mean_score"
    # Mean rank across datasets.
    MEAN_RANK = "mean_rank"
    # Number of datasets where the model was best.
    WIN_COUNT = "win_count"
    # Borda count ranking.
    BORDA_COUNT = "borda_count"
    # Friedman test ranking.
    FRIEDMAN = "friedman"
@dataclass
class ModelComparison:
    """Outcome of a head-to-head comparison of two models.

    Only the two model names are required; every other field defaults to
    the "nothing measured" value (no wins, no ties, ``p_value`` of 1.0,
    zero effect size, not significant).

    Attributes
    ----------
    model_a : str
        Name of the first model.
    model_b : str
        Name of the second model.
    wins_a : int
        Number of datasets on which model A won.
    wins_b : int
        Number of datasets on which model B won.
    ties : int
        Number of datasets on which the models tied.
    p_value : float
        P-value of the statistical test.
    effect_size : float
        Effect size (Cohen's d or similar).
    is_significant : bool
        Whether the difference is statistically significant.
    """

    # Identification of the two models being compared.
    model_a: str
    model_b: str

    # Per-dataset tallies.
    wins_a: int = 0
    wins_b: int = 0
    ties: int = 0

    # Statistical test outcome.
    p_value: float = 1.0
    effect_size: float = 0.0
    is_significant: bool = False
[docs]
class ResultsAnalyzer:
    """Analyze and compare benchmark results.

    Provides methods for:

    - Ranking models across datasets
    - Statistical significance testing
    - Critical difference diagrams
    - Performance profiles
    - Meta-feature correlation analysis

    Parameters
    ----------
    tracker : ExperimentTracker
        Tracker containing experiment results.
    metric : str, default="accuracy"
        Primary metric for comparisons.
    higher_is_better : bool, default=True
        Whether higher metric values are better.
    significance_level : float, default=0.05
        Alpha level for statistical tests.

    Examples
    --------
    >>> analyzer = ResultsAnalyzer(tracker, metric="accuracy")
    >>> rankings = analyzer.rank_models()
    >>> print(rankings)
    >>>
    >>> # Statistical comparison
    >>> comparison = analyzer.compare_models("RF", "XGBoost")
    >>> print(f"P-value: {comparison.p_value}")
    """

    # Critical values q_alpha for the Nemenyi test, keyed by number of models
    # and then by alpha. Values outside this table are computed from the
    # studentized range distribution (see ``_nemenyi_q``).
    _NEMENYI_Q = {
        2: {0.05: 1.960, 0.10: 1.645},
        3: {0.05: 2.343, 0.10: 2.052},
        4: {0.05: 2.569, 0.10: 2.291},
        5: {0.05: 2.728, 0.10: 2.459},
        6: {0.05: 2.850, 0.10: 2.589},
        7: {0.05: 2.949, 0.10: 2.693},
        8: {0.05: 3.031, 0.10: 2.780},
        9: {0.05: 3.102, 0.10: 2.855},
        10: {0.05: 3.164, 0.10: 2.920},
    }

    # Names accepted by ``compare_models(test=...)``.
    _PAIRED_TESTS = ("wilcoxon", "paired_t", "sign")

    def __init__(
        self,
        tracker: ExperimentTracker,
        metric: str = "accuracy",
        higher_is_better: bool = True,
        significance_level: float = 0.05,
    ):
        self.tracker = tracker
        self.metric = metric
        self.higher_is_better = higher_is_better
        self.significance_level = significance_level
        # Filled lazily by the ``df`` property.
        self._df = None
        # Not read anywhere in this class; kept so existing attribute access
        # keeps working.
        self._pivot_table = None

    @classmethod
    def from_pivot(
        cls,
        pivot: dict[str, dict[str, float]],
        metric: str = "accuracy",
        higher_is_better: bool = True,
        significance_level: float = 0.05,
    ) -> ResultsAnalyzer:
        """Create a ResultsAnalyzer from a pivot dict.

        Convenience factory for external experiment systems that already
        have results in ``{dataset: {method: score}}`` form. Each non-``None``
        score is logged as one experiment on a fresh ``ExperimentTracker``.

        Parameters
        ----------
        pivot : Dict[str, Dict[str, float]]
            Mapping of dataset_name -> {method_name: score}. Entries whose
            score is ``None`` are skipped.
        metric : str, default="accuracy"
            Name of the metric the scores represent.
        higher_is_better : bool, default=True
            Whether higher metric values are better.
        significance_level : float, default=0.05
            Alpha level for statistical tests.

        Returns
        -------
        ResultsAnalyzer
            Analyzer ready for ranking, comparison, and statistical tests.

        Examples
        --------
        >>> pivot = {
        ...     "iris": {"RF": 0.95, "XGB": 0.96},
        ...     "wine": {"RF": 0.97, "XGB": 0.95},
        ... }
        >>> analyzer = ResultsAnalyzer.from_pivot(pivot, metric="accuracy")
        >>> print(analyzer.summary_table())
        """
        tracker = ExperimentTracker(name="from_pivot")
        for dataset_name, method_scores in pivot.items():
            for method_name, score in method_scores.items():
                if score is None:
                    continue
                tracker.log_experiment(
                    dataset_name=dataset_name,
                    model_name=method_name,
                    metrics={metric: score},
                )
        return cls(
            tracker=tracker,
            metric=metric,
            higher_is_better=higher_is_better,
            significance_level=significance_level,
        )

    @property
    def df(self):
        """Results as a DataFrame, read from the tracker once and cached."""
        if self._df is None:
            self._df = self.tracker.to_dataframe()
        return self._df

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_polars(frame) -> bool:
        """Return True when *frame* is a polars DataFrame."""
        return bool(HAS_POLARS and isinstance(frame, pl.DataFrame))

    def _score_matrix(self, metric: str):
        """Return ``(model_names, scores)`` with one row per model.

        ``scores`` is a float array of shape (n_models, n_datasets); missing
        results are NaN.
        """
        pivot = self.get_pivot_table(metric)
        if self._is_polars(pivot):
            models = pivot["model_name"].to_list()
            values = pivot.drop("model_name").to_numpy()
        else:
            models = list(pivot.index)
            values = pivot.values
        return models, np.asarray(values, dtype=float)

    @classmethod
    def _nemenyi_q(cls, n_models: int, alpha: float) -> float:
        """Return the Nemenyi critical value q_alpha for *n_models* models."""
        for level, q_value in cls._NEMENYI_Q.get(n_models, {}).items():
            if abs(level - alpha) < 1e-9:
                return q_value
        # Outside the table: q_alpha is the studentized range quantile (with
        # the large-df asymptotic form) divided by sqrt(2).
        quantile = stats.studentized_range.ppf(1.0 - alpha, n_models, np.inf)
        return float(quantile / np.sqrt(2.0))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_pivot_table(self, metric: str | None = None):
        """Get pivot table of models vs datasets.

        Only experiments whose ``status`` is ``"success"`` are included.
        Repeated (model, dataset) pairs are averaged on both the polars and
        the pandas path.

        Parameters
        ----------
        metric : str, optional
            Metric to use. If None, uses default metric.

        Returns
        -------
        DataFrame
            Pivot table with models as rows, datasets as columns. For polars
            the model names are in a ``model_name`` column; for pandas they
            are the index.
        """
        metric = metric or self.metric
        metric_col = f"metric_{metric}"
        df = self.df

        if self._is_polars(df):
            successful = df.filter(pl.col("status") == "success")
            # Aggregate duplicates with the mean so this path agrees with the
            # pandas ``pivot_table(aggfunc="mean")`` branch below.
            return successful.pivot(
                values=metric_col,
                index="model_name",
                on="dataset_name",
                aggregate_function="mean",
            )

        successful = df[df["status"] == "success"]
        return successful.pivot_table(
            values=metric_col,
            index="model_name",
            columns="dataset_name",
            aggfunc="mean",
        )

    def rank_models(
        self,
        method: str | RankingMethod = RankingMethod.MEAN_RANK,
        metric: str | None = None,
    ) -> dict[str, float]:
        """Rank models across all datasets.

        For rank- and count-based methods a missing result is replaced by a
        penalty value that is worse than every observed score, so a model is
        ranked last on datasets it has no result for. ``MEAN_SCORE`` averages
        only the scores that were actually observed.

        Parameters
        ----------
        method : RankingMethod
            Ranking method to use (enum member or its string value).
        metric : str, optional
            Metric to rank by.

        Returns
        -------
        Dict[str, float]
            Model name to score mapping, best model first. Larger values are
            always better: mean ranks are negated, and mean scores are
            negated when lower metric values are better.

        Raises
        ------
        ValueError
            If *method* is not a known ranking method.
        """
        method = RankingMethod(method)
        models, raw = self._score_matrix(metric or self.metric)

        missing = np.isnan(raw)
        if raw.size == 0 or missing.all():
            # Nothing observed, so there is nothing to rank.
            return {}

        if self.higher_is_better:
            penalty = np.nanmin(raw) - 1
        else:
            penalty = np.nanmax(raw) + 1
        filled = np.nan_to_num(raw, nan=penalty)
        # Orient the scores so that "smaller is better" in every branch.
        oriented = -filled if self.higher_is_better else filled

        if method is RankingMethod.MEAN_SCORE:
            observed = (~missing).sum(axis=1)
            totals = np.where(missing, 0.0, raw).sum(axis=1)
            # A model with no observed score at all falls back to the penalty.
            means = np.where(
                observed > 0, totals / np.maximum(observed, 1), penalty
            )
            scores = means if self.higher_is_better else -means
        elif method in (RankingMethod.MEAN_RANK, RankingMethod.FRIEDMAN):
            # Average-rank within each dataset (column), then mean per model.
            # Negated because a lower rank is better.
            scores = -stats.rankdata(oriented, axis=0).mean(axis=1)
        elif method is RankingMethod.WIN_COUNT:
            # On ties the first model in row order gets the win.
            winners = oriented.argmin(axis=0)
            scores = np.bincount(winners, minlength=len(models)).astype(float)
        elif method is RankingMethod.BORDA_COUNT:
            n_models = len(models)
            # Best position earns n_models - 1 points, the last earns 0.
            awards = np.arange(n_models - 1, -1, -1, dtype=float)
            scores = np.zeros(n_models)
            for column in oriented.T:
                scores[np.argsort(column, kind="stable")] += awards
        else:
            raise ValueError(f"Unknown ranking method: {method}")

        ordered = sorted(
            zip(models, scores), key=lambda pair: pair[1], reverse=True
        )
        return {name: float(score) for name, score in ordered}

    def compare_models(
        self,
        model_a: str,
        model_b: str,
        metric: str | None = None,
        test: str = "wilcoxon",
    ) -> ModelComparison:
        """Compare two models statistically.

        Only datasets on which both models have a score are used.

        Parameters
        ----------
        model_a : str
            Name of first model.
        model_b : str
            Name of second model.
        metric : str, optional
            Metric to compare on.
        test : str, default="wilcoxon"
            Statistical test: "wilcoxon", "paired_t", "sign".

        Returns
        -------
        ModelComparison
            Comparison results. With fewer than two shared datasets a default
            (non-significant) comparison is returned. A test that cannot
            produce a p-value (e.g. identical scores) reports 1.0.

        Raises
        ------
        ValueError
            If *test* is unknown or either model is not in the results.
        """
        if test not in self._PAIRED_TESTS:
            raise ValueError(f"Unknown test: {test}")

        models, data = self._score_matrix(metric or self.metric)
        if model_a not in models or model_b not in models:
            raise ValueError("Model not found in results")

        scores_a = data[models.index(model_a)]
        scores_b = data[models.index(model_b)]

        # Keep only datasets where both models have a score.
        paired = ~(np.isnan(scores_a) | np.isnan(scores_b))
        scores_a = scores_a[paired]
        scores_b = scores_b[paired]
        if len(scores_a) < 2:
            return ModelComparison(model_a=model_a, model_b=model_b)

        diff = scores_a - scores_b
        a_better = diff > 0 if self.higher_is_better else diff < 0
        b_better = diff < 0 if self.higher_is_better else diff > 0
        wins_a = int(np.sum(a_better))
        wins_b = int(np.sum(b_better))
        ties = len(diff) - wins_a - wins_b

        if test == "wilcoxon":
            if not np.any(diff):
                # All differences are zero: the test is undefined.
                p_value = 1.0
            else:
                try:
                    _, p_value = stats.wilcoxon(scores_a, scores_b)
                except ValueError:
                    p_value = 1.0
        elif test == "paired_t":
            _, p_value = stats.ttest_rel(scores_a, scores_b)
        else:  # "sign"
            n_decided = wins_a + wins_b
            if n_decided > 0:
                # ``binom_test`` was removed from SciPy; ``binomtest`` is its
                # replacement and is two-sided by default.
                p_value = stats.binomtest(wins_a, n_decided, 0.5).pvalue
            else:
                p_value = 1.0

        p_value = float(p_value)
        if np.isnan(p_value):
            p_value = 1.0

        # Effect size (Cohen's d with the pooled population variance).
        pooled_std = np.sqrt((np.var(scores_a) + np.var(scores_b)) / 2)
        if pooled_std > 0:
            effect_size = (np.mean(scores_a) - np.mean(scores_b)) / pooled_std
        else:
            effect_size = 0.0

        return ModelComparison(
            model_a=model_a,
            model_b=model_b,
            wins_a=wins_a,
            wins_b=wins_b,
            ties=int(ties),
            p_value=p_value,
            effect_size=float(effect_size),
            is_significant=bool(p_value < self.significance_level),
        )

    def friedman_test(self, metric: str | None = None) -> tuple[float, float]:
        """Perform Friedman test across all models.

        Datasets with a missing score for any model are dropped first.

        Parameters
        ----------
        metric : str, optional
            Metric to test on.

        Returns
        -------
        Tuple[float, float]
            (chi2 statistic, p-value). ``(0.0, 1.0)`` is returned when the
            test cannot be computed: fewer than three models, fewer than two
            complete datasets, or a statistic that evaluates to NaN.
        """
        _, data = self._score_matrix(metric or self.metric)

        complete = ~np.isnan(data).any(axis=0)
        data = data[:, complete]

        # ``friedmanchisquare`` needs at least three samples (models).
        if data.shape[0] < 3 or data.shape[1] < 2:
            return 0.0, 1.0

        chi2, p_value = stats.friedmanchisquare(*data)
        if np.isnan(chi2) or np.isnan(p_value):
            return 0.0, 1.0
        return float(chi2), float(p_value)

    def nemenyi_critical_difference(
        self,
        alpha: float = 0.05,
        metric: str | None = None,
    ) -> float:
        """Compute critical difference for Nemenyi test.

        ``CD = q_alpha * sqrt(k * (k + 1) / (6 * N))`` with ``k`` models and
        ``N`` datasets taken from the pivot table.

        Parameters
        ----------
        alpha : float, default=0.05
            Significance level.
        metric : str, optional
            Metric whose pivot table supplies the model and dataset counts.
            If None, uses default metric.

        Returns
        -------
        float
            Critical difference value, or NaN when there are fewer than two
            models or no datasets.

        Raises
        ------
        ValueError
            If *alpha* is not strictly between 0 and 1.
        """
        if not 0.0 < alpha < 1.0:
            raise ValueError(f"alpha must be in (0, 1), got {alpha}")

        models, data = self._score_matrix(metric or self.metric)
        n_models = len(models)
        n_datasets = data.shape[1] if data.ndim == 2 else 0
        if n_models < 2 or n_datasets == 0:
            return float("nan")

        q_alpha = self._nemenyi_q(n_models, alpha)
        return float(
            q_alpha * np.sqrt(n_models * (n_models + 1) / (6 * n_datasets))
        )

    def get_model_summary(
        self,
        model_name: str,
        metric: str | None = None,
    ) -> dict[str, Any]:
        """Get detailed summary for a specific model.

        Parameters
        ----------
        model_name : str
            Name of the model.
        metric : str, optional
            Metric to summarize.

        Returns
        -------
        Dict[str, Any]
            Summary statistics over the model's successful experiments, or
            ``{"error": ...}`` when there is nothing to summarize. Score
            statistics ignore NaN scores; ``best_dataset`` and
            ``worst_dataset`` respect ``higher_is_better``.
        """
        metric = metric or self.metric
        metric_col = f"metric_{metric}"
        df = self.df

        if self._is_polars(df):
            model_df = df.filter(
                (pl.col("model_name") == model_name)
                & (pl.col("status") == "success")
            )
            if len(model_df) == 0:
                return {"error": "No successful experiments for this model"}
            scores = model_df[metric_col].to_numpy()
            datasets = model_df["dataset_name"].to_list()
            fit_times = model_df["fit_time"].to_numpy()
        else:
            model_df = df[
                (df["model_name"] == model_name) & (df["status"] == "success")
            ]
            if len(model_df) == 0:
                return {"error": "No successful experiments for this model"}
            scores = model_df[metric_col].values
            datasets = model_df["dataset_name"].tolist()
            fit_times = model_df["fit_time"].values

        scores = np.asarray(scores, dtype=float)
        fit_times = np.asarray(fit_times, dtype=float)

        # Drop NaN scores and keep the dataset names aligned with them.
        valid = ~np.isnan(scores)
        if not valid.any():
            return {"error": "No valid scores for this model"}
        valid_scores = scores[valid]
        valid_datasets = [name for name, ok in zip(datasets, valid) if ok]

        highest = int(np.argmax(valid_scores))
        lowest = int(np.argmin(valid_scores))
        best_idx, worst_idx = (
            (highest, lowest) if self.higher_is_better else (lowest, highest)
        )

        return {
            "model_name": model_name,
            "metric": metric,
            "n_datasets": len(datasets),
            "mean_score": float(np.mean(valid_scores)),
            "std_score": float(np.std(valid_scores)),
            "min_score": float(np.min(valid_scores)),
            "max_score": float(np.max(valid_scores)),
            "median_score": float(np.median(valid_scores)),
            "mean_fit_time": float(np.mean(fit_times)),
            "total_fit_time": float(np.sum(fit_times)),
            "best_dataset": valid_datasets[best_idx],
            "worst_dataset": valid_datasets[worst_idx],
        }

    def get_dataset_summary(
        self,
        dataset_name: str,
        metric: str | None = None,
    ) -> dict[str, Any]:
        """Get detailed summary for a specific dataset.

        Parameters
        ----------
        dataset_name : str
            Name of the dataset.
        metric : str, optional
            Metric to summarize.

        Returns
        -------
        Dict[str, Any]
            Summary statistics over the dataset's successful experiments, or
            ``{"error": ...}`` when there is nothing to summarize. Score
            statistics ignore NaN scores.
        """
        metric = metric or self.metric
        metric_col = f"metric_{metric}"
        df = self.df

        if self._is_polars(df):
            ds_df = df.filter(
                (pl.col("dataset_name") == dataset_name)
                & (pl.col("status") == "success")
            )
            if len(ds_df) == 0:
                return {"error": "No successful experiments for this dataset"}
            scores = ds_df[metric_col].to_numpy()
            models = ds_df["model_name"].to_list()
            # Dataset metadata is read from the first matching row.
            n_samples = int(ds_df["n_samples"][0])
            n_features = int(ds_df["n_features"][0])
            task_type = str(ds_df["task_type"][0])
        else:
            ds_df = df[
                (df["dataset_name"] == dataset_name)
                & (df["status"] == "success")
            ]
            if len(ds_df) == 0:
                return {"error": "No successful experiments for this dataset"}
            scores = ds_df[metric_col].values
            models = ds_df["model_name"].tolist()
            n_samples = int(ds_df["n_samples"].iloc[0])
            n_features = int(ds_df["n_features"].iloc[0])
            task_type = str(ds_df["task_type"].iloc[0])

        scores = np.asarray(scores, dtype=float)

        # Drop NaN scores and keep the model names aligned with them.
        valid = ~np.isnan(scores)
        if not valid.any():
            return {"error": "No valid scores for this dataset"}
        valid_scores = scores[valid]
        valid_models = [name for name, ok in zip(models, valid) if ok]

        if self.higher_is_better:
            best_idx = int(np.argmax(valid_scores))
        else:
            best_idx = int(np.argmin(valid_scores))

        return {
            "dataset_name": dataset_name,
            "metric": metric,
            "n_models": len(models),
            "n_samples": n_samples,
            "n_features": n_features,
            "task_type": task_type,
            "mean_score": float(np.mean(valid_scores)),
            "std_score": float(np.std(valid_scores)),
            "best_score": float(valid_scores[best_idx]),
            "best_model": valid_models[best_idx],
            "score_range": float(np.max(valid_scores) - np.min(valid_scores)),
        }

    def summary_table(
        self,
        metric: str | None = None,
        sort_by: str = "mean_rank",
    ) -> str:
        """Generate formatted summary table.

        Parameters
        ----------
        metric : str, optional
            Metric to summarize.
        sort_by : str, default="mean_rank"
            Column to sort by: "mean_rank", "mean_score" or "win_count"
            ("wins" is accepted as an alias). Any other value falls back to
            "mean_rank". Rows are listed best first.

        Returns
        -------
        str
            Formatted table string, followed by the Friedman test result and,
            when that is significant at ``significance_level``, the Nemenyi
            critical difference.
        """
        metric = metric or self.metric
        rankings = self.rank_models(RankingMethod.MEAN_RANK, metric)
        scores = self.rank_models(RankingMethod.MEAN_SCORE, metric)
        wins = self.rank_models(RankingMethod.WIN_COUNT, metric)

        # Each mapping is already ordered best-first by its own criterion.
        orderings = {
            "mean_rank": rankings,
            "mean_score": scores,
            "win_count": wins,
            "wins": wins,
        }
        row_order = list(orderings.get(sort_by, rankings))

        lines = [
            f"Benchmark Summary (metric: {metric})",
            "=" * 70,
            f"{'Model':<25} {'Mean Rank':<12} {'Mean Score':<12} {'Wins':<8}",
            "-" * 70,
        ]
        for model in row_order:
            mean_rank = -rankings[model]  # Convert back to positive rank
            mean_score = scores.get(model, 0)
            if not self.higher_is_better:
                # rank_models negates mean scores for lower-is-better
                # metrics; undo that so the real metric value is shown.
                mean_score = -mean_score
            win_count = wins.get(model, 0)
            lines.append(
                f"{model:<25} {mean_rank:<12.2f} {mean_score:<12.4f} {int(win_count):<8}"
            )
        lines.append("-" * 70)

        # Add Friedman test result
        chi2, p_value = self.friedman_test(metric)
        lines.append(f"Friedman test: chi2={chi2:.2f}, p={p_value:.4f}")
        if p_value < self.significance_level:
            cd = self.nemenyi_critical_difference(
                alpha=self.significance_level, metric=metric
            )
            lines.append(f"Critical difference (Nemenyi): {cd:.3f}")
        return "\n".join(lines)