Source code for endgame.models.bayesian.structure.autosle

from __future__ import annotations

"""AutoSLE: Automatic Structure Learning Ensemble.

AutoSLE provides scalable structure learning for massive datasets
(10k+ variables) using a divide-and-conquer approach with ensemble
voting on edges.

Key features:
- Variable partitioning via spectral clustering
- Multiple solver backends (PC, FGES, Hill Climbing)
- Edge voting for robustness
- Parallelized cluster solving

References
----------
Chickering, D. M. (2002). Optimal Structure Identification With
Greedy Search. JMLR, 3.
"""

import warnings
from concurrent.futures import ThreadPoolExecutor

import networkx as nx
import numpy as np

from endgame.models.bayesian.structure.learning import (
    greedy_hill_climbing,
)


[docs] class AutoSLE: """ Scalable structure learning for massive variable sets. AutoSLE works by: 1. Partitioning variables into manageable clusters 2. Running multiple structure learning algorithms on each cluster 3. Combining results via edge voting 4. Learning inter-cluster edges 5. Fusing into a global DAG Parameters ---------- solvers : list[str], default=['pc', 'fges', 'hc'] Base solvers to ensemble: - 'pc': PC-Stable (constraint-based, via pgmpy if available) - 'fges': Fast Greedy Equivalence Search (via causal-learn if available) - 'hc': Hill Climbing with restarts (built-in) - 'ges': Greedy Equivalence Search partition_method : {'spectral', 'correlation', 'random'}, default='spectral' How to partition variables into clusters. max_cluster_size : int, default=50 Maximum variables per cluster. edge_threshold : float, default=0.5 Minimum fraction of solvers that must agree on an edge. n_jobs : int, default=-1 Parallelization for cluster solving. -1 uses all cores. random_state : int, optional Random seed for reproducibility. verbose : bool, default=False Enable verbose output. Attributes ---------- structure_ : nx.DiGraph Learned global DAG. edge_confidence_ : dict[tuple, float] Confidence score (agreement ratio) for each edge. cluster_assignments_ : np.ndarray Which cluster each variable was assigned to. Examples -------- >>> from endgame.models.bayesian.structure import AutoSLE >>> sle = AutoSLE(max_cluster_size=30) >>> structure = sle.learn(data, variable_names=['x1', 'x2', ...]) """ def __init__( self, solvers: list[str] = None, partition_method: str = 'spectral', max_cluster_size: int = 50, edge_threshold: float = 0.5, n_jobs: int = -1, random_state: int | None = None, verbose: bool = False, ): self.solvers = solvers or ['hc'] # Default to built-in HC self.partition_method = partition_method self.max_cluster_size = max_cluster_size self.edge_threshold = edge_threshold self.n_jobs = n_jobs self.random_state = random_state self.verbose = verbose self.structure_: nx.DiGraph | None = None self.edge_confidence_: dict[tuple, float] | None = None self.cluster_assignments_: np.ndarray | None = None def _log(self, message: str) -> None: """Log message if verbose.""" if self.verbose: print(f"[AutoSLE] {message}") def _effective_n_jobs(self) -> int: """Get effective number of jobs.""" import os if self.n_jobs == -1: return os.cpu_count() or 1 return self.n_jobs
[docs] def learn( self, data: np.ndarray, variable_names: list[str] | None = None, cardinalities: dict[int, int] | None = None, ) -> nx.DiGraph: """ Learn structure from data. Parameters ---------- data : np.ndarray Shape (n_samples, n_variables). Should be discrete (integer). variable_names : list[str], optional Names for variables. Defaults to indices. cardinalities : dict[int, int], optional Number of states per variable. Auto-computed if not provided. Returns ------- nx.DiGraph Learned directed acyclic graph. """ n_samples, n_vars = data.shape self._log(f"Learning structure for {n_vars} variables, {n_samples} samples") # Auto-compute cardinalities if not provided if cardinalities is None: cardinalities = { i: len(np.unique(data[:, i])) for i in range(n_vars) } # Step 1: Partition variables self._log(f"Partitioning variables (method={self.partition_method})") clusters = self._partition(data) self.cluster_assignments_ = self._assignments_from_clusters(clusters, n_vars) self._log(f"Created {len(clusters)} clusters") # Step 2: Learn structure within each cluster (parallelized) self._log("Learning intra-cluster structures...") cluster_args = [ (data[:, cluster], cluster, cardinalities) for cluster in clusters ] if self._effective_n_jobs() > 1 and len(clusters) > 1: # Parallel execution with ThreadPoolExecutor(max_workers=self._effective_n_jobs()) as executor: cluster_results = list(executor.map(self._solve_cluster, cluster_args)) else: # Sequential execution cluster_results = [self._solve_cluster(args) for args in cluster_args] # Step 3: Ensemble within each cluster self._log("Ensembling solver results...") subgraphs = [] for results in cluster_results: subgraph = self._ensemble_edges(results) subgraphs.append(subgraph) # Step 4: Learn inter-cluster edges self._log("Learning inter-cluster edges...") cluster_representatives = self._get_representatives(data, clusters) inter_edges = self._learn_inter_cluster(data, clusters, cluster_representatives) # Step 5: Fuse into global graph self._log("Fusing into global structure...") self.structure_ = self._fuse_graphs(subgraphs, inter_edges, clusters) # Relabel with names if provided if variable_names: self.structure_ = nx.relabel_nodes( self.structure_, {i: name for i, name in enumerate(variable_names)} ) # Also update edge confidence keys if self.edge_confidence_: new_conf = {} for (u, v), conf in self.edge_confidence_.items(): u_name = variable_names[u] if isinstance(u, int) and u < len(variable_names) else u v_name = variable_names[v] if isinstance(v, int) and v < len(variable_names) else v new_conf[(u_name, v_name)] = conf self.edge_confidence_ = new_conf self._log(f"Final structure: {self.structure_.number_of_nodes()} nodes, " f"{self.structure_.number_of_edges()} edges") return self.structure_
def _partition(self, data: np.ndarray) -> list[list[int]]: """Partition variables into clusters.""" n_vars = data.shape[1] if n_vars <= self.max_cluster_size: # No need to partition return [list(range(n_vars))] if self.partition_method == 'spectral': return self._spectral_partition(data) elif self.partition_method == 'correlation': return self._correlation_partition(data) elif self.partition_method == 'random': return self._random_partition(n_vars) else: raise ValueError(f"Unknown partition method: {self.partition_method}") def _spectral_partition(self, data: np.ndarray) -> list[list[int]]: """Partition using spectral clustering on correlation matrix.""" try: from sklearn.cluster import SpectralClustering except ImportError: warnings.warn("sklearn not available, falling back to random partition") return self._random_partition(data.shape[1]) n_vars = data.shape[1] n_clusters = max(1, n_vars // self.max_cluster_size) # Compute absolute correlation as affinity corr = np.abs(np.corrcoef(data.T)) # Handle NaN (constant features) corr = np.nan_to_num(corr, nan=0.0) np.fill_diagonal(corr, 1.0) clustering = SpectralClustering( n_clusters=n_clusters, affinity='precomputed', random_state=self.random_state, ).fit(corr) clusters = [ np.where(clustering.labels_ == i)[0].tolist() for i in range(n_clusters) ] # Filter empty clusters clusters = [c for c in clusters if len(c) > 0] return clusters def _correlation_partition(self, data: np.ndarray) -> list[list[int]]: """Partition using hierarchical clustering on correlation.""" try: from scipy.cluster.hierarchy import fcluster, linkage from scipy.spatial.distance import squareform except ImportError: warnings.warn("scipy not available, falling back to random partition") return self._random_partition(data.shape[1]) n_vars = data.shape[1] # Compute correlation-based distance corr = np.corrcoef(data.T) corr = np.nan_to_num(corr, nan=0.0) dist = 1 - np.abs(corr) dist = (dist + dist.T) / 2 # Force symmetry for squareform np.fill_diagonal(dist, 0) # Hierarchical clustering condensed = squareform(dist) Z = linkage(condensed, method='average') # Cut to get desired number of clusters n_clusters = max(1, n_vars // self.max_cluster_size) labels = fcluster(Z, n_clusters, criterion='maxclust') clusters = [ np.where(labels == i)[0].tolist() for i in range(1, n_clusters + 1) ] return [c for c in clusters if len(c) > 0] def _random_partition(self, n_vars: int) -> list[list[int]]: """Random partition of variables.""" rng = np.random.RandomState(self.random_state) indices = rng.permutation(n_vars) clusters = [] for i in range(0, n_vars, self.max_cluster_size): cluster = indices[i:i + self.max_cluster_size].tolist() if cluster: clusters.append(cluster) return clusters def _assignments_from_clusters( self, clusters: list[list[int]], n_vars: int, ) -> np.ndarray: """Convert cluster list to assignment array.""" assignments = np.zeros(n_vars, dtype=int) for i, cluster in enumerate(clusters): for var in cluster: assignments[var] = i return assignments def _solve_cluster( self, args: tuple[np.ndarray, list[int], dict[int, int]], ) -> dict[str, nx.DiGraph]: """Run all solvers on a single cluster.""" cluster_data, cluster_indices, full_cardinalities = args # Map cardinalities to local indices local_cards = { i: full_cardinalities.get(cluster_indices[i], len(np.unique(cluster_data[:, i]))) for i in range(len(cluster_indices)) } results = {} for solver_name in self.solvers: try: graph = self._run_solver(solver_name, cluster_data, local_cards) # Remap node indices to global indices graph = nx.relabel_nodes( graph, {i: cluster_indices[i] for i in range(len(cluster_indices))} ) results[solver_name] = graph except Exception as e: # Log but don't fail - ensemble can handle missing solvers if self.verbose: print(f"[AutoSLE] Solver {solver_name} failed: {e}") results[solver_name] = nx.DiGraph() return results def _run_solver( self, solver_name: str, data: np.ndarray, cardinalities: dict[int, int], ) -> nx.DiGraph: """Run a single structure learning solver.""" if solver_name == 'hc': return greedy_hill_climbing( data, cardinalities, score_type='bdeu', max_parents=3, random_state=self.random_state, ) elif solver_name == 'pc': try: import pandas as pd from pgmpy.estimators import PC from pgmpy.models import BayesianNetwork except ImportError: raise ImportError("PC solver requires pgmpy: pip install pgmpy") df = pd.DataFrame(data) pc = PC(df) model = pc.estimate() return nx.DiGraph(model.edges()) elif solver_name in ('fges', 'ges'): try: from causallearn.search.ScoreBased.GES import ges except ImportError: raise ImportError("GES solver requires causal-learn: pip install causal-learn") record = ges(data) G = record['G'] # Convert causal-learn graph to networkx graph = nx.DiGraph() for i in range(data.shape[1]): graph.add_node(i) # Parse adjacency from causal-learn format adj = G.graph for i in range(adj.shape[0]): for j in range(adj.shape[1]): if adj[i, j] == 1 and adj[j, i] == -1: graph.add_edge(i, j) elif adj[i, j] == -1 and adj[j, i] == 1: graph.add_edge(j, i) return graph else: raise ValueError(f"Unknown solver: {solver_name}") def _ensemble_edges( self, solver_results: dict[str, nx.DiGraph], ) -> nx.DiGraph: """Combine solver outputs via edge voting.""" edge_votes: dict[tuple, int] = {} for solver_name, graph in solver_results.items(): for u, v in graph.edges(): edge = (u, v) edge_votes[edge] = edge_votes.get(edge, 0) + 1 # Keep edges that pass threshold n_solvers = len([g for g in solver_results.values() if g.number_of_edges() > 0]) if n_solvers == 0: n_solvers = 1 final_graph = nx.DiGraph() # Add all nodes all_nodes = set() for graph in solver_results.values(): all_nodes.update(graph.nodes()) for node in all_nodes: final_graph.add_node(node) # Initialize edge confidence dict if self.edge_confidence_ is None: self.edge_confidence_ = {} for edge, votes in edge_votes.items(): confidence = votes / n_solvers if confidence >= self.edge_threshold: # Check for cycles before adding final_graph.add_edge(*edge) if not nx.is_directed_acyclic_graph(final_graph): final_graph.remove_edge(*edge) else: self.edge_confidence_[edge] = confidence return final_graph def _get_representatives( self, data: np.ndarray, clusters: list[list[int]], ) -> list[int]: """Get representative variable for each cluster (highest variance).""" representatives = [] for cluster in clusters: if len(cluster) == 0: continue cluster_data = data[:, cluster] variances = np.var(cluster_data, axis=0) best_idx = np.argmax(variances) representatives.append(cluster[best_idx]) return representatives def _learn_inter_cluster( self, data: np.ndarray, clusters: list[list[int]], representatives: list[int], ) -> list[tuple[int, int]]: """Learn edges between clusters using representatives.""" if len(clusters) <= 1: return [] # Extract representative data rep_data = data[:, representatives] rep_cards = { i: len(np.unique(rep_data[:, i])) for i in range(len(representatives)) } # Run structure learning on representatives try: rep_graph = greedy_hill_climbing( rep_data, rep_cards, score_type='bdeu', max_parents=2, random_state=self.random_state, ) except Exception: return [] # Map edges back to original indices inter_edges = [] for u, v in rep_graph.edges(): orig_u = representatives[u] orig_v = representatives[v] inter_edges.append((orig_u, orig_v)) return inter_edges def _fuse_graphs( self, subgraphs: list[nx.DiGraph], inter_edges: list[tuple[int, int]], clusters: list[list[int]], ) -> nx.DiGraph: """Fuse subgraphs and inter-cluster edges into global DAG.""" global_graph = nx.DiGraph() # Add all nodes from subgraphs for subgraph in subgraphs: global_graph.add_nodes_from(subgraph.nodes()) global_graph.add_edges_from(subgraph.edges()) # Add inter-cluster edges (check for cycles) for u, v in inter_edges: if u not in global_graph: global_graph.add_node(u) if v not in global_graph: global_graph.add_node(v) global_graph.add_edge(u, v) if not nx.is_directed_acyclic_graph(global_graph): global_graph.remove_edge(u, v) # Try reverse direction global_graph.add_edge(v, u) if not nx.is_directed_acyclic_graph(global_graph): global_graph.remove_edge(v, u) return global_graph
[docs] def get_edge_confidence(self, u: int, v: int) -> float: """ Get confidence score for an edge. Parameters ---------- u, v : int Source and target node indices. Returns ------- float Confidence score (0-1), or 0 if edge doesn't exist. """ if self.edge_confidence_ is None: return 0.0 return self.edge_confidence_.get((u, v), 0.0)
[docs] def get_highly_confident_edges( self, min_confidence: float = 0.8, ) -> list[tuple]: """ Get edges with confidence above threshold. Parameters ---------- min_confidence : float, default=0.8 Minimum confidence score. Returns ------- list[tuple] List of (source, target) edges. """ if self.edge_confidence_ is None: return [] return [ edge for edge, conf in self.edge_confidence_.items() if conf >= min_confidence ]