Source code for bioneuralnet.utils.preprocess

import pandas as pd
import numpy as np
import networkx as nx

from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.feature_selection import f_classif, f_regression
from statsmodels.stats.multitest import multipletests

from .logger import get_logger
logger = get_logger(__name__)

[docs] def preprocess_clinical(X: pd.DataFrame, y: pd.Series, top_k: int = 10, scale: bool = False, ignore_columns=None) -> pd.DataFrame: """ Preprocess clinical data, handling numeric and categorical features, cleaning, optional scaling, and selecting top features by RandomForest importance. Args: - X (pd.DataFrame): Clinical feature matrix (samples x features) including numeric and categorical columns. - y (pd.Series or pd.DataFrame): Target values; single-column DataFrame or Series of length n_samples. - top_k (int): Number of features to select based on importance. - scale (bool): If True, scale numeric features using RobustScaler; default is False. - ignore_columns (list): List of columns to ignore during preprocessing; default is None. Returns: - pd.DataFrame: Subset of the original features with the selected top_k features plus ignored columns. """ # Align and validate y if isinstance(y, pd.DataFrame): if y.shape[1] != 1: raise ValueError("y must be a Series or single-column DataFrame") y_series = y.iloc[:, 0] elif isinstance(y, pd.Series): y_series = y.copy() else: raise ValueError("y must be a pandas Series or single-column DataFrame") ignore_columns = ignore_columns or [] missing = set(ignore_columns) - set(X.columns) if missing: raise KeyError(f"Ignored columns not in X: {missing}") df_ignore = X[ignore_columns].copy() X = X.drop(columns=ignore_columns) df_numeric = X.select_dtypes(include="number") df_categorical = X.select_dtypes(include=["object", "category", "bool"]) df_numeric_clean = clean_inf_nan(df_numeric) if scale: scaler = RobustScaler() scaled_array = scaler.fit_transform(df_numeric_clean) df_numeric_scaled = pd.DataFrame(scaled_array,columns=df_numeric_clean.columns,index=df_numeric_clean.index) else: df_numeric_scaled = df_numeric_clean.copy() if not df_categorical.empty: df_cat_filled = df_categorical.fillna("Missing").astype(str) df_cat_encoded = pd.get_dummies(df_cat_filled, drop_first=True) else: df_cat_encoded = pd.DataFrame(index=df_numeric_scaled.index) df_combined = pd.concat([df_numeric_scaled, df_cat_encoded], axis=1) df_features = df_combined.loc[:, df_combined.std(axis=0) > 0] if y_series.nunique() <= 10: model = RandomForestClassifier(n_estimators=150,random_state=119,class_weight="balanced") else: model = RandomForestRegressor(n_estimators=150,random_state=119) model.fit(df_features, y_series) importances = model.feature_importances_ feature_names = df_features.columns.tolist() order = list(np.argsort(importances)) descending = [] for i in range(len(order) - 1, -1, -1): descending.append(order[i]) if top_k < len(descending): count = top_k logger.info(f"Selected top {count} features by RandomForest importance") else: count = len(descending) logger.info(f"Selected all {count} features by RandomForest importance") selected_idx = [] for i in range(count): selected_idx.append(descending[i]) selected_columns = [] for idx in selected_idx: selected_columns.append(feature_names[idx]) final_features = pd.concat([df_ignore, df_features[selected_columns]], axis=1) return final_features
[docs] def clean_inf_nan(df: pd.DataFrame) -> pd.DataFrame: """ Replace infinite values with NaN, impute NaNs with the column median, and drop zero-variance columns. Args: - df (pd.DataFrame): Input DataFrame containing numeric columns. Returns: - pd.DataFrame: Cleaned DataFrame with no infinite or NaN values and no zero-variance columns. """ df = df.copy() inf_count = df.isin([np.inf, -np.inf]).sum().sum() df.replace([np.inf, -np.inf], np.nan, inplace=True) nan_before = df.isna().sum().sum() med = df.median(axis=0, skipna=True) df.fillna(med, inplace=True) var_before = df.shape[1] df = df.loc[:, df.std(axis=0, ddof=0) > 0] var_after = df.shape[1] # log logger.info(f"[Inf]: Replaced {inf_count} infinite values") logger.info(f"[NaN]: Replaced {nan_before} NaNs after median imputation") logger.info(f"[Zero-Var]: {var_before-var_after} columns dropped due to zero variance") return df
[docs] def select_top_k_variance(df: pd.DataFrame, k: int = 1000, ddof: int = 0) -> pd.DataFrame: """ Select the top k features with the highest variance., Args: - df (pd.DataFrame): Input DataFrame; non-numeric columns will be ignored. - k (int): Number of top-variance features to select. - ddof (int): Delta degrees of freedom for varianceg calculation; default is 0. Returns: - pd.DataFrame: DataFrame containing only the top k features by variance. """ df_clean = clean_inf_nan(df) num = df_clean.select_dtypes(include=[np.number]).copy() variances = num.var(axis=0, ddof=ddof) k = min(k, len(variances)) top_cols = variances.nlargest(k).index.tolist() logger.info(f"Selected top {len(top_cols)} features by variance") return num[top_cols]
[docs] def select_top_k_correlation(X: pd.DataFrame, y: pd.Series = None, top_k: int = 1000) -> pd.DataFrame: """ Select the top k features by correlation, either supervised (with respect to y) or unsupervised (redundancy minimization). Args: - X (pd.DataFrame): Numeric feature matrix (samples x features). - y (pd.Series, optional): Target values for supervised selection; if None, performs unsupervised selection. - top_k (int): Number of features to select. Returns: - pd.DataFrame: Subset of X containing the selected features. Note: - Correlation computation can be expensive for large datasets. """ clean_df = clean_inf_nan(X) numbers_only = clean_df.select_dtypes(include=[np.number]).copy() # if y is provided then is supervised if y is not None: logger.info("Selecting features by supervised correlation with y") # input validation for y if isinstance(y, pd.DataFrame): if y.shape[1] != 1: raise ValueError("y must be a Series or single-column DataFrame") y = y.iloc[:, 0] elif not isinstance(y, pd.Series): raise ValueError("y must be a pandas Series or DataFrame") correlations = {} for column in numbers_only.columns: col = numbers_only[column].corr(y) if pd.isna(col): correlations[column] = 0.0 else: correlations[column] = abs(col) # descending correlations features = list(correlations.keys()) features.sort(key=correlations.get, reverse=True) select = min(top_k, len(features)) selected = features[: select] # unsupervised else: logger.info("Selecting features by unsupervised correlation") # full absolute correlationm matrix correlations_matrix = numbers_only.corr().abs() # zeroing out the diagonal for i in range(correlations_matrix.shape[0]): correlations_matrix.iat[i, i] = 0.0 # mean correlation for each column correlations_avg = {} columns = list(correlations_matrix.columns) for col in columns: total = 0.0 for others in columns: total += correlations_matrix.at[col, others] avg = total / (len(columns) - 1) correlations_avg[col] = avg features = list(correlations_avg.keys()) features.sort(key=correlations_avg.get) select = min(top_k, len(features)) selected = features[: select] logger.info(f"Selected {len(selected)} features by correlation") return numbers_only[selected]
[docs] def select_top_randomforest(X: pd.DataFrame, y: pd.Series, top_k: int = 1000, seed: int = 119) -> pd.DataFrame: """ Select the top k features using RandomForest feature importances. Args: - X (pd.DataFrame): Numeric feature matrix (samples x features); must contain only numeric columns. - y (pd.Series or pd.DataFrame): Target values; single-column DataFrame or Series. - top_k (int): Number of features to select. - seed (int): Random seed for the RandomForest model; default is 119. Returns: - pd.DataFrame: Subset of X containing the selected top_k features by importance. """ if isinstance(y, pd.DataFrame): if y.shape[1] != 1: raise ValueError("y must be a Series or a single-column DataFrame") y = y.iloc[:, 0] elif not isinstance(y, pd.Series): raise ValueError("y must be a pandas Series or DataFrame") non_numeric = [] for col, dt in X.dtypes.items(): if not pd.api.types.is_numeric_dtype(dt): non_numeric.append(col) if non_numeric: raise ValueError(f"Non-numeric columns detected: {non_numeric}") df_num = clean_inf_nan(X) df_clean = df_num.loc[:, df_num.std(axis=0, ddof=0) > 0] is_classif = (y.nunique() <= 10) if is_classif: Model = RandomForestClassifier else: Model = RandomForestRegressor model = Model(n_estimators=100, random_state=seed) model.fit(df_clean, y) importances = pd.Series(model.feature_importances_, index=df_clean.columns) top_feats = importances.nlargest(min(top_k, len(importances))).index return df_clean[top_feats]
[docs] def top_anova_f_features(X: pd.DataFrame, y: pd.Series,max_features: int, alpha: float = 0.05, task: str = "classification") -> pd.DataFrame: """ Select top features based on ANOVA F-test (with false recovery rate correction). This function is suitable for both classification and regression tasks. Args: - X (pd.DataFrame): Numeric feature matrix (samples x features). - y (pd.Series): Target vector; categorical for classification or continuous for regression. - max_features (int): Maximum number of features to return. - alpha (float): Significance threshold for false recovery rate correction; default is 0.05. - task (str): 'classification' to use f_classif or 'regression' to use f_regression. Returns: - pd.DataFrame: Subset of X with the selected features, padded if necessary. """ X = X.copy() y = y.copy() df_clean = clean_inf_nan(X) num = df_clean.select_dtypes(include=[np.number]).copy() if isinstance(y, pd.DataFrame): y = y.squeeze() if not isinstance(y, pd.Series): raise ValueError("y must be a pandas Series or a single-column DataFrame") y_aligned = y.loc[num.index] if task == "classification": F_vals, p_vals = f_classif(num, y_aligned.values) elif task == "regression": F_vals, p_vals = f_regression(num, y_aligned.values) else: raise ValueError("task must be classification or regression") _, p_adj, _, _ = multipletests(p_vals, alpha=alpha, method="fdr_bh") significant = p_adj < alpha order_all = np.argsort(-F_vals) sig_idx = [] non_sig = [] for i in order_all: if significant[i]: sig_idx.append(i) else: non_sig.append(i) n_sig = len(sig_idx) if n_sig >= max_features: final_idx = sig_idx[:max_features] n_pad = 0 else: n_pad = max_features - n_sig final_idx = sig_idx + non_sig[:n_pad] logger.info(f"Selected {len(final_idx)} features by ANOVA (task={task}), {n_sig} significant, {n_pad} padded") return num.iloc[:, final_idx]
[docs] def prune_network(adjacency_matrix, weight_threshold=0.0): """ Prune a network based on a weight threshold, removing nodes with weak connections. Parameters: - adjacency_matrix (pd.DataFrame): The adjacency matrix of the network. - weight_threshold (float): Minimum weight to keep an edge (default: 0.0). Returns: - pd.DataFrame: """ logger.info(f"Pruning network with weight threshold: {weight_threshold}") full_G = nx.from_pandas_adjacency(adjacency_matrix) total_nodes = full_G.number_of_nodes() total_edges = full_G.number_of_edges() G = full_G.copy() if weight_threshold > 0: edges_to_remove = [] for u, v, d in G.edges(data=True): weight = d.get('weight', 0) if weight < weight_threshold: edges_to_remove.append((u, v)) G.remove_edges_from(edges_to_remove) isolated_nodes = list(nx.isolates(G)) G.remove_nodes_from(isolated_nodes) network_after_prunning = nx.to_pandas_adjacency(G, dtype=float) current_nodes = G.number_of_nodes() current_edges = G.number_of_edges() logger.info(f"Pruning network with weight threshold: {weight_threshold}") logger.info(f"Number of nodes in full network: {total_nodes}") logger.info(f"Number of edges in full network: {total_edges}") logger.info(f"Number of nodes after pruning: {current_nodes}") logger.info(f"Number of edges after pruning: {current_edges}") return network_after_prunning
[docs] def prune_network_by_quantile(adjacency_matrix, quantile=0.5): """ Prune a network by removing edges below a quantile-based weight threshold and dropping isolated nodes. Args: - adjacency_matrix (pd.DataFrame): Weighted adjacency matrix (nodes x nodes). - quantile (float): Quantile in [0,1] to compute weight threshold; default is 0.5. Returns: - pd.DataFrame: Pruned adjacency matrix with edges below the quantile threshold removed. """ logger.info(f"Pruning network using quantile: {quantile}") G = nx.from_pandas_adjacency(adjacency_matrix) weights = [] for u, v, data in G.edges(data=True): weight = data.get('weight', 0) weights.append(weight) if len(weights) == 0: logger.warning("Network contains no edges") return nx.to_pandas_adjacency(G, dtype=float) weight_threshold = np.quantile(weights, quantile) logger.info(f"Computed weight threshold: {weight_threshold} for quantile: {quantile}") edges_to_remove = [] for u, v, data in G.edges(data=True): if data.get('weight', 0) < weight_threshold: edges_to_remove.append((u, v)) G.remove_edges_from(edges_to_remove) isolated_nodes = list(nx.isolates(G)) G.remove_nodes_from(isolated_nodes) pruned_adjacency = nx.to_pandas_adjacency(G, dtype=float) logger.info(f"Number of nodes after pruning: {G.number_of_nodes()}") logger.info(f"Number of edges after pruning: {G.number_of_edges()}") return pruned_adjacency
[docs] def network_remove_low_variance(network: pd.DataFrame, threshold: float = 1e-6) -> pd.DataFrame: """ Remove rows and columns from adjacency matrix where the variance is below a threshold. Parameters: network (pd.DataFrame): Adjacency matrix. threshold (float): Variance threshold. Returns: pd.DataFrame: Filtered adjacency matrix. """ logger.info(f"Removing low-variance rows/columns with threshold {threshold}.") variances = network.var(axis=0) valid_indices = variances[variances > threshold].index filtered_network = network.loc[valid_indices, valid_indices] logger.info(f"Original network shape: {network.shape}, Filtered shape: {filtered_network.shape}") return filtered_network
[docs] def network_remove_high_zero_fraction(network: pd.DataFrame, threshold: float = 0.95) -> pd.DataFrame: """ Remove rows and columns from adjacency matrix where the fraction of zero entries is higher than the threshold. Parameters: network (pd.DataFrame): Adjacency matrix. threshold (float): Zero-fraction threshold. Returns: pd.DataFrame: Filtered adjacency matrix. """ logger.info(f"Removing high zero fraction features with threshold: {threshold}.") zero_fraction = (network == 0).sum(axis=0) / network.shape[0] valid_indices = zero_fraction[zero_fraction < threshold].index filtered_network = network.loc[valid_indices, valid_indices] logger.info(f"Original network shape: {network.shape}, Filtered shape: {filtered_network.shape}") return filtered_network