Source code for bioneuralnet.clustering.correlated_louvain
import numpy as np
import networkx as nx
import pandas as pd
import torch
import os
from typing import Optional, Union
from community.community_louvain import (
modularity as original_modularity,
best_partition,
)
from scipy.stats import pearsonr
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from ..utils.logger import get_logger
from ray import tune
from ray.tune import CLIReporter
from ray.air import session
from ray.tune.schedulers import ASHAScheduler
logger = get_logger(__name__)
[docs]
class CorrelatedLouvain:
"""
CorrelatedLouvain Class for Community Detection with Correlated Omics Data.
Attributes:
G (nx.Graph): NetworkX graph object.
B (pd.DataFrame): Omics data.
Y (pd.DataFrame): Phenotype data.
k3 (float): Weight for Correlated Louvain.
k4 (float): Weight for Correlated Louvain.
weight (str): Edge weight parameter name.
tune (bool): Flag to enable tuning of parameters
"""
def __init__(
self,
G: nx.Graph,
B: pd.DataFrame,
Y=None,
k3: float = 0.2,
k4: float = 0.8,
weight: str = "weight",
tune: bool = False,
gpu: bool = False,
seed: Optional[int] = None,
):
self.logger = get_logger(__name__)
self.G = G.copy()
self.B = B.copy()
self.Y = Y
self.K3 = k3
self.K4 = k4
self.weight = weight
self.tune = tune
self.logger.info(
f"Initialized CorrelatedLouvain with k3 = {self.K3}, k4 = {self.K4}, "
)
if self.B is not None:
self.logger.info(f"Original omics data shape: {self.B.shape}")
self.logger.info(f"Original graph has {self.G.number_of_nodes()} nodes.")
if self.B is not None:
self.logger.info(f"Final omics data shape: {self.B.shape}")
self.logger.info(
f"Graph has {self.G.number_of_nodes()} nodes and {self.G.number_of_edges()} edges."
)
if seed is not None:
torch.manual_seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
self.seed = seed
self.gpu = gpu
self.device = torch.device("cuda" if gpu and torch.cuda.is_available() else "cpu")
self.logger.info(f"Initialized Correlated Louvain. device={self.device}")
def _compute_community_correlation(self, nodes) -> tuple:
"""
Compute the Pearson correlation between the first principal component (PC1) of the omics data
(for the given nodes) and the phenotype.
Drops columns that are completely zero.
"""
try:
self.logger.info(
f"Computing community correlation for {len(nodes)} nodes..."
)
node_cols = [str(n) for n in nodes if str(n) in self.B.columns]
if not node_cols:
self.logger.info(
"No valid columns found for these nodes; returning (0.0, 1.0)."
)
return 0.0, 1.0
B_sub = self.B.loc[:, node_cols]
zero_mask = (B_sub == 0).all(axis=0)
num_zero_columns = int(zero_mask.sum())
if num_zero_columns > 0:
self.logger.info(
f"WARNING: {num_zero_columns} columns are all zeros in community subset."
)
B_sub = B_sub.loc[:, ~zero_mask]
if B_sub.shape[1] == 0:
self.logger.info("All columns dropped; returning (0.0, 1.0).")
return 0.0, 1.0
self.logger.info(
f"B_sub shape: {B_sub.shape}, first few columns: {node_cols[:5]}"
)
scaler = StandardScaler()
scaled = scaler.fit_transform(B_sub)
pca = PCA(n_components=1)
pc1 = pca.fit_transform(scaled).flatten()
target = (
self.Y.iloc[:, 0].values
if isinstance(self.Y, pd.DataFrame)
else self.Y.values
)
corr, pvalue = pearsonr(pc1, target)
return corr, pvalue
except Exception as e:
self.logger.info(f"Error in _compute_community_correlation: {e}")
raise
def _quality_correlated(self, partition) -> float:
"""
Compute the overall quality metric as:
Q* = k3 * Q + k4 * avg_abs_corr,
where Q is the standard modularity and avg_abs_corr is the average absolute Pearson correlation
(computed over communities with at least 2 nodes).
"""
Q = original_modularity(partition, self.G, self.weight)
if self.B is None or self.Y is None:
self.logger.info(
"Omics/phenotype data not provided; returning standard modularity."
)
return Q
community_corrs = []
for com in set(partition.values()):
nodes = [n for n in self.G.nodes() if partition[n] == com]
if len(nodes) < 2:
continue
corr, _ = self._compute_community_correlation(nodes)
community_corrs.append(abs(corr))
avg_corr = np.mean(community_corrs) if community_corrs else 0.0
quality = self.K3 * Q + self.K4 * avg_corr
self.logger.info(
f"Computed quality: Q = {Q:.4f}, avg_corr = {avg_corr:.4f}, combined = {quality:.4f}"
)
return quality
[docs]
def run(self, as_dfs: bool = False) -> Union[dict, list]:
"""
Run correlated Louvain clustering.
If as_dfs is True, returns a list of adjacency matrices (DataFrames),
where each adjacency matrix represents a cluster with more than 2 nodes.
Otherwise, returns the partition dictionary.
If tune is True and as_dfs is False, hyperparameter tuning is performed and the best parameters are returned.
If tune is True and as_dfs is True, tuning is performed, and then standard detection is run using the tuned parameters.
"""
if self.tune and not as_dfs:
self.logger.info("Tuning enabled. Running hyperparameter tuning...")
best_config = self.run_tuning(num_samples=10)
self.logger.info("Tuning completed successfully.")
return {"best_config": best_config}
elif self.tune and as_dfs:
self.logger.info("Tuning enabled and adjacency matrices output requested.")
best_config = self.run_tuning(num_samples=10)
tuned_k4 = best_config.get("k4", 0.8)
tuned_k3 = 1.0 - tuned_k4
tuned_instance = CorrelatedLouvain(
G=self.G,
B=self.B,
Y=self.Y,
k3=tuned_k3,
k4=tuned_k4,
weight=self.weight,
tune=False,
gpu=self.gpu,
seed=self.seed,
)
return tuned_instance.run(as_dfs=True)
else:
self.logger.info("Running standard community detection...")
partition = best_partition(self.G, weight=self.weight)
quality = self._quality_correlated(partition)
self.logger.info(f"Final quality: {quality:.4f}")
self.partition = partition
if as_dfs:
self.logger.info("Raw partition output:", self.partition)
clusters_dfs = self.partition_to_adjacency(self.partition)
print(f"Returning {len(clusters_dfs)} clusters after filtering")
return clusters_dfs
else:
return partition
[docs]
def partition_to_adjacency(self, partition: dict) -> list:
"""
Convert the partition dictionary into a list of adjacency matrices (DataFrames),
where each adjacency matrix represents a cluster with more than 2 nodes.
"""
clusters = {}
for node, cl in partition.items():
clusters.setdefault(cl, []).append(node)
self.logger.debug(f"Total detected clusters: {len(clusters)}")
adjacency_matrices = []
for cl, nodes in clusters.items():
self.logger.debug(f"Cluster {cl} size: {len(nodes)}")
if len(nodes) > 2:
valid_nodes = list(set(nodes).intersection(set(self.B.columns)))
if valid_nodes:
adjacency_matrix = self.B.loc[:, valid_nodes].fillna(0)
adjacency_matrices.append(adjacency_matrix)
print(f"Clusters with >2 nodes: {len(adjacency_matrices)}")
return adjacency_matrices
[docs]
def get_quality(self) -> float:
if not hasattr(self, "partition"):
raise ValueError("No partition computed. Call run() first.")
return self._quality_correlated(self.partition)
def _tune_helper(self, config):
k4 = config["k4"]
k3 = 1.0 - k4
tuned_instance = CorrelatedLouvain(
G=self.G,
B=self.B,
Y=self.Y,
k3=k3,
k4=k4,
weight=self.weight,
gpu=self.gpu,
seed=self.seed,
tune=False,
)
tuned_instance.run()
quality = tuned_instance.get_quality()
session.report({"quality": quality})
[docs]
def run_tuning(self, num_samples=10):
search_config = {"k4": tune.grid_search([0.5, 0.6, 0.7, 0.8, 0.9])}
scheduler = ASHAScheduler(
metric="quality",
mode="max",
grace_period=1,
reduction_factor=2,
)
reporter = CLIReporter(metric_columns=["quality", "training_iteration"])
def short_dirname_creator(trial):
return f"_{trial.trial_id}"
resources = {"cpu": 1, "gpu": 1} if self.device.type == "cuda" else {"cpu": 1, "gpu": 0}
self.logger.info("Starting hyperparameter tuning...")
analysis = tune.run(
tune.with_parameters(self._tune_helper),
config=search_config,
verbose=0,
num_samples=num_samples,
scheduler=scheduler,
progress_reporter=reporter,
storage_path=os.path.expanduser("~/cl"),
trial_dirname_creator=short_dirname_creator,
resources_per_trial=resources,
name="l",
)
best_config = analysis.get_best_config(metric="quality", mode="max")
self.logger.info(f"Best hyperparameters found: {best_config}")
return best_config