import torch
import pandas as pd
import numpy as np
import torch.nn.functional as F
from sklearn.covariance import GraphicalLasso
[docs]
def gen_similarity_graph(X:pd.DataFrame, k:int = 15, metric:str = "cosine", mutual:bool = False, per_node:bool = True, self_loops:bool = True) -> pd.DataFrame:
"""
Build a normalized knn similarity graph from feature vectors. Computes pairwise cosine or ecledian disntace,then sparsifies via knn or global a threshold.
Optionally prunes to mutual neighbors and/or adds self-loops.
Args:
- X: pandas.DataFrame of shape (N, D) (rows = nodes, cols = features)
- k (int): Number of neighbors to keep per node.
- metric (str): "cosine" or "euclidean" (uses gaussian kernel on distances).
- mutual (bool): If True, retain only mutual edges (i->j and j->i).
- per_node (bool): If True, use per-node top_k; else global cutoff.
- self_loops (bool): If True, add self-loop weight of 1.
Returns:
- DataFrame of shape (N, N) the normalized adjacency matrix
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
x_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = x_torch.size(0)
# full similarity matrix
if metric == "cosine":
X_normalized = F.normalize(x_torch, p=2, dim=1)
S = torch.mm(X_normalized, X_normalized.t())
else:
D2 = torch.cdist(x_torch, x_torch).pow(2)
median_d2 = D2.median()
S = torch.exp(-D2 / (median_d2 + 1e-8))
# building the knn graph or global threshold mask
if per_node:
_, index = torch.topk(S, k=k+1, dim=1)
mask = torch.zeros(N, N, dtype=torch.bool, device=device)
for i in range(N):
for j in index[i, 1:k+1]:
mask[i, j] = True
else:
flat = S.reshape(-1)
threshold = torch.kthvalue(flat, k * N).values
mask = S >= threshold
mask.fill_diagonal_(False)
# mutual pruning option
if mutual:
mask = torch.logical_and(mask, mask.t())
# mask and add self loops if set to true
A = S * mask.float()
if self_loops:
A = A + torch.eye(N, device=device, dtype=x_torch.dtype)
# row normalize
A = F.normalize(A, p=1, dim=1)
A_numpy = A.cpu().numpy()
final_graph =pd.DataFrame(A_numpy, index=nodes, columns=nodes)
return final_graph
[docs]
def gen_correlation_graph(X: pd.DataFrame, k: int = 15,method: str = 'pearson', mutual: bool = False, per_node: bool = True,threshold: float = None, self_loops:bool = True) -> pd.DataFrame:
"""
Build a graph based on pairwise Pearson or Spearman correlations.
Args:
- X pd.dataframe (N, D) data tensor where rows are nodes.
- k (int): Number of neighbors to keep per node if per_node is True.
- method (str): 'pearson' or 'spearman'.
- mutual (bool): If True, only mutual knn edges.
- per_node (bool): If True, use per node topk selection, else global threshold.
- threshold (float): Correlation cutoff when per_node is False.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame. Normalized adjacency matrix (N x N) of the sparse correlation graph.
Note:
- Correlation is very expensive to compute, so this function is not recommended for large datasets.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
x_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = x_torch.size(0)
# rank transform for Spearman
if method == 'spearman':
x_ranked = x_torch.argsort(dim=1).argsort(dim=1).float()
x_correlation = x_ranked - x_ranked.mean(dim=1, keepdim=True)
else:
x_correlation = x_torch - x_torch.mean(dim=1, keepdim=True)
# correlation with clamping and normalization
num = torch.mm(x_correlation, x_correlation.t())
sum_sq = (x_correlation**2).sum(dim=1, keepdim=True)
denom = torch.sqrt(torch.mm(sum_sq, sum_sq.t())).clamp(min=1e-8)
C = num / denom
S = C.abs()
# build mask
if per_node:
_, index = torch.topk(S, k=k+1, dim=1)
mask = torch.zeros(N, N, dtype=torch.bool, device=device)
for i in range(N):
for j in index[i, 1:k+1]:
mask[i, j] = True
else:
if threshold is not None:
mask = S >= threshold
mask.fill_diagonal_(False)
else:
flat = S.reshape(-1)
thresh = torch.kthvalue(flat, k * N).values
mask = S >= thresh
mask.fill_diagonal_(False)
if mutual:
mask = torch.logical_and(mask, mask.t())
W = S * mask.float()
if self_loops:
W.fill_diagonal_(1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph
[docs]
def gen_threshold_graph(X:pd.DataFrame, b: float = 6.0,k: int = 15, mutual: bool = False, self_loops: bool = True) -> pd.DataFrame:
"""
Generate a soft threshold co-xpression network this is very similar to how WGCNA works
Args:
- X (pd.DataFrame): Data matrix where rows are nodes and columns are features.
- b (float): Thresholding exponent applied to absolute correlations.
- k (int): Number of neighbors to keep per node.
- mutual (bool): If True, only mutual knn edges.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame: Normalized adjacency matrix (N x N) of the soft-thresholded graph.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
x_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = x_torch.size(0)
# pearson correlation matrix
Xc = x_torch - x_torch.mean(dim=1, keepdim=True)
num = torch.mm(Xc, Xc.t())
sum_sq = (Xc**2).sum(dim=1, keepdim=True)
denom = torch.sqrt(torch.mm(sum_sq, sum_sq.t())).clamp(min=1e-8)
C = num / denom
# threshold
S = C.abs().pow(b)
# mask building
_, index = torch.topk(S, k=k+1, dim=1)
mask = torch.zeros(N, N, dtype=torch.bool, device=device)
for i in range(N):
for j in index[i, 1:k+1]:
mask[i, j] = True
if mutual:
mask = torch.logical_and(mask, mask.t())
# apply mask (and self-loops if set)
W = S * mask.float()
if self_loops:
W.fill_diagonal_(1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph
[docs]
def gen_gaussian_knn_graph(X: pd.DataFrame,k: int = 15,sigma: float = None,mutual: bool = False,self_loops: bool = True) -> pd.DataFrame:
"""
Build a normalized knn similarity graph from feature vectors. Computes pairwise cosine or Euclidean similarities, sparsifies via k-nearest neighbors or
a global threshold. Optionally prunes to mutual neighbors and/or adds self-loops.
Args:
- X (pd.DataFrame): Feature matrix where rows are nodes and columns are features.
- k (int): Number of neighbors to keep per node.
- metric (str): 'cosine' or 'euclidean', uses Gaussian kernel for distances.
- mutual (bool): If True, only mutual knn edges.
- per_node (bool): If True, use per-node topk selection; else global threshold.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame: Normalized adjacency matrix (N x N) of the sparse similarity graph.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
X_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = X_torch.size(0)
D2 = torch.cdist(X_torch, X_torch).pow(2)
if sigma is None:
sigma = D2.median().item()
S = torch.exp(-D2 / (2 * sigma))
_, index = torch.topk(S, k=k+1, dim=1)
mask = torch.zeros(N, N, dtype=torch.bool, device=device)
for i in range(N):
for j in index[i, 1:k+1]:
mask[i, j] = True
if mutual:
mask = torch.logical_and(mask, mask.t())
W = S * mask.float()
if self_loops:
W.fill_diagonal_(1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph
[docs]
def gen_lasso_graph(X: pd.DataFrame, alpha: float = 0.01, self_loops: bool = True) -> pd.DataFrame:
"""
Infer a sparse network via Graphical Lasso.
Args:
- X (pd.DataFrame): Data matrix where rows are nodes and columns are features.
- alpha (float): Regularization parameter for Graphical Lasso.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame: Normalized adjacency matrix (N x N) of the inferred network.
"""
if isinstance(X, pd.DataFrame):
nodes = X.index
x_numpy = X.values
else:
raise TypeError("X must be a pandas.DataFrame")
model = GraphicalLasso(alpha=alpha, max_iter=200)
model.fit(x_numpy)
P = torch.from_numpy(model.precision_).abs()
W = (P + P.t()) / 2
if self_loops:
W.fill_diagonal_(W.diagonal() + 1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph
[docs]
def gen_mst_graph(X: pd.DataFrame, self_loops: bool = True) -> pd.DataFrame:
"""
Compute the minimum spanning tree (MST) on Euclidean distances.
Args:
- X (pd.DataFrame): Feature matrix where rows are nodes and columns are features.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame: Normalized adjacency matrix (N x N) of the MST graph.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
X_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = X_torch.size(0)
D = torch.cdist(X_torch, X_torch)
visited = torch.zeros(N, dtype=torch.bool, device=device)
visited[0] = True
min_cost = D[0].clone()
closest = torch.zeros(N, dtype=torch.long, device=device)
mst = torch.zeros((N, N), dtype=X_torch.dtype, device=device)
for tmp in range(N - 1):
cost_masked = min_cost.clone()
cost_masked[visited] = float('inf')
j = torch.argmin(cost_masked)
i = closest[j].item()
w = D[i, j]
mst[i, j] = w
mst[j, i] = w
visited[j] = True
distance_j = D[j]
update = torch.logical_not(visited) & (distance_j < min_cost)
min_cost = torch.where(update, distance_j, min_cost)
closest = torch.where(update, j, closest)
W = mst
if self_loops:
W.fill_diagonal_(1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph
[docs]
def gen_snn_graph(X: pd.DataFrame,k: int = 15,mutual: bool = False, self_loops: bool = True) -> pd.DataFrame:
"""
Build a shared nearest neighbor (SNN) graph.
Args:
- X (pd.DataFrame): Feature matrix where rows are nodes and columns are features.
- k (int): Number of neighbors to keep per node.
- mutual (bool): If True, only mutual knn edges.
- self_loops (bool): If True, adds weight 1 to diagonal.
Returns:
- pandas.DataFrame: Normalized adjacency matrix (N x N) of the SNN graph.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if isinstance(X, pd.DataFrame):
nodes = X.index
X_torch = torch.tensor(X.values, dtype=torch.float32, device=device)
else:
raise TypeError("X must be a pandas.DataFrame")
N = X_torch.size(0)
S = torch.mm(X_torch, X_torch.t())
_, index = torch.topk(S, k=k+1, dim=1)
neighbors = []
for i in range(N):
neighbors.append(set(index[i, 1:k+1].tolist()))
W = torch.zeros((N, N), dtype=X_torch.dtype, device=device)
for i in range(N):
for j in index[i, 1:k+1]:
j = j.item()
shared = len(neighbors
[i].intersection(neighbors[j]))
W[i, j] = shared
if mutual:
Wij = W.clone()
W = torch.min(Wij, Wij.t())
if self_loops:
W.fill_diagonal_(1.0)
W = F.normalize(W, p=1, dim=1)
final_graph = pd.DataFrame(W.cpu().numpy(), index=nodes, columns=nodes)
return final_graph