Source code for bioneuralnet.clustering.hybrid_louvain

import networkx as nx
import pandas as pd
import numpy as np
from typing import Union, Optional
import torch

from bioneuralnet.clustering.correlated_pagerank import CorrelatedPageRank
from bioneuralnet.clustering.correlated_louvain import CorrelatedLouvain

from ..utils.logger import get_logger

logger = get_logger(__name__)

[docs] class HybridLouvain: """ HybridLouvain Class that combines Correlated Louvain and Correlated PageRank for community detection. Attributes: G (nx.Graph): NetworkX graph object. B (pd.DataFrame): Omics data. Y (pd.DataFrame): Phenotype data. k3 (float): Weight for Correlated Louvain. k4 (float): Weight for Correlated Louvain. max_iter (int): Maximum number of iterations. weight (str): Edge weight parameter name. tune (bool): Flag to enable tuning of parameters """ def __init__( self, G: nx.Graph, B: pd.DataFrame, Y: pd.DataFrame, k3: float = 0.2, k4: float = 0.8, max_iter: int = 10, weight: str = "weight", gpu: bool = False, seed: Optional[int] = None, tune: Optional[bool] = False, ): self.logger = get_logger(__name__) if seed is not None: torch.manual_seed(seed) np.random.seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False self.gpu = gpu self.seed = seed self.logger.info("Initializing HybridLouvain...") self.G = G graph_nodes = set(map(str, G.nodes())) omics_cols = set(B.columns.astype(str)) keep_omics = B.columns[B.columns.astype(str).isin(graph_nodes)] dropped_omics = sorted(omics_cols - graph_nodes) if dropped_omics: self.logger.info( f"Dropping {len(dropped_omics)} omics columns not in graph: " f"{dropped_omics[:5]}{'…' if len(dropped_omics) > 5 else ''}" ) self.B = B.loc[:, keep_omics] # pheno_idx = set(Y.index.astype(str)) # isin_mask = Y.index.astype(str).isin(graph_nodes) # keep_pheno = Y.index[isin_mask] # dropped_pheno = sorted(pheno_idx - graph_nodes) # if dropped_pheno: # self.logger.info( # f"Dropping {len(dropped_pheno)} phenotype rows not in graph: " # f"{dropped_pheno[:5]}{'…' if len(dropped_pheno) > 5 else ''}" # ) if isinstance(Y, pd.DataFrame): self.Y = Y.squeeze() elif isinstance(Y, pd.Series): self.Y = Y self.k3 = k3 self.k4 = k4 self.weight = weight self.max_iter = max_iter self.tune = tune self.logger.info( f"Initialized HybridLouvain with {len(self.G)} graph nodes, " f"{self.B.shape[1]} omics columns, {self.Y.shape[0]} phenotype rows; " f"max_iter={max_iter}, k3={k3}, k4={k4}, tune={tune}" )
[docs] def run(self, as_dfs: bool = False) -> Union[dict, list]: iteration = 0 prev_size = len(self.G.nodes()) current_partition = None all_clusters = {} while iteration < self.max_iter: self.logger.info( f"\nIteration {iteration+1}/{self.max_iter}: Running Correlated Louvain..." ) if self.tune: self.logger.info("Tuning Correlated Louvain for current iteration...") louvain_tuner = CorrelatedLouvain( self.G, B=self.B, Y=self.Y, k3=self.k3, k4=self.k4, weight=self.weight, seed=self.seed, tune=True, gpu=self.gpu, ) best_config_louvain = louvain_tuner.run_tuning(num_samples=5) tuned_k4 = best_config_louvain.get("k4", self.k4) tuned_k3 = 1.0 - tuned_k4 self.logger.info( f"Using tuned Louvain parameters: k3={tuned_k3}, k4={tuned_k4}" ) louvain = CorrelatedLouvain( self.G, B=self.B, Y=self.Y, k3=tuned_k3, k4=tuned_k4, weight=self.weight, tune=False, gpu=self.gpu, seed=self.seed, ) else: louvain = CorrelatedLouvain( self.G, B=self.B, Y=self.Y, k3=self.k3, k4=self.k4, weight=self.weight, tune=False, gpu=self.gpu, seed=self.seed, ) partition = louvain.run() quality_val = louvain.get_quality() self.logger.info( f"Iteration {iteration+1}: Louvain Quality = {quality_val:.4f}" ) current_partition = partition best_corr = 0 best_seed = None for com in set(partition.values()): nodes = [n for n in self.G.nodes() if partition[n] == com] if len(nodes) < 2: continue try: corr, _ = louvain._compute_community_correlation(nodes) if abs(corr) > abs(best_corr): best_corr = corr best_seed = nodes except Exception as e: self.logger.info( f"Error computing correlation for community {com}: {e}" ) if best_seed is None: self.logger.info("No valid seed community found; stopping iterations.") break self.logger.info( f"Selected seed community of size {len(best_seed)} with correlation {best_corr:.4f}" ) if self.tune: self.logger.info("Tuning Correlated PageRank for current iteration...") pagerank_tuner = CorrelatedPageRank( graph=self.G, omics_data=self.B, phenotype_data=self.Y, alpha=0.9, max_iter=100, tol=1e-6, k=0.5, seed=self.seed, gpu=self.gpu, tune=True, ) best_config_pr = pagerank_tuner.run_tuning(num_samples=5) tuned_alpha = best_config_pr.get("alpha", 0.9) tuned_max_iter = best_config_pr.get("max_iter", 100) tuned_tol = best_config_pr.get("tol", 1e-6) tuned_k = best_config_pr.get("k", 0.5) self.logger.info( f"Using tuned PageRank parameters: alpha={tuned_alpha}, max_iter={tuned_max_iter}, tol={tuned_tol}, k={tuned_k}" ) pagerank_instance = CorrelatedPageRank( graph=self.G, omics_data=self.B, phenotype_data=self.Y, alpha=tuned_alpha, max_iter=tuned_max_iter, tol=tuned_tol, k=tuned_k, tune=False, gpu=self.gpu, seed=self.seed, ) else: pagerank_instance = CorrelatedPageRank( graph=self.G, omics_data=self.B, phenotype_data=self.Y, tune=False, seed=self.seed, gpu=self.gpu, ) pagerank_results = pagerank_instance.run(best_seed) refined_nodes = pagerank_results.get("cluster_nodes", []) new_size = len(refined_nodes) all_clusters[iteration] = refined_nodes self.logger.info(f"Refined subgraph size: {new_size}") if new_size == prev_size or new_size <= 1: self.logger.info( "Subgraph size converged or too small. Stopping iterations." ) break prev_size = new_size self.G = self.G.subgraph(refined_nodes).copy() iteration += 1 self.logger.info(f"Hybrid Louvain completed after {iteration+1} iterations.") if as_dfs: dfs = [] for nodes in all_clusters.values(): if len(nodes) > 2: dfs.append(self.B.loc[:, nodes].copy()) return dfs else: return {"curr": current_partition, "clus": all_clusters}