Source code for endgame.models.bayesian.classic.eskdb

from __future__ import annotations

"""Ensemble of Selective K-Dependence Bayes (ESKDB) Classifier.

ESKDB is a state-of-the-art ensemble method for Bayesian Network
Classification. It combines multiple KDB classifiers with diversity
through Stochastic Attribute Ordering (SAO) and/or bootstrapping.

Key features:
- K-Dependence structure (generalizes NB, TAN)
- Hierarchical Dirichlet Process smoothing
- Ensemble diversity via SAO or bootstrap

References
----------
Webb, G. I., Boughton, J. R., & Wang, Z. (2005).
Not So Naive Bayes: Aggregating One-Dependence Estimators.
Machine Learning, 58(1).
"""

from concurrent.futures import ThreadPoolExecutor
from typing import Any

import networkx as nx
import numpy as np
from sklearn.utils.validation import check_is_fitted, check_X_y

from endgame.models.bayesian.base import (
    BaseBayesianClassifier,
)
from endgame.models.bayesian.structure.learning import (
    compute_mi_scores,
)


[docs] class KDBClassifier(BaseBayesianClassifier): """ K-Dependence Bayes Classifier. KDB allows each feature to have at most K feature parents plus the class as a parent. This generalizes: - K=0: Naive Bayes - K=1: One-Dependence Estimator (AODE) - K>=n_features: Unrestricted (but computationally expensive) Parameters ---------- k : int, default=2 Maximum number of feature parents per node. smoothing : {'laplace', 'hdp'}, default='laplace' Smoothing method: - 'laplace': Standard add-alpha smoothing - 'hdp': Hierarchical Dirichlet Process smoothing_alpha : float, default=1.0 Smoothing parameter (for Laplace). attribute_order : list[int] | None, default=None Custom attribute ordering. If None, uses MI ranking. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. """ def __init__( self, k: int = 2, smoothing: str = 'laplace', smoothing_alpha: float = 1.0, attribute_order: list[int] | None = None, max_cardinality: int = 100, auto_discretize: bool = True, discretizer_strategy: str = 'mdlp', discretizer_max_bins: int = 10, random_state: int | None = None, verbose: bool = False, ): super().__init__( smoothing=smoothing_alpha, max_cardinality=max_cardinality, auto_discretize=auto_discretize, discretizer_strategy=discretizer_strategy, discretizer_max_bins=discretizer_max_bins, random_state=random_state, verbose=verbose, ) self.k = k self.smoothing_method = smoothing self.smoothing_alpha = smoothing_alpha self.attribute_order = attribute_order self.cpts_: dict | None = None self.class_prior_: np.ndarray | None = None self.mi_scores_: np.ndarray | None = None def _learn_structure(self, X: np.ndarray, y: np.ndarray) -> nx.DiGraph: """Build KDB structure with at most K feature parents.""" n_features = X.shape[1] # Compute MI scores self.mi_scores_ = compute_mi_scores(X, y) # Get attribute order if self.attribute_order is not None: feature_order = np.array(self.attribute_order) else: feature_order = np.argsort(-self.mi_scores_) # Build structure structure = nx.DiGraph() structure.add_node('Y') for rank, feature in enumerate(feature_order): structure.add_node(feature) structure.add_edge('Y', feature) # Add up to K parents from higher-ranked features if rank > 0 and self.k > 0: candidates = feature_order[:rank] # Score by CMI (approximated by MI for efficiency) parent_scores = [(c, self.mi_scores_[c]) for c in candidates] parent_scores.sort(key=lambda x: x[1], reverse=True) for parent, _ in parent_scores[:self.k]: structure.add_edge(parent, feature) # Feature importances self.feature_importances_ = self.mi_scores_ / (self.mi_scores_.sum() + 1e-10) return structure def _learn_parameters( self, X: np.ndarray, y: np.ndarray, structure: nx.DiGraph, ) -> None: """Learn CPTs with specified smoothing.""" self.cpts_ = {} # Class prior y_counts = np.bincount(y.astype(int), minlength=self.n_classes_) self.class_prior_ = (y_counts + self.smoothing) / ( len(y) + self.smoothing * self.n_classes_ ) for node in structure.nodes(): if node == 'Y': continue parents = list(structure.predecessors(node)) if self.smoothing_method == 'hdp': self.cpts_[node] = self._estimate_cpt_hdp(X, y, node, parents) else: self.cpts_[node] = self._estimate_cpt_laplace(X, y, node, parents) def _estimate_cpt_laplace( self, X: np.ndarray, y: np.ndarray, node: int, parents: list, ) -> np.ndarray: """Standard Laplace smoothing.""" node_card = self.cardinalities_[node] has_class_parent = 'Y' in parents feature_parents = [p for p in parents if p != 'Y'] if not parents: counts = np.bincount(X[:, node].astype(int), minlength=node_card) return (counts + self.smoothing_alpha) / ( len(X) + self.smoothing_alpha * node_card ) parent_cards = [] if has_class_parent: parent_cards.append(self.n_classes_) for p in feature_parents: parent_cards.append(self.cardinalities_[p]) shape = [node_card] + parent_cards counts = np.zeros(shape) for i in range(X.shape[0]): # Clamp node value to valid range node_val = min(max(int(X[i, node]), 0), node_card - 1) idx = [node_val] if has_class_parent: idx.append(int(y[i])) for p in feature_parents: # Clamp parent value to valid range p_card = self.cardinalities_[p] p_val = min(max(int(X[i, p]), 0), p_card - 1) idx.append(p_val) counts[tuple(idx)] += 1 parent_totals = counts.sum(axis=0, keepdims=True) cpt = (counts + self.smoothing_alpha) / ( parent_totals + self.smoothing_alpha * node_card ) return cpt def _estimate_cpt_hdp( self, X: np.ndarray, y: np.ndarray, node: int, parents: list, ) -> np.ndarray: """ Hierarchical Dirichlet Process smoothing. Adapts smoothing based on: - Local sparsity - Parent configuration frequency - Global distribution """ node_card = self.cardinalities_[node] has_class_parent = 'Y' in parents feature_parents = [p for p in parents if p != 'Y'] if not parents: counts = np.bincount(X[:, node].astype(int), minlength=node_card) # Global distribution as base global_prob = (counts + 1) / (len(X) + node_card) return global_prob parent_cards = [] if has_class_parent: parent_cards.append(self.n_classes_) for p in feature_parents: parent_cards.append(self.cardinalities_[p]) shape = [node_card] + parent_cards counts = np.zeros(shape) for i in range(X.shape[0]): # Clamp node value to valid range node_val = min(max(int(X[i, node]), 0), node_card - 1) idx = [node_val] if has_class_parent: idx.append(int(y[i])) for p in feature_parents: # Clamp parent value to valid range p_card = self.cardinalities_[p] p_val = min(max(int(X[i, p]), 0), p_card - 1) idx.append(p_val) counts[tuple(idx)] += 1 # Compute global (marginal) distribution X_node_clamped = np.clip(X[:, node].astype(int), 0, node_card - 1) global_counts = np.bincount(X_node_clamped, minlength=node_card) global_prob = (global_counts + 1) / (len(X) + node_card) # HDP smoothing: blend local with global based on sample count parent_totals = counts.sum(axis=0, keepdims=True) # Concentration parameter (adapts to sparsity) # More samples -> less smoothing concentration = self.smoothing_alpha * node_card # HDP formula: P = (N_local + concentration * P_global) / (N_parent + concentration) cpt = (counts + concentration * global_prob.reshape([-1] + [1] * len(parent_cards))) / ( parent_totals + concentration ) return cpt
[docs] def predict_proba(self, X) -> np.ndarray: """Predict class probabilities.""" check_is_fitted(self, ['structure_', 'cpts_', 'class_prior_']) # Preprocess input (applies discretization if needed) X = self._preprocess_X(X) X = X.astype(int) n_samples = X.shape[0] log_proba = np.zeros((n_samples, self.n_classes_)) log_proba += np.log(self.class_prior_ + 1e-10) for node in range(self.n_features_in_): if node not in self.cpts_: continue cpt = self.cpts_[node] parents = list(self.structure_.predecessors(node)) has_class_parent = 'Y' in parents feature_parents = [p for p in parents if p != 'Y'] for c_idx in range(self.n_classes_): for i in range(n_samples): # Clamp node value to valid range node_val = min(max(int(X[i, node]), 0), cpt.shape[0] - 1) idx = [node_val] if has_class_parent: idx.append(c_idx) for p in feature_parents: # Clamp parent value to valid range p_card = self.cardinalities_.get(p, 1) p_val = min(max(int(X[i, p]), 0), p_card - 1) idx.append(p_val) try: prob = cpt[tuple(idx)] log_proba[i, c_idx] += np.log(prob + 1e-10) except IndexError: log_proba[i, c_idx] += np.log(1.0 / cpt.shape[0]) log_proba -= log_proba.max(axis=1, keepdims=True) proba = np.exp(log_proba) proba /= proba.sum(axis=1, keepdims=True) return proba
def _get_fitted_state(self) -> dict[str, Any]: """Get fitted state.""" state = {} if self.cpts_: state['cpts'] = {str(k): v.tolist() for k, v in self.cpts_.items()} if self.class_prior_ is not None: state['class_prior'] = self.class_prior_.tolist() if self.mi_scores_ is not None: state['mi_scores'] = self.mi_scores_.tolist() if self.feature_importances_ is not None: state['feature_importances'] = self.feature_importances_.tolist() return state def _set_fitted_state(self, state: dict[str, Any]) -> None: """Restore fitted state.""" if 'cpts' in state: self.cpts_ = {int(k): np.array(v) for k, v in state['cpts'].items()} if 'class_prior' in state: self.class_prior_ = np.array(state['class_prior']) if 'mi_scores' in state: self.mi_scores_ = np.array(state['mi_scores']) if 'feature_importances' in state: self.feature_importances_ = np.array(state['feature_importances'])
[docs] class ESKDBClassifier(BaseBayesianClassifier): """ Ensemble of Selective K-Dependence Bayes classifiers. ESKDB is a state-of-the-art BNC ensemble that achieves diversity through Stochastic Attribute Ordering (SAO) and/or bootstrapping. Parameters ---------- n_estimators : int, default=50 Number of KDB models in ensemble. k : int, default=2 Maximum number of parent features per node (K-dependence). smoothing : {'laplace', 'hdp'}, default='hdp' - 'laplace': Standard add-alpha smoothing - 'hdp': Hierarchical Dirichlet Process (adapts to sparsity) diversity_method : {'sao', 'bootstrap', 'both'}, default='sao' How to generate ensemble diversity: - 'sao': Stochastic Attribute Ordering - 'bootstrap': Sample with replacement - 'both': Combine both methods aggregation : {'averaging', 'voting', 'stacking'}, default='averaging' How to combine predictions. n_jobs : int, default=-1 Parallelization. -1 uses all cores. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Attributes ---------- estimators_ : list[KDBClassifier] Fitted KDB models. oob_score_ : float Out-of-bag accuracy (when diversity_method includes 'bootstrap'). feature_importances_ : np.ndarray Average feature importance across estimators. Examples -------- >>> from endgame.models.bayesian import ESKDBClassifier >>> clf = ESKDBClassifier(n_estimators=50, k=2) >>> clf.fit(X_train, y_train) >>> clf.predict_proba(X_test) """ def __init__( self, n_estimators: int = 50, k: int = 2, smoothing: str = 'hdp', diversity_method: str = 'sao', aggregation: str = 'averaging', n_jobs: int = -1, max_cardinality: int = 100, auto_discretize: bool = True, discretizer_strategy: str = 'mdlp', discretizer_max_bins: int = 10, random_state: int | None = None, verbose: bool = False, ): super().__init__( max_cardinality=max_cardinality, auto_discretize=auto_discretize, discretizer_strategy=discretizer_strategy, discretizer_max_bins=discretizer_max_bins, random_state=random_state, verbose=verbose, ) self.n_estimators = n_estimators self.k = k self.smoothing_method = smoothing self.diversity_method = diversity_method self.aggregation = aggregation self.n_jobs = n_jobs self.estimators_: list[KDBClassifier] | None = None self.oob_score_: float | None = None self.oob_predictions_: np.ndarray | None = None def _learn_structure(self, X: np.ndarray, y: np.ndarray) -> nx.DiGraph: """ ESKDB doesn't learn a single structure - each estimator has its own. Return a placeholder structure for API compatibility. """ structure = nx.DiGraph() structure.add_node('Y') for i in range(X.shape[1]): structure.add_node(i) structure.add_edge('Y', i) return structure def _learn_parameters( self, X: np.ndarray, y: np.ndarray, structure: nx.DiGraph, ) -> None: """Not used for ESKDB - parameters learned by individual estimators.""" pass
[docs] def fit(self, X, y, **fit_params) -> ESKDBClassifier: """ Fit ensemble of KDB classifiers. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data. Can be continuous (will be auto-discretized if auto_discretize=True) or discrete/integer-valued. y : array-like of shape (n_samples,) Target values. Returns ------- self """ X, y = check_X_y(X, y) # Store original n_features before any transformations self.n_features_in_ = X.shape[1] if self.auto_discretize: X = self._discretize_input(X, y, fit=True) elif self._needs_discretization(X): raise ValueError( "BayesianClassifiers require discrete (integer) input. " "Set auto_discretize=True or use BayesianDiscretizer to convert continuous features." ) else: self.discretizer_ = None # Shift any negative integer features to start at 0 X = self._remap_to_nonnegative(X, fit=True) X, y = self._validate_discrete_input(X, y, self.max_cardinality) self.classes_ = np.unique(y) self.n_classes_ = len(self.classes_) self._class_to_idx = {c: i for i, c in enumerate(self.classes_)} y = np.array([self._class_to_idx[v] for v in y]) self.cardinalities_ = self._compute_cardinalities(X, y) # Compute MI scores for SAO mi_scores = compute_mi_scores(X, y) self._log(f"Fitting ESKDB with {self.n_estimators} estimators...") # Initialize random state rng = np.random.RandomState(self.random_state) # Generate seeds for each estimator seeds = rng.randint(0, 2**31, size=self.n_estimators) # Prepare training arguments train_args = [] for i in range(self.n_estimators): attr_order = self._generate_attribute_order(mi_scores, seeds[i]) if self.diversity_method in ('bootstrap', 'both'): boot_idx = rng.choice(len(X), len(X), replace=True) X_train = X[boot_idx] y_train = y[boot_idx] oob_mask = ~np.isin(np.arange(len(X)), boot_idx) else: X_train = X y_train = y oob_mask = None train_args.append((X_train, y_train, attr_order, seeds[i], oob_mask)) # Train estimators self.estimators_ = [] oob_predictions = np.zeros((len(X), self.n_classes_)) oob_counts = np.zeros(len(X)) def train_single(args): X_train, y_train, attr_order, seed, _ = args est = KDBClassifier( k=self.k, smoothing=self.smoothing_method, attribute_order=attr_order.tolist(), max_cardinality=self.max_cardinality, random_state=seed, ) est.cardinalities_ = self.cardinalities_ est.n_features_in_ = self.n_features_in_ est.classes_ = self.classes_ est.n_classes_ = self.n_classes_ est.fit(X_train, y_train) return est # Train in parallel or sequential import os n_jobs = self.n_jobs if n_jobs == -1: n_jobs = os.cpu_count() or 1 if n_jobs > 1: with ThreadPoolExecutor(max_workers=n_jobs) as executor: self.estimators_ = list(executor.map(train_single, train_args)) else: for args in train_args: self.estimators_.append(train_single(args)) # Compute OOB score if using bootstrap if self.diversity_method in ('bootstrap', 'both'): for i, (_, _, _, _, oob_mask) in enumerate(train_args): if oob_mask is not None and oob_mask.any(): X_oob = X[oob_mask] proba = self.estimators_[i].predict_proba(X_oob) oob_predictions[oob_mask] += proba oob_counts[oob_mask] += 1 # Normalize and compute accuracy valid = oob_counts > 0 if valid.any(): oob_predictions[valid] /= oob_counts[valid, np.newaxis] oob_pred = self.classes_[oob_predictions[valid].argmax(axis=1)] self.oob_score_ = (oob_pred == y[valid]).mean() self.oob_predictions_ = oob_predictions # Aggregate feature importances importances = np.zeros(self.n_features_in_) for est in self.estimators_: if hasattr(est, 'feature_importances_') and est.feature_importances_ is not None: importances += est.feature_importances_ self.feature_importances_ = importances / len(self.estimators_) # Create placeholder structure self.structure_ = self._learn_structure(X, y) self._is_fitted = True return self
def _generate_attribute_order( self, mi_scores: np.ndarray, seed: int, ) -> np.ndarray: """ Stochastic Attribute Ordering (SAO). Sample attribute order with probability proportional to MI(X_i; Y). This creates structural diversity while preserving quality. """ if self.diversity_method not in ('sao', 'both'): # Deterministic ordering return np.argsort(-mi_scores) rng = np.random.RandomState(seed) # Probability proportional to MI (with small epsilon for zero-MI features) probs = mi_scores + 1e-10 probs = probs / probs.sum() # Sample without replacement order = rng.choice( len(mi_scores), size=len(mi_scores), replace=False, p=probs, ) return order
[docs] def predict_proba(self, X) -> np.ndarray: """ Predict class probabilities by aggregating estimators. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- ndarray of shape (n_samples, n_classes) Class probabilities. """ check_is_fitted(self, ['estimators_']) # Preprocess input (applies discretization if needed) X = self._preprocess_X(X) X = X.astype(int) n_samples = X.shape[0] if self.aggregation == 'averaging': # Average probabilities proba = np.zeros((n_samples, self.n_classes_)) for est in self.estimators_: proba += est.predict_proba(X) proba /= len(self.estimators_) elif self.aggregation == 'voting': # Majority voting on predictions votes = np.zeros((n_samples, self.n_classes_)) for est in self.estimators_: preds = est.predict(X) for i, pred in enumerate(preds): class_idx = np.where(self.classes_ == pred)[0][0] votes[i, class_idx] += 1 proba = votes / len(self.estimators_) else: raise ValueError(f"Unknown aggregation: {self.aggregation}") # Ensure valid probabilities proba = np.clip(proba, 1e-10, 1.0) proba /= proba.sum(axis=1, keepdims=True) return proba
def _get_fitted_state(self) -> dict[str, Any]: """Get fitted state.""" state = { 'n_estimators_actual': len(self.estimators_) if self.estimators_ else 0, 'oob_score': self.oob_score_, } if self.feature_importances_ is not None: state['feature_importances'] = self.feature_importances_.tolist() # Note: Individual estimator states could be serialized but would be large # For now, we don't serialize the full estimators return state def _set_fitted_state(self, state: dict[str, Any]) -> None: """Restore fitted state.""" self.oob_score_ = state.get('oob_score') if 'feature_importances' in state: self.feature_importances_ = np.array(state['feature_importances'])