Source code for endgame.models.bayesian.classic.ebmc

from __future__ import annotations

"""Efficient Bayesian Multivariate Classifier (EBMC).

EBMC is a Bayesian Network Classifier that performs automatic
feature selection by identifying the Markov Blanket of the target.

Key features:
- Greedy forward selection optimizing BDeu score
- Equivalence transformation to escape local optima
- Automatic Markov Blanket pruning

References
----------
Jaeger, M. (2003). Probabilistic classifiers and the concepts they recognize.
ICML.
"""

import warnings
from typing import Any

import networkx as nx
import numpy as np
from sklearn.utils.validation import check_is_fitted

from endgame.models.bayesian.base import (
    BaseBayesianClassifier,
    ConvergenceWarning,
)
from endgame.models.bayesian.structure.learning import (
    compute_mi_scores,
    get_markov_blanket,
)
from endgame.models.bayesian.structure.scores import (
    bdeu_score,
    bic_score,
    k2_score,
)


[docs] class EBMCClassifier(BaseBayesianClassifier): """ Efficient Bayesian Multivariate Classifier with automatic feature selection. EBMC learns a Bayesian Network structure and then prunes features to those in the Markov Blanket of the target. This provides built-in feature selection while maintaining interpretability. Parameters ---------- score : {'bdeu', 'bic', 'k2'}, default='bdeu' Scoring function for structure learning. equivalent_sample_size : float, default=10.0 ESS for BDeu score. Lower = more aggressive pruning. max_parents : int, default=3 Maximum parents per node (controls complexity). max_features : int | None, default=None Maximum features to select. None = no limit. convergence_threshold : float, default=1e-4 Stop when score improvement falls below this. max_iter : int, default=100 Maximum iterations for structure search. use_equivalence_transform : bool, default=True Whether to apply statistical equivalence transformation. smoothing : float, default=1.0 Laplace smoothing for CPT estimation. random_state : int, optional Random seed. verbose : bool, default=False Enable verbose output. Attributes ---------- selected_features_ : list[int] Indices of selected features (Markov Blanket). structure_ : nx.DiGraph Learned DAG structure. cpts_ : dict Conditional probability tables. current_score_ : float Final structure score. Examples -------- >>> from endgame.models.bayesian import EBMCClassifier >>> clf = EBMCClassifier(max_parents=2) >>> clf.fit(X_train, y_train) >>> print(f"Selected {len(clf.selected_features_)} features") >>> clf.predict(X_test) """ def __init__( self, score: str = 'bdeu', equivalent_sample_size: float = 10.0, max_parents: int = 3, max_features: int | None = None, convergence_threshold: float = 1e-4, max_iter: int = 100, use_equivalence_transform: bool = True, smoothing: float = 1.0, max_cardinality: int = 100, auto_discretize: bool = True, discretizer_strategy: str = 'mdlp', discretizer_max_bins: int = 10, random_state: int | None = None, verbose: bool = False, ): super().__init__( smoothing=smoothing, max_cardinality=max_cardinality, auto_discretize=auto_discretize, discretizer_strategy=discretizer_strategy, discretizer_max_bins=discretizer_max_bins, random_state=random_state, verbose=verbose, ) self.score = score self.equivalent_sample_size = equivalent_sample_size self.max_parents = max_parents self.max_features = max_features self.convergence_threshold = convergence_threshold self.max_iter = max_iter self.use_equivalence_transform = use_equivalence_transform # Set after fit self.selected_features_: list[int] | None = None self.current_score_: float | None = None self.cpts_: dict | None = None self.class_prior_: np.ndarray | None = None def _get_score_func(self): """Get the scoring function.""" if self.score == 'bdeu': return lambda data, parents, child, cards: bdeu_score( data, parents, child, cards, self.equivalent_sample_size ) elif self.score == 'bic': return bic_score elif self.score == 'k2': return k2_score else: raise ValueError(f"Unknown score: {self.score}") def _learn_structure(self, X: np.ndarray, y: np.ndarray) -> nx.DiGraph: """ Learn structure via greedy forward selection + equivalence transform. Phase 1: Greedy forward selection of features Phase 2: Equivalence transformation (if enabled) Phase 3: Markov Blanket pruning """ n_features = X.shape[1] score_func = self._get_score_func() # Combine X and y for scoring data = np.column_stack([X, y]) target_idx = n_features # Extended cardinalities including target extended_cards = dict(self.cardinalities_) extended_cards[target_idx] = self.n_classes_ # Phase 1: Greedy forward selection self._log("Phase 1: Greedy forward selection") current_graph = nx.DiGraph() current_graph.add_node('Y') # Start with MI-ranked features mi_scores = compute_mi_scores(X, y) feature_order = np.argsort(-mi_scores) self.current_score_ = self._compute_graph_score( current_graph, data, extended_cards, score_func ) features_added = [] for iteration in range(min(n_features, self.max_iter)): best_feature = None best_score = self.current_score_ best_graph = None # Try adding each remaining feature for feat in feature_order: if feat in features_added: continue if self.max_features and len(features_added) >= self.max_features: break # Create candidate graph candidate = current_graph.copy() candidate.add_node(feat) candidate.add_edge('Y', feat) # Optionally add edges from existing features for existing in features_added: if len(list(candidate.predecessors(feat))) < self.max_parents: candidate.add_edge(existing, feat) # Score candidate candidate_score = self._compute_graph_score( candidate, data, extended_cards, score_func ) if candidate_score > best_score + self.convergence_threshold: best_score = candidate_score best_feature = feat best_graph = candidate if best_feature is None: break current_graph = best_graph self.current_score_ = best_score features_added.append(best_feature) self._log(f" Added feature {best_feature}, score: {best_score:.4f}") # Phase 2: Equivalence transformation if self.use_equivalence_transform: self._log("Phase 2: Equivalence transformation") current_graph, self.current_score_ = self._apply_equivalence_transform( current_graph, data, extended_cards, score_func ) # Phase 3: Markov Blanket pruning self._log("Phase 3: Markov Blanket pruning") mb_nodes = get_markov_blanket(current_graph, 'Y') mb_nodes.add('Y') # Keep only MB nodes self.selected_features_ = sorted([n for n in mb_nodes if isinstance(n, int)]) if len(self.selected_features_) == 0: # No features selected - fall back to top MI features self.selected_features_ = feature_order[:min(3, n_features)].tolist() for feat in self.selected_features_: if feat not in current_graph: current_graph.add_node(feat) current_graph.add_edge('Y', feat) # Compute feature importances self.feature_importances_ = np.zeros(n_features) for i, feat in enumerate(self.selected_features_): self.feature_importances_[feat] = mi_scores[feat] if self.feature_importances_.sum() > 0: self.feature_importances_ /= self.feature_importances_.sum() self._log(f"Selected {len(self.selected_features_)} features: {self.selected_features_}") # Return subgraph with selected features final_nodes = set(self.selected_features_) | {'Y'} final_graph = current_graph.subgraph(final_nodes).copy() return final_graph def _compute_graph_score( self, graph: nx.DiGraph, data: np.ndarray, cardinalities: dict, score_func, ) -> float: """Compute total score for a graph.""" total = 0.0 for node in graph.nodes(): if node == 'Y': continue parents = list(graph.predecessors(node)) # Convert 'Y' to target index parent_indices = [] for p in parents: if p == 'Y': parent_indices.append(data.shape[1] - 1) else: parent_indices.append(p) total += score_func(data, parent_indices, node, cardinalities) return total def _apply_equivalence_transform( self, graph: nx.DiGraph, data: np.ndarray, cardinalities: dict, score_func, ) -> tuple[nx.DiGraph, float]: """ Apply statistical equivalence transformation to find better structure. Uses edge reversals and additions that maintain equivalence class. """ current_graph = graph.copy() current_score = self._compute_graph_score( current_graph, data, cardinalities, score_func ) improved = True iterations = 0 while improved and iterations < self.max_iter: improved = False iterations += 1 # Try edge reversals edges = list(current_graph.edges()) for u, v in edges: if u == 'Y' or v == 'Y': continue # Don't reverse edges involving class # Check if reversal is valid test_graph = current_graph.copy() test_graph.remove_edge(u, v) test_graph.add_edge(v, u) # Check max parents constraint if len(list(test_graph.predecessors(u))) > self.max_parents: continue # Check DAG property if not nx.is_directed_acyclic_graph(test_graph): continue test_score = self._compute_graph_score( test_graph, data, cardinalities, score_func ) if test_score > current_score + self.convergence_threshold: current_graph = test_graph current_score = test_score improved = True break # Try edge additions (covered edges) if not improved: nodes = [n for n in current_graph.nodes() if n != 'Y'] for u in nodes: for v in nodes: if u == v or current_graph.has_edge(u, v): continue if len(list(current_graph.predecessors(v))) >= self.max_parents: continue test_graph = current_graph.copy() test_graph.add_edge(u, v) if not nx.is_directed_acyclic_graph(test_graph): continue test_score = self._compute_graph_score( test_graph, data, cardinalities, score_func ) if test_score > current_score + self.convergence_threshold: current_graph = test_graph current_score = test_score improved = True break if improved: break if iterations >= self.max_iter: warnings.warn( f"Equivalence transform did not converge in {self.max_iter} iterations", ConvergenceWarning ) return current_graph, current_score def _learn_parameters( self, X: np.ndarray, y: np.ndarray, structure: nx.DiGraph, ) -> None: """Learn CPTs for the selected features.""" self.cpts_ = {} # Class prior y_counts = np.bincount(y.astype(int), minlength=self.n_classes_) self.class_prior_ = (y_counts + self.smoothing) / ( len(y) + self.smoothing * self.n_classes_ ) # CPT for each selected feature for node in structure.nodes(): if node == 'Y': continue parents = list(structure.predecessors(node)) self.cpts_[node] = self._estimate_cpt(X, y, node, parents) def _estimate_cpt( self, X: np.ndarray, y: np.ndarray, node: int, parents: list, ) -> np.ndarray: """Estimate CPT with Laplace smoothing.""" node_card = self.cardinalities_[node] has_class_parent = 'Y' in parents feature_parents = [p for p in parents if p != 'Y'] if not parents: counts = np.bincount(X[:, node].astype(int), minlength=node_card) return (counts + self.smoothing) / (len(X) + self.smoothing * node_card) # Build shape parent_cards = [] if has_class_parent: parent_cards.append(self.n_classes_) for p in feature_parents: parent_cards.append(self.cardinalities_[p]) shape = [node_card] + parent_cards counts = np.zeros(shape) for i in range(X.shape[0]): # Clamp node value to valid range node_val = min(max(int(X[i, node]), 0), node_card - 1) idx = [node_val] if has_class_parent: idx.append(int(y[i])) for p in feature_parents: # Clamp parent value to valid range p_card = self.cardinalities_[p] p_val = min(max(int(X[i, p]), 0), p_card - 1) idx.append(p_val) counts[tuple(idx)] += 1 parent_totals = counts.sum(axis=0, keepdims=True) cpt = (counts + self.smoothing) / (parent_totals + self.smoothing * node_card) return cpt
[docs] def predict_proba(self, X) -> np.ndarray: """Predict class probabilities using learned structure.""" check_is_fitted(self, ['structure_', 'cpts_', 'class_prior_']) # Preprocess input (applies discretization if needed) X = self._preprocess_X(X) X = X.astype(int) n_samples = X.shape[0] log_proba = np.zeros((n_samples, self.n_classes_)) # Add log class prior log_proba += np.log(self.class_prior_ + 1e-10) # Add conditionals for selected features only for node in self.selected_features_: if node not in self.cpts_: continue cpt = self.cpts_[node] parents = list(self.structure_.predecessors(node)) has_class_parent = 'Y' in parents feature_parents = [p for p in parents if p != 'Y'] for c_idx in range(self.n_classes_): for i in range(n_samples): # Clamp node value to valid range node_val = min(max(int(X[i, node]), 0), cpt.shape[0] - 1) idx = [node_val] if has_class_parent: idx.append(c_idx) for p in feature_parents: # Clamp parent value to valid range p_card = self.cardinalities_.get(p, 1) p_val = min(max(int(X[i, p]), 0), p_card - 1) idx.append(p_val) try: prob = cpt[tuple(idx)] log_proba[i, c_idx] += np.log(prob + 1e-10) except IndexError: log_proba[i, c_idx] += np.log(1.0 / cpt.shape[0]) # Normalize log_proba -= log_proba.max(axis=1, keepdims=True) proba = np.exp(log_proba) proba /= proba.sum(axis=1, keepdims=True) return proba
def _get_fitted_state(self) -> dict[str, Any]: """Get fitted state for serialization.""" state = { 'selected_features': self.selected_features_, 'current_score': self.current_score_, } if self.cpts_ is not None: state['cpts'] = {str(k): v.tolist() for k, v in self.cpts_.items()} if self.class_prior_ is not None: state['class_prior'] = self.class_prior_.tolist() if self.feature_importances_ is not None: state['feature_importances'] = self.feature_importances_.tolist() return state def _set_fitted_state(self, state: dict[str, Any]) -> None: """Restore fitted state.""" self.selected_features_ = state.get('selected_features', []) self.current_score_ = state.get('current_score') if 'cpts' in state: self.cpts_ = {int(k): np.array(v) for k, v in state['cpts'].items()} if 'class_prior' in state: self.class_prior_ = np.array(state['class_prior']) if 'feature_importances' in state: self.feature_importances_ = np.array(state['feature_importances'])