Source code for bioneuralnet.downstream_task.subject_representation

import pandas as pd
import os
import json
from typing import Optional, Dict, Any, List
from pathlib import Path
from datetime import datetime

import torch
import torch.nn as nn
import torch.optim as optim

from ray import tune
from ray.tune import TuneError
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score, r2_score
import tempfile

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from ..utils.logger import get_logger

[docs] class SubjectRepresentation: """ SubjectRepresentation Class for Integrating Network Embeddings into Omics Data. """ def __init__( self, omics_data: pd.DataFrame, embeddings: pd.DataFrame, phenotype_data: Optional[pd.DataFrame] = None, phenotype_col: str = "phenotype", reduce_method: str = "AE", seed: Optional[int] = None, tune: Optional[bool] = False, output_dir: Optional[str] = None, ): """ Initializes the SubjectRepresentation instance. Parameters: - omics_data : pd.DataFrame of omics features (columns). - embeddings : pd.DataFrame with embeddings (indexed by feature names). - phenotype_data : Optional[pd.DataFrame] with phenotype labels. - phenotype_col : str name of the phenotype column. - reduce_method : Default reduction method (e.g., "AE"). - tune : Whether to run hyperparameter tuning. """ self.logger = get_logger(__name__) self.logger.info("Initializing SubjectRepresentation with provided data inputs.") if omics_data is None or omics_data.empty: raise ValueError("Omics data must be non-empty.") if embeddings is None or embeddings.empty: self.logger.info( "No embeddings provided, please review documentation to see how to generate embeddings." ) raise ValueError("Embeddings must be non-empty.") if not isinstance(embeddings, pd.DataFrame): raise ValueError("Embeddings must be provided as a pandas DataFrame.") if tune and phenotype_data is None: raise ValueError("Phenotype data must be provided for classification-based tuning.") if seed is not None: torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False self.seed = seed self.logger.info(f"Seed set to {self.seed}.") self.omics_data = omics_data.copy(deep=True) self.embeddings = embeddings.copy(deep=True) self.phenotype_data = phenotype_data.copy(deep=True) self.phenotype_col = phenotype_col self.reduce_method = reduce_method.upper() self.tune = tune # match the embedding index embeddings_features = set(self.embeddings.index) omics_features = set(self.omics_data.columns) if len(embeddings_features) != len(omics_features): raise ValueError( f"Number of features in embeddings and omics data do not match.\n" f"Embeddings: {self.embeddings.shape} and Omics: {self.omics_data.shape}" ) common_features = embeddings_features.intersection(omics_features) if len(common_features) == 0: raise ValueError( f"No common features found between the embeddings and omics data.\n" f"Embeddings: {self.embeddings.shape} and Omics: {self.omics_data.shape}" ) self.logger.info(f"Found {len(common_features)} common features between network and omics data.") # output directory if output_dir is None: self.temp_dir_obj = tempfile.TemporaryDirectory() self.output_dir = self.temp_dir_obj.name self.logger.info(f"No output_dir provided; using temporary directory: {self.output_dir}") else: self.output_dir = Path(output_dir) self.logger.info(f"Output directory set to: {self.output_dir}") # create the directory with pathlib Path(self.output_dir).mkdir(parents=True, exist_ok=True)
[docs] def run(self) -> pd.DataFrame: """ Executes the Subject Representation workflow. If tuning is enabled, runs hyperparameter tuning and uses the best config to reduce embeddings. Otherwise, uses the default reduction method. Returns: - Enhanced omics data as a DataFrame. """ self.logger.info("Starting Subject Representation workflow.") if self.embeddings.empty: self.logger.warning( "No embeddings provided. Please generate embeddings using GNNEmbeddings class.\nReturning original omics_data." ) return self.omics_data try: if self.tune: best_config = self._run_tuning() self.logger.info(f"Best tuning config selected: {best_config}") ae_params = best_config.get("ae_params", {"epochs": 16, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu"}) reduced = self._reduce_embeddings(method=best_config["method"], ae_params=ae_params, compressed_dim=best_config["compressed_dim"]) enhanced_omics_data = self._integrate_embeddings(reduced=reduced, method=best_config["integration_method"], alpha=best_config["alpha"], beta=best_config["beta"]) else: method = self.reduce_method.upper() ae_params_def = {"epochs": 16, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu"} reduced = self._reduce_embeddings(method=method, ae_params=ae_params_def, compressed_dim=2) enhanced_omics_data = self._integrate_embeddings(reduced, method="multiply", alpha=1.0, beta=0.8) if enhanced_omics_data.empty: self.logger.warning("Enhanced omics data is empty. Returning original omics_data.") return self.omics_data self.logger.info(f"Subject Representation completed successfully. Final shape: {enhanced_omics_data.shape}") return enhanced_omics_data except Exception as e: self.logger.error(f"Error in Subject Representation workflow: {e}") raise
def _reduce_embeddings(self, method: str, ae_params: dict = None, compressed_dim: int = 2) -> pd.DataFrame: """ Reduces the dimensionality of the embeddings. Returns a DataFrame with `compressed_dim` columns. Parameters: - method: Reduction method ("PCA", "AE"). - ae_params: Parameters for AutoEncoder. - compressed_dim: Number of dimensions to reduce to. Returns: - Reduced embeddings as a DataFrame. """ self.logger.info(f"Reducing embeddings using method='{method}' with compressed_dim={compressed_dim}.") if self.embeddings.empty: raise ValueError("Embeddings DataFrame is empty.") method = method.lower() if method == "pca": pca = PCA(n_components=compressed_dim) pcs = pca.fit_transform(self.embeddings) columns = [] for i in range(compressed_dim): columns.append(f"PC{i+1}") reduced_df = pd.DataFrame(pcs, index=self.embeddings.index, columns=columns) self.logger.info(f"PCA reduction completed. Variance explained: {pca.explained_variance_ratio_.sum():.4f}") elif method in ["ae"]: self.logger.info("Using autoencoder for reduction.") ae_params = ae_params or { "epochs": 64, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu", } X = torch.tensor(self.embeddings.values, dtype=torch.float) model = AutoEncoder( input_dim=self.embeddings.shape[1], hidden_dims=ae_params["hidden_dim"], compressed_dim=compressed_dim, dropout=ae_params["dropout"], activation=ae_params["activation"] ) optimizer = optim.Adam(model.parameters(), lr=ae_params["lr"]) loss_fn = nn.MSELoss() model.train() for epoch in range(ae_params["epochs"]): optimizer.zero_grad() z, recon = model(X) loss = loss_fn(recon, X) loss.backward() optimizer.step() if (epoch + 1) % max(1, ae_params["epochs"] // 5) == 0: self.logger.info(f"AE Epoch {epoch+1}/{ae_params['epochs']} - Loss: {loss.item():.4f}") model.eval() with torch.no_grad(): z, _ = model(X) z_np = z.detach().cpu().numpy() if z_np.ndim == 1: z_np = z_np.reshape(-1, 1) columns = [] for i in range(compressed_dim): columns.append(f"AE_{i+1}") reduced_df = pd.DataFrame(z_np, index=self.embeddings.index, columns=columns) self.logger.info("Autoencoder reduction completed") else: raise ValueError(f"Unsupported reduction method:{method}") return reduced_df def _integrate_embeddings(self, reduced: pd.DataFrame, method="multiply", alpha=2.0, beta=0.5) -> pd.DataFrame: """ Integrates the reduced embeddings with the omics data using a multiplicative approach. With the default parameters (alpha = 2.0, beta = 0.5), each feature is updated as: - enhanced = beta * raw + (1 - beta) * (alpha * normalized_weight * raw) For example, with alpha = 2.0 and beta = 0.5: - If a features normalized weight is 1.0: - enhanced = 0.5xraw + 0.5x(2.0x1.0xraw) = 0.5xraw + raw = 1.5xraw - If a features normalized weight is 0.5: - enhanced = 0.5xraw + 0.5x(2.0x0.5xraw) = 0.5xraw + 0.5xraw = raw This is so at least 50% of the final output is influenced by the computed weight """ missing_features = set(self.omics_data.columns) - set(reduced.index) if missing_features: self.logger.warning(f"Missing {len(missing_features)} features in reduced embeddings: {list(missing_features)[:5]}") if method == "multiply": # the reduced embeddings to match the omics data columns. if not reduced.index.equals(self.omics_data.columns): self.logger.info("Aligning reduced embeddings index with omics_data columns.") reduced = reduced.reindex(self.omics_data.columns) # normalize the embeddings and compute a weight series if isinstance(reduced, pd.DataFrame): for col in reduced.columns: reduced[col] = (reduced[col] - reduced[col].min()) / (reduced[col].max() - reduced[col].min()) weight_series = reduced.mean(axis=1) elif isinstance(reduced, pd.Series): weight_series = (reduced - reduced.min()) / (reduced.max() - reduced.min()) else: raise ValueError("Reduced embeddings must be a pandas DataFrame or Series.") weight_series = weight_series.fillna(0) ranks = weight_series.rank(method="average") scaled_ranks = 2 * (ranks - ranks.min()) / (ranks.max() - ranks.min()) - 1 # look for duplicate feature names. if not scaled_ranks.index.is_unique: self.logger.error("Duplicate feature names detected in the reduced embeddings index.") # re-index to ensure we have a weight per omics_data feature. feature_weights = scaled_ranks.reindex(self.omics_data.columns,fill_value=0.0) enhanced = self.omics_data.copy() for feature in self.omics_data.columns: weight_val = feature_weights.get(feature) # checking weight_val is series or scalar if isinstance(weight_val, pd.Series): weight_val = weight_val.iloc[0] if pd.notnull(weight_val): enhanced[feature] = beta * enhanced[feature] + (1 - beta) * (alpha * weight_val * enhanced[feature]) else: self.logger.warning(f"Feature {feature} not found in the reduced weights.") else: #Curently only supoort one method but left the parameter in case we supp0ort more in the future. raise ValueError(f"Unknown integration method: {method}.") self.logger.info(f"Final Enhanced Omics Shape: {enhanced.shape}") return enhanced def _run_tuning(self) -> Dict[str, Any]: """ Runs tuning for SubjectRepresentation. """ self.logger.info("Running tuning for SubjectRepresentation.") return self._run_classification_tuning() def _run_classification_tuning(self) -> Dict[str, Any]: """ Performs hyperparameter tuning using Ray Tune. The search space includes: - PCA - AutoEncoder (AE) parameters: epochs, hidden_dim, dropout, lr, activation. Uses RandomForest for classification or regression depending on phenotype. """ search_config = { "method": tune.choice(["PCA", "AE"]), "compressed_dim": tune.choice([1, 2, 3, 4]), "ae_params": { "epochs": tune.choice([64, 128, 256, 512, 1024]), "hidden_dim": tune.choice([16, 32, 64, 128, 256, 512]), "dropout": tune.choice([0.1, 0.2, 0.3, 0.4, 0.5]), "lr": tune.choice([1e-3, 5e-4, 1e-4, 1e-5]), "activation": tune.choice(["relu", "tanh", "sigmoid"]), }, "integration_method": tune.choice(["multiply"]), "alpha": tune.choice([1.5, 2.0, 2.5, 3.0]), "beta": tune.choice([0.1, 0.3, 0.5, 0.7]), } def tune_helper(config): try: method = config["method"].upper() ae_params = config.get("ae_params", { "epochs": 64, "hidden_dim": 4, "dropout": 0.2, "lr": 1e-3, "activation": "relu", }) alpha = config.get("alpha", 2.0) beta = config.get("beta", 0.5) compressed_dim = config.get("compressed_dim", 2) reduced = self._reduce_embeddings(method=method, compressed_dim=compressed_dim,ae_params=ae_params) enhanced = self._integrate_embeddings(reduced, method="multiply", alpha=alpha, beta=beta) common_index = enhanced.index.intersection(self.phenotype_data.index) X = enhanced.loc[common_index].values y = self.phenotype_data.loc[common_index, self.phenotype_col] is_classification = y.dtype != float and y.nunique() <= 20 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=self.seed) if is_classification: model = RandomForestClassifier(random_state=self.seed) model.fit(X_train, y_train.astype(int)) y_pred = model.predict(X_test) score = accuracy_score(y_test, y_pred) else: model = RandomForestRegressor(random_state=self.seed) model.fit(X_train, y_train) y_pred = model.predict(X_test) score = r2_score(y_test, y_pred) tune.report({"score": score}) except Exception as e: self.logger.error(f"Tuning trial failed: {e}") import traceback traceback.print_exc() tune.report({"score": 0.0}) scheduler = ASHAScheduler(metric="score", mode="max", grace_period=1, reduction_factor=2) reporter = CLIReporter(metric_columns=["score", "training_iteration"]) def short_dirname_creator(trial): return f"_{trial.trial_id}" resources = {"cpu": 1, "gpu": 1} if torch.cuda.is_available() else {"cpu": 1, "gpu": 0} try: analysis = tune.run( tune_helper, config=search_config, num_samples=10, scheduler=scheduler, progress_reporter=reporter, storage_path=os.path.expanduser("~/sr"), trial_dirname_creator=short_dirname_creator, resources_per_trial=resources, name="t_sr", verbose=1, ) except TuneError as e: self.logger.warning(f"Tuning completed with failures: {e}") analysis = None if len(e.args) > 1 and hasattr(e.args[1], "get_best_trial"): analysis = e.args[1] best_trial = None best_score = 0.0 if analysis and hasattr(analysis, "get_best_trial"): try: best_trial = analysis.get_best_trial("score", "max", "last") best_score = best_trial.last_result.get("score", 0.0) except Exception as e: self.logger.error(f"Could not retrieve best trial details: {e}") else: self.logger.error("Analysis object is invalid or missing get_best_trial().") self.logger.info(f"Best trial final score: {best_score}") if best_trial: timestamp = datetime.now().strftime("%m%d_%H_%M_%S") best_params_file = Path(self.output_dir) / f"Subjects_tune_{timestamp}.json" with open(best_params_file, "w") as f: json.dump(best_trial.config, f, indent=4) self.logger.info(f"Best Graph Embedding parameters saved to {best_params_file}") else: self.logger.info("No valid best trial config found; skipping save.") return best_trial.config if best_trial else {}
[docs] def get_activation(activation: str): """Maps a string to a PyTorch activation module.""" activation = activation.lower() if activation == "relu": return nn.ReLU() elif activation == "tanh": return nn.Tanh() elif activation == "sigmoid": return nn.Sigmoid() else: raise ValueError(f"Unsupported activation function: {activation}")
[docs] def generate_hidden_dims(init_dim: int, min_dim: int = 2) -> List[int]: """ Generates a list of hidden dimensions by halving the initial dimension until the minimum is reached. For example, if init_dim is 64, this returns [64, 32, 16, 8, 4, 2]. """ dims = [] current = init_dim while current >= min_dim: dims.append(current) current = current // 2 return dims
[docs] class AutoEncoder(nn.Module): """ Generic Autoencoder for configurable reduction. Builds encoder and decoder layers based on a list of hidden dimensions. Allows tuning of dropout, activation, and network architecture. """ def __init__(self, input_dim: int, hidden_dims: int = 64, compressed_dim: int = 1, dropout: float = 0.0, activation: str = "relu"): super(AutoEncoder, self).__init__() self.activation = get_activation(activation) if isinstance(hidden_dims, (int, float)): hidden_dims = generate_hidden_dims(int(hidden_dims)) elif not isinstance(hidden_dims, list): raise ValueError("hidden_dims must be an int, float, or a list of ints.") # encoder: encoder_layers = [] current_dim = input_dim for h_dim in hidden_dims: encoder_layers.append(nn.Linear(current_dim, h_dim)) encoder_layers.append(self.activation) encoder_layers.append(nn.Dropout(dropout)) current_dim = h_dim encoder_layers.append(nn.Linear(current_dim, compressed_dim)) # the * operator unpacks the list into separate arguments self.encoder = nn.Sequential(*encoder_layers) # decoder: decoder_layers = [] current_dim = compressed_dim for h_dim in reversed(hidden_dims): decoder_layers.append(nn.Linear(current_dim, h_dim)) decoder_layers.append(self.activation) decoder_layers.append(nn.Dropout(dropout)) current_dim = h_dim decoder_layers.append(nn.Linear(current_dim, input_dim)) self.decoder = nn.Sequential(*decoder_layers)
[docs] def forward(self, x): z = self.encoder(x) recon = self.decoder(z) return z, recon