Source code for bioneuralnet.clustering.correlated_pagerank

from typing import List, Tuple, Dict, Any,Optional
import pandas as pd
import networkx as nx
import numpy as np
import os

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from scipy.stats import pearsonr

from ray import tune
from ray.tune import CLIReporter
from ray.air import session
from ray.tune.schedulers import ASHAScheduler
import torch

from ..utils.logger import get_logger

[docs] class CorrelatedPageRank: """ PageRank Class for Clustering Nodes Based on Personalized PageRank. This class handles the execution of the Personalized PageRank algorithm and identification of clusters based on sweep cuts. Attributes: alpha (float): Damping factor for PageRank. max_iter (int): Maximum number of iterations for PageRank convergence. tol (float): Tolerance for convergence. k (float): Weighting factor for composite correlation-conductance score. output_dir (str): Directory to save outputs. """ def __init__( self, graph: nx.Graph, omics_data: pd.DataFrame, phenotype_data: pd.DataFrame, alpha: float = 0.9, max_iter: int = 100, tol: float = 1e-6, k: float = 0.5, tune: bool = False, gpu: bool = False, seed: Optional[int] = None, ): """ Initializes the PageRank instance with direct data structures. Args: graph (nx.Graph): NetworkX graph object representing the network. omics_data (pd.DataFrame): Omics data DataFrame. phenotype_data (pd.DataFrame): Phenotype data Series. alpha (float, optional): Damping factor for PageRank. Defaults to 0.9. max_iter (int, optional): Maximum iterations for PageRank. Defaults to 100. tol (float, optional): Tolerance for convergence. Defaults to 1e-6. k (float, optional): Weighting factor for composite score. Defaults to 0.9. """ self.G = graph self.B = omics_data self.Y = phenotype_data.squeeze() self.alpha = alpha self.max_iter = max_iter self.tol = tol self.k = k self.tune = tune self.logger = get_logger(__name__) self.logger.info("Initialized PageRank with the following parameters:") self.logger.info( f"Graph: NetworkX Graph with {self.G.number_of_nodes()} nodes and {self.G.number_of_edges()} edges." ) self.logger.info(f"Omics Data: DataFrame with shape {self.B.shape}.") self.logger.info(f"Phenotype Data: Series with {len(self.Y)} samples.") self.logger.info(f"Alpha: {self.alpha}") self.logger.info(f"Max Iterations: {self.max_iter}") self.logger.info(f"Tolerance: {self.tol}") self.logger.info(f"K (Composite Score Weight): {self.k}") self._validate_inputs() if seed is not None: torch.manual_seed(seed) np.random.seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False self.seed = seed self.gpu = gpu self.device = torch.device("cuda" if gpu and torch.cuda.is_available() else "cpu") self.logger.info(f"Initialized Correlated Louvain. device={self.device}") def _validate_inputs(self): """ Validates the consistency of input data structures. """ try: if not isinstance(self.G, nx.Graph): raise TypeError("graph must be a networkx.Graph instance.") if not isinstance(self.B, pd.DataFrame): raise TypeError("omics_data must be a pandas DataFrame.") if not isinstance(self.Y, pd.Series): raise TypeError("phenotype_data must be a pandas Series.") graph_nodes = set(self.G.nodes()) omics_nodes = set(self.B.columns) phenotype_nodes = set(self.Y.index) if not graph_nodes.issubset(omics_nodes): missing = graph_nodes - omics_nodes raise ValueError(f"Omics data is missing nodes: {missing}") except Exception as e: self.logger.error(f"Input validation error: {e}") raise
[docs] def phen_omics_corr(self, nodes: List[Any]) -> Tuple[float, str]: """ Calculates the Pearson correlation between the PCA of omics data and phenotype. Args: nodes (List[Any]): List of node identifiers to include in the calculation. Returns: Tuple[float, str]: Correlation coefficient and formatted correlation with p-value. """ try: B_sub = self.B[nodes] scaler = StandardScaler() scaled = scaler.fit_transform(B_sub) pca = PCA(n_components=1) g1 = pca.fit_transform(scaled).flatten() g2 = self.Y corr, pvalue = pearsonr(g1, g2) corr = round(corr, 2) p_value = format(pvalue, ".3g") corr_pvalue = f"{corr} ({p_value})" return corr, corr_pvalue except Exception as e: self.logger.error(f"Error in phen_omics_corr: {e}") raise
[docs] def sweep_cut( self, p: Dict[Any, float] ) -> Tuple[List[Any], int, float, float, float, str]: try: best_cluster = set() min_comp_score = float("inf") best_len = 0 best_cond = 1.0 best_corr = 0.0 best_corr_pval = "" degrees = dict(self.G.degree(weight="weight")) vec = sorted( [ (p[node] / degrees[node] if degrees[node] > 0 else 0, node) for node in p.keys() ], reverse=True, ) current_cluster = set() for i, (val, node) in enumerate(vec): self.logger.debug( f"Iteration {i}: Adding node {node} with norm value {val:.3f}" ) if val == 0: continue current_cluster.add(node) if len(current_cluster) < len(self.G.nodes()): vol_S = sum( d for n, d in self.G.degree(current_cluster, weight="weight") ) vol_T = sum( d for n, d in self.G.degree( set(self.G.nodes()) - current_cluster, weight="weight" ) ) if min(vol_S, vol_T) == 0: self.logger.warning( f"Skipping iteration {i} due to zero volume (vol_S={vol_S}, vol_T={vol_T})." ) continue cluster_cond = nx.conductance( self.G, current_cluster, weight="weight" ) cluster_corr, corr_pvalue = self.phen_omics_corr( list(current_cluster) ) composite_score = (1 - self.k) * cluster_cond + self.k * ( -abs(cluster_corr) ) self.logger.debug( f"Cluster size={len(current_cluster)}, cond={cluster_cond:.3f}, " f"corr={cluster_corr:.3f}, composite={composite_score:.3f}" ) # cluster must be larger than 10 nodes if len(current_cluster) >= 5 and composite_score < min_comp_score: min_comp_score = composite_score best_cluster = set(current_cluster) best_len = len(current_cluster) best_cond = cluster_cond best_corr = cluster_corr best_corr_pval = corr_pvalue if best_cluster: return ( list(best_cluster), best_len, round(best_cond, 3), round(best_corr, 3), round(min_comp_score, 3), best_corr_pval, ) else: self.logger.warning( "No valid sweep cut found. Returning empty cluster." ) return [], 0, 0.0, 0.0, 0.0, "0 (1.0)" except Exception as e: self.logger.error(f"Error in sweep_cut: {e}") raise
[docs] def generate_weighted_personalization(self, nodes: List[Any]) -> Dict[Any, float]: """ Generates a weighted personalization vector for PageRank. Args: nodes (List[Any]): List of node identifiers to consider. Returns: Dict[Any, float]: Personalization vector with weights for each node. """ try: total_corr, _ = self.phen_omics_corr(nodes) corr_contribution = [] for i in range(len(nodes)): nodes_excl = nodes[:i] + nodes[i + 1 :] if not nodes_excl: contribution = 0.0 else: corr_excl, _ = self.phen_omics_corr(nodes_excl) contribution = abs(corr_excl) - abs(total_corr) corr_contribution.append(contribution) max_contribution = max(corr_contribution, key=abs) if corr_contribution else 1 if max_contribution == 0: max_contribution = 1 weighted_personalization = { node: self.alpha * (corr_contribution[i] / max_contribution) for i, node in enumerate(nodes) } return weighted_personalization except Exception as e: self.logger.error(f"Error in generate_weighted_personalization: {e}") raise
[docs] def run_pagerank_clustering(self, seed_nodes: List[Any]) -> Dict[str, Any]: """ Executes the PageRank clustering algorithm. Args: seed_nodes (List[Any]): List of seed node identifiers for personalization. Returns: Dict[str, Any]: Dictionary containing clustering results. """ if not seed_nodes: self.logger.error("No seed nodes provided for PageRank clustering.") raise ValueError("Seed nodes list cannot be empty.") if not set(seed_nodes).issubset(set(self.G.nodes())): missing = set(seed_nodes) - set(self.G.nodes()) self.logger.error(f"Seed nodes not in graph: {missing}") raise ValueError(f"Seed nodes not in graph: {missing}") try: personalization = self.generate_weighted_personalization(seed_nodes) self.logger.info( f"Generated personalization vector for seed nodes: {seed_nodes}" ) try: p = nx.pagerank( self.G, alpha=self.alpha, personalization=personalization, max_iter=self.max_iter, tol=self.tol, weight="weight", ) except nx.exception.PowerIterationFailedConvergence as e: self.logger.warning( f"PageRank did not converge in {self.max_iter} iterations. Retrying with increased max_iter." ) # retry with doubled iterations p = nx.pagerank( self.G, alpha=self.alpha, personalization=personalization, max_iter=self.max_iter * 2, tol=self.tol, weight="weight", ) self.logger.info("PageRank computation completed.") nodes, n, cond, corr, min_corr, pval = self.sweep_cut(p) if not nodes: self.logger.warning("Sweep cut did not identify any cluster.") else: self.logger.info( f"Sweep cut resulted in cluster of size {n} with conductance {cond} and correlation {corr}." ) results = { "cluster_nodes": nodes, "cluster_size": n, "conductance": cond, "correlation": corr, "composite_score": min_corr, "correlation_pvalue": pval, } return results except Exception as e: self.logger.error(f"Error in run_pagerank_clustering: {e}") raise
[docs] def run(self, seed_nodes: List[Any]) -> Dict[str, Any]: """ Executes the correlated PageRank clustering pipeline. **Steps:** 1. **Initializing Clustering**: - Receives a list of seed nodes to personalize the PageRank algorithm. - Prepares the input graph and relevant parameters for clustering. 2. **PageRank Execution**: - Applies the PageRank algorithm with personalization based on the seed nodes. - Computes node scores and determines cluster memberships. 3. **Result Compilation**: - Compiles clustering results, including cluster sizes and node memberships, into a dictionary. - Logs the successful completion of the clustering process. **Args**: seed_nodes (List[Any]): - A list of node identifiers used as seed nodes for personalized PageRank. - These nodes influence the clustering process by biasing the algorithm. **Returns**: Dict[str, Any] - A dictionary containing the clustering results. Keys may include: - `clusters`: Lists of nodes grouped into clusters. - `scores`: PageRank scores for each node. - `metadata`: Additional metrics or details about the clustering process. **Raises**: - ValueError: If the input graph is empty or seed nodes are invalid. - Exception: For any unexpected errors during clustering execution. **Notes**: - Seed nodes strongly influence the clustering outcome; select them carefully based on prior knowledge or experimental goals. - The PageRank algorithm requires a well-defined and connected graph to produce meaningful results. - Results are sensitive to the alpha (damping factor) and other hyperparameters. """ if self.tune: best_config = self.run_tuning(num_samples=10) self.logger.info("Tuning completed successfully.") return {"best_config": best_config} try: results = self.run_pagerank_clustering(seed_nodes) self.logger.info("PageRank clustering completed successfully.") return results except Exception as e: self.logger.error(f"Error in run method: {e}") raise
[docs] def get_quality(self) -> float: """ Returns the composite score (or correlation) from the latest clustering run. """ if hasattr(self, "results"): # return the composite score return self.results.get("composite_score", 0.0) else: # run clustering on all nodes and then return the score. self.results = self.run_pagerank_clustering(seed_nodes=list(self.G.nodes())) return self.results.get("composite_score", 0.0)
def _tune_helper(self, config): alpha = config["alpha"] max_iter = config["max_iter"] tol = config["tol"] k = config["k"] tuned_instance = CorrelatedPageRank( graph=self.G, omics_data=self.B, phenotype_data=self.Y, alpha=alpha, max_iter=max_iter, tol=tol, k=k, gpu=(self.device.type == "cuda"), seed=self.seed, tune=False, ) # tuning uses all nodes as seed nodes. tuned_instance.run_pagerank_clustering(seed_nodes=list(self.G.nodes())) quality = tuned_instance.get_quality() session.report({"quality": quality})
[docs] def run_tuning(self, num_samples: int = 10) -> Dict[str, Any]: search_config = { "alpha": tune.grid_search([0.8, 0.85, 0.9, 0.95]), "k": tune.grid_search([0.5, 0.6, 0.7, 0.8]), "max_iter": tune.choice([100, 500, 1000]), "tol": tune.loguniform(1e-3, 1e-5), } scheduler = ASHAScheduler( metric="quality", mode="max", grace_period=1, reduction_factor=2, ) reporter = CLIReporter(metric_columns=["quality", "training_iteration"]) def short_dirname_creator(trial): return f"_{trial.trial_id}" resources = {"cpu": 1, "gpu": 1} if self.device.type == "cuda" else {"cpu": 1, "gpu": 0} analysis = tune.run( tune.with_parameters(self._tune_helper), config=search_config, verbose=0, num_samples=num_samples, scheduler=scheduler, progress_reporter=reporter, storage_path=os.path.expanduser("~/pr"), trial_dirname_creator=short_dirname_creator, resources_per_trial=resources, name="l", ) best_config = analysis.get_best_config(metric="quality", mode="max") self.logger.info(f"Best hyperparameters found: {best_config}") return best_config