Source code for endgame.benchmark.tracker

from __future__ import annotations

"""Experiment tracking for benchmark runs.

Records pipeline configurations, hyperparameters, metrics, and results
for systematic analysis. Supports master database functionality for
meta-learning dataset accumulation.
"""

import hashlib
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np

try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False

try:
    import pandas as pd
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False


# Default master database location
DEFAULT_MASTER_DB_PATH = Path.home() / ".endgame" / "meta_learning_db.parquet"


[docs] def get_experiment_hash( dataset_name: str, model_name: str, hyperparameters: dict[str, Any], task_type: str = "classification", ) -> str: """Generate a unique hash for an experiment configuration. This hash is used to detect duplicate experiments in the master database. Two experiments are considered duplicates if they have the same: - dataset name - model name - hyperparameters - task type Parameters ---------- dataset_name : str Name of the dataset. model_name : str Name of the model/pipeline. hyperparameters : Dict[str, Any] Model hyperparameters. task_type : str Task type (classification/regression). Returns ------- str SHA256 hash (first 16 characters) uniquely identifying this config. """ # Create a canonical representation config = { "dataset": dataset_name, "model": model_name, "task_type": task_type, "hyperparameters": _canonicalize_params(hyperparameters), } # Convert to JSON string with sorted keys for consistency config_str = json.dumps(config, sort_keys=True, default=str) # Return first 16 chars of SHA256 hash return hashlib.sha256(config_str.encode()).hexdigest()[:16]
def _canonicalize_params(params: dict[str, Any]) -> dict[str, Any]: """Convert parameters to a canonical form for hashing.""" result = {} for key, value in sorted(params.items()): if value is None: result[key] = None elif isinstance(value, (int, float, str, bool)): result[key] = value elif isinstance(value, np.ndarray): result[key] = value.tolist() elif isinstance(value, (list, tuple)): result[key] = list(value) elif isinstance(value, dict): result[key] = _canonicalize_params(value) else: # For objects, use their string representation result[key] = str(type(value).__name__) return result
[docs] @dataclass class ExperimentRecord: """Single experiment record. Attributes ---------- experiment_id : str Unique identifier for this experiment. timestamp : str ISO timestamp of when the experiment was run. dataset_name : str Name of the dataset. dataset_id : Optional[str] External ID (e.g., OpenML ID). model_name : str Name/identifier of the model or pipeline. pipeline_config : Dict Serialized pipeline configuration. hyperparameters : Dict Model hyperparameters. metrics : Dict[str, float] Performance metrics. meta_features : Dict[str, float] Dataset meta-features. cv_scores : Optional[List[float]] Per-fold CV scores. fit_time : float Training time in seconds. predict_time : float Prediction time in seconds. memory_mb : float Peak memory usage in MB. n_samples : int Number of training samples. n_features : int Number of features. task_type : str Type of task. status : str Experiment status: "success", "failed", "timeout". error_message : Optional[str] Error message if failed. tags : List[str] User-defined tags. notes : str Additional notes. """ experiment_id: str = "" timestamp: str = "" dataset_name: str = "" dataset_id: str | None = None model_name: str = "" pipeline_config: dict[str, Any] = field(default_factory=dict) hyperparameters: dict[str, Any] = field(default_factory=dict) metrics: dict[str, float] = field(default_factory=dict) meta_features: dict[str, float] = field(default_factory=dict) cv_scores: list[float] | None = None fit_time: float = 0.0 predict_time: float = 0.0 memory_mb: float = 0.0 n_samples: int = 0 n_features: int = 0 task_type: str = "classification" status: str = "pending" error_message: str | None = None tags: list[str] = field(default_factory=list) notes: str = "" model_structure: str | None = None # Human-readable model structure (tree, rules, etc.) config_hash: str = "" # Hash for deduplication in master database def __post_init__(self): if not self.experiment_id: self.experiment_id = str(uuid.uuid4())[:8] if not self.timestamp: self.timestamp = datetime.now().isoformat() if not self.config_hash and self.dataset_name and self.model_name: self.config_hash = get_experiment_hash( self.dataset_name, self.model_name, self.hyperparameters, self.task_type, )
[docs] def to_dict(self) -> dict[str, Any]: """Convert to dictionary.""" return { "experiment_id": self.experiment_id, "timestamp": self.timestamp, "dataset_name": self.dataset_name, "dataset_id": self.dataset_id, "model_name": self.model_name, "pipeline_config": json.dumps(self.pipeline_config), "hyperparameters": json.dumps(self.hyperparameters), "metrics": self.metrics.copy(), "meta_features": self.meta_features.copy(), "cv_scores": self.cv_scores, "fit_time": self.fit_time, "predict_time": self.predict_time, "memory_mb": self.memory_mb, "n_samples": self.n_samples, "n_features": self.n_features, "task_type": self.task_type, "status": self.status, "error_message": self.error_message, "tags": self.tags, "notes": self.notes, "model_structure": self.model_structure, "config_hash": self.config_hash, }
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> ExperimentRecord: """Create from dictionary.""" # Parse JSON fields if needed if isinstance(data.get("pipeline_config"), str): data["pipeline_config"] = json.loads(data["pipeline_config"]) if isinstance(data.get("hyperparameters"), str): data["hyperparameters"] = json.loads(data["hyperparameters"]) return cls(**data)
[docs] class ExperimentTracker: """Track and store experiment results. Provides methods for logging experiments, querying results, and exporting to various formats. Parameters ---------- name : str, default="benchmark" Name for this tracking session. auto_save : bool, default=False Automatically save after each experiment. save_path : str, optional Path for auto-saving results. Examples -------- >>> tracker = ExperimentTracker(name="my_benchmark") >>> tracker.log_experiment( ... dataset_name="iris", ... model_name="RandomForest", ... metrics={"accuracy": 0.95, "f1": 0.94}, ... hyperparameters={"n_estimators": 100}, ... ) >>> df = tracker.to_dataframe() """ def __init__( self, name: str = "benchmark", auto_save: bool = False, save_path: str | None = None, ): self.name = name self.auto_save = auto_save self.save_path = save_path self._records: list[ExperimentRecord] = [] self._start_time = datetime.now()
[docs] def log_experiment( self, dataset_name: str, model_name: str, metrics: dict[str, float], hyperparameters: dict[str, Any] | None = None, pipeline_config: dict[str, Any] | None = None, meta_features: dict[str, float] | None = None, cv_scores: list[float] | None = None, fit_time: float = 0.0, predict_time: float = 0.0, memory_mb: float = 0.0, n_samples: int = 0, n_features: int = 0, task_type: str = "classification", dataset_id: str | None = None, status: str = "success", error_message: str | None = None, tags: list[str] | None = None, notes: str = "", model_structure: str | None = None, ) -> ExperimentRecord: """Log a single experiment. Parameters ---------- dataset_name : str Name of the dataset. model_name : str Name of the model/pipeline. metrics : Dict[str, float] Performance metrics. hyperparameters : Dict, optional Model hyperparameters. pipeline_config : Dict, optional Full pipeline configuration. meta_features : Dict, optional Dataset meta-features. cv_scores : List[float], optional Per-fold CV scores. fit_time : float Training time in seconds. predict_time : float Prediction time in seconds. memory_mb : float Peak memory usage in MB. n_samples : int Number of samples. n_features : int Number of features. task_type : str Task type. dataset_id : str, optional External dataset ID. status : str Experiment status. error_message : str, optional Error message if failed. tags : List[str], optional Tags for filtering. notes : str Additional notes. Returns ------- ExperimentRecord The logged experiment record. """ record = ExperimentRecord( dataset_name=dataset_name, dataset_id=dataset_id, model_name=model_name, pipeline_config=pipeline_config or {}, hyperparameters=hyperparameters or {}, metrics=metrics, meta_features=meta_features or {}, cv_scores=cv_scores, fit_time=fit_time, predict_time=predict_time, memory_mb=memory_mb, n_samples=n_samples, n_features=n_features, task_type=task_type, status=status, error_message=error_message, tags=tags or [], notes=notes, model_structure=model_structure, ) self._records.append(record) if self.auto_save and self.save_path: self.save(self.save_path) return record
[docs] def log_failure( self, dataset_name: str, model_name: str, error_message: str, **kwargs, ) -> ExperimentRecord: """Log a failed experiment.""" return self.log_experiment( dataset_name=dataset_name, model_name=model_name, metrics={}, status="failed", error_message=error_message, **kwargs, )
@property def records(self) -> list[ExperimentRecord]: """Get all experiment records.""" return self._records def __len__(self) -> int: return len(self._records) def __iter__(self): return iter(self._records)
[docs] def get_by_dataset(self, dataset_name: str) -> list[ExperimentRecord]: """Get records for a specific dataset.""" return [r for r in self._records if r.dataset_name == dataset_name]
[docs] def get_by_model(self, model_name: str) -> list[ExperimentRecord]: """Get records for a specific model.""" return [r for r in self._records if r.model_name == model_name]
[docs] def get_by_tag(self, tag: str) -> list[ExperimentRecord]: """Get records with a specific tag.""" return [r for r in self._records if tag in r.tags]
[docs] def get_successful(self) -> list[ExperimentRecord]: """Get successful experiments only.""" return [r for r in self._records if r.status == "success"]
[docs] def to_dataframe(self, include_meta_features: bool = True): """Convert to DataFrame. Parameters ---------- include_meta_features : bool, default=True Include meta-features as columns. Returns ------- DataFrame Polars DataFrame (or Pandas if Polars unavailable). """ rows = [] for record in self._records: row = { "experiment_id": record.experiment_id, "timestamp": record.timestamp, "dataset_name": record.dataset_name, "dataset_id": record.dataset_id, "model_name": record.model_name, "pipeline_config": json.dumps(record.pipeline_config), "hyperparameters": json.dumps(record.hyperparameters), "fit_time": record.fit_time, "predict_time": record.predict_time, "memory_mb": record.memory_mb, "n_samples": record.n_samples, "n_features": record.n_features, "task_type": record.task_type, "status": record.status, "error_message": record.error_message, "tags": ",".join(record.tags), "notes": record.notes, "config_hash": record.config_hash, } # Add metrics as separate columns (ensure float type for consistency) for metric_name, value in record.metrics.items(): # Convert to float to ensure consistent types across records try: row[f"metric_{metric_name}"] = float(value) if value is not None else None except (TypeError, ValueError): row[f"metric_{metric_name}"] = None # Add CV scores summary if record.cv_scores: row["cv_mean"] = np.mean(record.cv_scores) row["cv_std"] = np.std(record.cv_scores) # Add meta-features (ensure float type for consistency) if include_meta_features: for mf_name, value in record.meta_features.items(): try: row[f"mf_{mf_name}"] = float(value) if value is not None else None except (TypeError, ValueError): row[f"mf_{mf_name}"] = None rows.append(row) if not rows: # Return empty DataFrame with minimal schema if HAS_POLARS: return pl.DataFrame() elif HAS_PANDAS: return pd.DataFrame() else: raise ImportError("Either polars or pandas is required") # Collect all unique columns across all rows to ensure consistent schema all_columns = set() for row in rows: all_columns.update(row.keys()) # Fill missing columns with None for each row for row in rows: for col in all_columns: if col not in row: row[col] = None if HAS_POLARS: # Use infer_schema_length=None to check all rows for schema inference return pl.DataFrame(rows, infer_schema_length=None) elif HAS_PANDAS: return pd.DataFrame(rows) else: raise ImportError("Either polars or pandas is required")
[docs] def to_dict_list(self) -> list[dict[str, Any]]: """Convert to list of dictionaries.""" return [r.to_dict() for r in self._records]
[docs] def save(self, path: str, append: bool = False, deduplicate: bool = True) -> None: """Save results to file. Parameters ---------- path : str Output path. Supports: .parquet, .csv, .json append : bool, default=False If True and file exists, append new records to existing file. If False, overwrite existing file. deduplicate : bool, default=True When appending, skip records with duplicate config_hash. """ # Handle append mode if append and Path(path).exists(): # Load existing records existing = ExperimentTracker(name="existing") try: existing.load(path) # Merge new records into existing existing.merge(self, deduplicate=deduplicate) # Use merged tracker for saving tracker_to_save = existing except Exception as e: # If loading fails, fall back to overwrite print(f"Warning: Could not load existing file for append: {e}. Overwriting instead.") tracker_to_save = self else: tracker_to_save = self df = tracker_to_save.to_dataframe() if path.endswith(".parquet"): if HAS_POLARS: df.write_parquet(path) else: df.to_parquet(path, index=False) elif path.endswith(".csv"): if HAS_POLARS: df.write_csv(path) else: df.to_csv(path, index=False) elif path.endswith(".json"): import json with open(path, "w") as f: json.dump(tracker_to_save.to_dict_list(), f, indent=2, default=str) else: raise ValueError(f"Unsupported format: {path}")
[docs] def load(self, path: str) -> ExperimentTracker: """Load results from file. Parameters ---------- path : str Input path. Returns ------- self """ if path.endswith(".json"): import json with open(path) as f: data = json.load(f) self._records = [ExperimentRecord.from_dict(d) for d in data] else: if HAS_POLARS: if path.endswith(".parquet"): df = pl.read_parquet(path) else: df = pl.read_csv(path) data = df.to_dicts() else: if path.endswith(".parquet"): df = pd.read_parquet(path) else: df = pd.read_csv(path) data = df.to_dict(orient="records") # Reconstruct records from DataFrame format self._records = [] for row in data: record = self._row_to_record(row) self._records.append(record) return self
def _row_to_record(self, row: dict[str, Any]) -> ExperimentRecord: """Convert DataFrame row to ExperimentRecord.""" # Extract metrics metrics = {} meta_features = {} for key, value in row.items(): if key.startswith("metric_") and value is not None: metrics[key[7:]] = float(value) elif key.startswith("mf_") and value is not None: meta_features[key[3:]] = float(value) # Parse JSON fields pipeline_config = {} hyperparameters = {} if row.get("pipeline_config"): try: pipeline_config = json.loads(row["pipeline_config"]) except (json.JSONDecodeError, TypeError): pass if row.get("hyperparameters"): try: hyperparameters = json.loads(row["hyperparameters"]) except (json.JSONDecodeError, TypeError): pass # Parse tags tags = [] if row.get("tags"): tags = row["tags"].split(",") if isinstance(row["tags"], str) else [] return ExperimentRecord( experiment_id=row.get("experiment_id", ""), timestamp=row.get("timestamp", ""), dataset_name=row.get("dataset_name", ""), dataset_id=row.get("dataset_id"), model_name=row.get("model_name", ""), pipeline_config=pipeline_config, hyperparameters=hyperparameters, metrics=metrics, meta_features=meta_features, fit_time=float(row.get("fit_time", 0)), predict_time=float(row.get("predict_time", 0)), memory_mb=float(row.get("memory_mb", 0)), n_samples=int(row.get("n_samples", 0)), n_features=int(row.get("n_features", 0)), task_type=row.get("task_type", "classification"), status=row.get("status", "unknown"), error_message=row.get("error_message"), tags=tags, notes=row.get("notes", ""), config_hash=row.get("config_hash", ""), )
[docs] def summary(self) -> str: """Get summary of tracked experiments.""" successful = self.get_successful() datasets = set(r.dataset_name for r in self._records) models = set(r.model_name for r in self._records) lines = [ f"Experiment Tracker: {self.name}", "=" * 50, f"Total experiments: {len(self._records)}", f"Successful: {len(successful)}", f"Failed: {len(self._records) - len(successful)}", f"Datasets: {len(datasets)}", f"Models: {len(models)}", ] if successful: # Get available metrics metrics = set() for r in successful: metrics.update(r.metrics.keys()) lines.append(f"\nMetrics tracked: {', '.join(sorted(metrics))}") # Best scores per metric for metric in sorted(metrics): scores = [(r.model_name, r.dataset_name, r.metrics.get(metric, float('-inf'))) for r in successful if metric in r.metrics] if scores: best = max(scores, key=lambda x: x[2]) lines.append(f"Best {metric}: {best[2]:.4f} ({best[0]} on {best[1]})") return "\n".join(lines)
[docs] def clear(self) -> None: """Clear all records.""" self._records = []
[docs] def get_config_hashes(self) -> set: """Get set of all config hashes in the tracker.""" return {r.config_hash for r in self._records if r.config_hash}
[docs] def merge( self, other: ExperimentTracker, deduplicate: bool = True, ) -> ExperimentTracker: """Merge another tracker into this one. Parameters ---------- other : ExperimentTracker Tracker to merge. deduplicate : bool, default=True Skip records with duplicate config_hash. Returns ------- self """ if deduplicate: existing_hashes = self.get_config_hashes() for record in other._records: if record.config_hash not in existing_hashes: self._records.append(record) existing_hashes.add(record.config_hash) else: self._records.extend(other._records) return self
[docs] def save_to_master( self, path: str | Path | None = None, deduplicate: bool = True, ) -> int: """Save results to master database, appending to existing records. This is the primary method for building a meta-learning dataset. New experiments are appended to the master database, with duplicate configurations (same dataset + model + hyperparameters) skipped. Parameters ---------- path : str or Path, optional Path to master database. Defaults to ~/.endgame/meta_learning_db.parquet deduplicate : bool, default=True Skip records with duplicate config_hash. Returns ------- int Number of new records added. Examples -------- >>> tracker = ExperimentTracker() >>> # ... run experiments ... >>> n_added = tracker.save_to_master() >>> print(f"Added {n_added} new experiments to master database") """ if path is None: path = DEFAULT_MASTER_DB_PATH path = Path(path) # Create directory if needed path.parent.mkdir(parents=True, exist_ok=True) # Load existing master database existing = ExperimentTracker(name="master_db") if path.exists(): try: existing.load(str(path)) except Exception as e: print(f"Warning: Could not load existing master DB: {e}") initial_count = len(existing) # Merge new records existing.merge(self, deduplicate=deduplicate) n_added = len(existing) - initial_count # Save back to master if n_added > 0 or not path.exists(): existing.save(str(path)) return n_added
[docs] @classmethod def load_master( cls, path: str | Path | None = None, ) -> ExperimentTracker: """Load the master meta-learning database. Parameters ---------- path : str or Path, optional Path to master database. Defaults to ~/.endgame/meta_learning_db.parquet Returns ------- ExperimentTracker Tracker with all historical experiments. Examples -------- >>> tracker = ExperimentTracker.load_master() >>> print(f"Master database has {len(tracker)} experiments") """ if path is None: path = DEFAULT_MASTER_DB_PATH path = Path(path) tracker = cls(name="master_db") if path.exists(): tracker.load(str(path)) return tracker
[docs] @staticmethod def get_master_db_path() -> Path: """Get the default master database path. Returns ------- Path Default path: ~/.endgame/meta_learning_db.parquet """ return DEFAULT_MASTER_DB_PATH
[docs] def filter_existing( self, master_path: str | Path | None = None, ) -> list[tuple[str, str]]: """Find which (dataset, model) pairs already exist in master DB. Useful for skipping already-benchmarked combinations. Parameters ---------- master_path : str or Path, optional Path to master database. Returns ------- List[Tuple[str, str]] List of (dataset_name, model_name) pairs that exist. """ master = self.load_master(master_path) existing_hashes = master.get_config_hashes() existing_pairs = [] for record in master._records: if record.config_hash in existing_hashes: existing_pairs.append((record.dataset_name, record.model_name)) return existing_pairs
def serialize_pipeline(pipeline) -> dict[str, Any]: """Serialize a sklearn pipeline to a dictionary. Parameters ---------- pipeline : estimator Sklearn estimator or pipeline. Returns ------- Dict[str, Any] Serialized configuration. """ config = { "type": type(pipeline).__name__, "module": type(pipeline).__module__, } # Handle sklearn Pipeline if hasattr(pipeline, "named_steps"): config["steps"] = [] for name, estimator in pipeline.named_steps.items(): step_config = { "name": name, "type": type(estimator).__name__, "params": _serialize_params(estimator.get_params()), } config["steps"].append(step_config) else: # Single estimator config["params"] = _serialize_params(pipeline.get_params()) return config def _serialize_params(params: dict[str, Any]) -> dict[str, Any]: """Serialize parameters to JSON-compatible format.""" result = {} for key, value in params.items(): if value is None or isinstance(value, (int, float, str, bool)): result[key] = value elif isinstance(value, np.ndarray): result[key] = value.tolist() elif isinstance(value, (list, tuple)): result[key] = [_serialize_value(v) for v in value] elif isinstance(value, dict): result[key] = _serialize_params(value) elif hasattr(value, "get_params"): # Nested estimator result[key] = { "type": type(value).__name__, "params": _serialize_params(value.get_params()), } else: result[key] = str(value) return result def _serialize_value(value: Any) -> Any: """Serialize a single value.""" if value is None or isinstance(value, (int, float, str, bool)): return value elif isinstance(value, np.ndarray): return value.tolist() elif hasattr(value, "get_params"): return {"type": type(value).__name__} else: return str(value)