gaitsetpy
GaitSetPy - A Python package for gait analysis and recognition.
This package provides a comprehensive toolkit for gait data analysis with both a modern class-based architecture and a legacy function-based API for backward compatibility.
Features:
- Modular architecture with singleton design pattern
- Plugin-based system for easy extension
- Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet, HARUP)
- Feature extraction and preprocessing pipelines
- Machine learning models for classification
- Exploratory data analysis tools
- Backward compatibility with legacy API
Architecture:
- Core: Base classes and singleton managers
- Dataset: Data loading and preprocessing
- Features: Feature extraction and analysis
- Preprocessing: Data cleaning and transformation
- EDA: Exploratory data analysis and visualization
- Classification: Machine learning models and evaluation
Maintainer: @aharshit123456
"""
GaitSetPy - A Python package for gait analysis and recognition.

This package provides a comprehensive toolkit for gait data analysis with both
a modern class-based architecture and a legacy function-based API for backward compatibility.

Features:
- Modular architecture with singleton design pattern
- Plugin-based system for easy extension
- Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet, HARUP)
- Feature extraction and preprocessing pipelines
- Machine learning models for classification
- Exploratory data analysis tools
- Backward compatibility with legacy API

Architecture:
- Core: Base classes and singleton managers
- Dataset: Data loading and preprocessing
- Features: Feature extraction and analysis
- Preprocessing: Data cleaning and transformation
- EDA: Exploratory data analysis and visualization
- Classification: Machine learning models and evaluation

Maintainer: @aharshit123456
"""

# Core architecture components
from .core import (
    BaseDatasetLoader,
    BaseFeatureExtractor,
    BasePreprocessor,
    BaseEDAAnalyzer,
    BaseClassificationModel,
    DatasetManager,
    FeatureManager,
    PreprocessingManager,
    EDAManager,
    ClassificationManager
)

# New class-based API
from .dataset import (
    DaphnetLoader,
    MobiFallLoader,
    ArduousLoader,
    PhysioNetLoader,
    HARUPLoader,
    get_dataset_manager,
    get_available_datasets,
    load_dataset
)

from .features import (
    GaitFeatureExtractor,
    LBPFeatureExtractor,
    FourierSeriesFeatureExtractor,
    PhysioNetFeatureExtractor,
    get_feature_manager,
    get_available_extractors,
    extract_features
)

from .preprocessing import (
    ClippingPreprocessor,
    NoiseRemovalPreprocessor,
    OutlierRemovalPreprocessor,
    BaselineRemovalPreprocessor,
    DriftRemovalPreprocessor,
    HighFrequencyNoiseRemovalPreprocessor,
    LowFrequencyNoiseRemovalPreprocessor,
    ArtifactRemovalPreprocessor,
    TrendRemovalPreprocessor,
    DCOffsetRemovalPreprocessor,
    get_preprocessing_manager,
    get_available_preprocessors,
    preprocess_data,
    create_preprocessing_pipeline
)

from .eda import (
    DaphnetVisualizationAnalyzer,
    SensorStatisticsAnalyzer,
    get_eda_manager,
    get_available_analyzers,
    analyze_data,
    visualize_data,
    plot_daphnet_data,
    analyze_sensor_statistics,
    plot_sensor_features
)

from .classification import (
    RandomForestModel,
    get_classification_manager,
    get_available_models,
    train_model,
    predict,
    evaluate_model_performance,
    create_random_forest,
    train_random_forest
)

# Legacy API for backward compatibility
# Explicitly import all public exports from submodules instead of using wildcard imports
# This improves code clarity and makes it easier to track what's being exported

# Dataset legacy functions
from .dataset import (
    load_daphnet_data,
    create_sliding_windows,
    load_mobifall_data,
    load_arduous_data,
    load_physionet_data,
    create_physionet_windows,
    load_harup_data,
    create_harup_windows,
    extract_harup_features,
    download_dataset,
    extract_dataset,
    sliding_window
)

# Features legacy functions
from .features import (
    calculate_mean,
    calculate_standard_deviation,
    calculate_variance,
    calculate_skewness,
    calculate_kurtosis,
    calculate_root_mean_square,
    calculate_range,
    calculate_median,
    calculate_mode,
    calculate_mean_absolute_value,
    calculate_median_absolute_deviation,
    calculate_peak_height,
    calculate_stride_times,
    calculate_step_time,
    calculate_cadence,
    calculate_freezing_index,
    calculate_dominant_frequency,
    calculate_peak_frequency,
    calculate_power_spectral_entropy,
    calculate_principal_harmonic_frequency,
    calculate_entropy,
    calculate_interquartile_range,
    calculate_correlation,
    calculate_auto_regression_coefficients,
    get_mean_for_windows,
    get_standard_deviation_for_windows,
    get_variance_for_windows
)

# Preprocessing legacy functions
from .preprocessing import (
    clip_sliding_windows,
    remove_noise,
    remove_outliers,
    remove_baseline,
    remove_drift,
    remove_artifacts,
    remove_trend,
    remove_dc_offset,
    remove_high_frequency_noise,
    remove_low_frequency_noise
)

# EDA legacy functions
from .eda import (
    plot_thigh_data,
    plot_shank_data,
    plot_trunk_data,
    plot_all_data,
    plot_all_thigh_data,
    plot_all_shank_data,
    plot_all_trunk_data,
    plot_all_datasets,
    plot_sensor_with_features
)

# Classification legacy functions
from .classification import (
    create_random_forest_model,
    preprocess_features,
    evaluate_model
)

__version__ = "0.2.2"  # Updated version to reflect new architecture
__author__ = "Harshit Agarwal | Alohomora Labs"


# Convenient access to all managers
def get_all_managers() -> dict:
    """
    Get all singleton managers.

    Each manager class is called with no arguments and the resulting
    instances are returned keyed by subsystem name.

    Returns:
        Dictionary with the keys 'dataset', 'feature', 'preprocessing',
        'eda' and 'classification', each mapped to its manager instance
    """
    return {
        'dataset': DatasetManager(),
        'feature': FeatureManager(),
        'preprocessing': PreprocessingManager(),
        'eda': EDAManager(),
        'classification': ClassificationManager()
    }


# System information
def get_system_info() -> dict:
    """
    Get information about the available components in the system.

    Returns:
        Dictionary containing the package version and author, the results of
        the five ``get_available_*`` helpers, and a short architecture label
    """
    return {
        'version': __version__,
        'author': __author__,
        'available_datasets': get_available_datasets(),
        'available_extractors': get_available_extractors(),
        'available_preprocessors': get_available_preprocessors(),
        'available_analyzers': get_available_analyzers(),
        'available_models': get_available_models(),
        'architecture': 'Modular with singleton design pattern'
    }


# Shortcut functions for common workflows
def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192) -> dict:
    """
    Complete workflow for loading and analyzing Daphnet data.

    Args:
        data_dir: Directory containing the Daphnet dataset
        sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk').
            NOTE: accepted for API compatibility but not referenced anywhere
            in this function body.
        window_size: Size of sliding windows for feature extraction

    Returns:
        Dictionary with the keys 'data', 'names', 'windows', 'features',
        'analysis', 'loader', 'extractor' and 'analyzer'

    Raises:
        IndexError: If the loader produces no windows, because features are
            taken from the first windows entry.
    """
    # Load dataset
    loader = DaphnetLoader()
    data, names = loader.load_data(data_dir)

    # Create sliding windows
    windows = loader.create_sliding_windows(data, names, window_size=window_size)

    # Extract features: only the first entry of `windows` is used, and the
    # sampling frequency is fixed at 64.
    extractor = GaitFeatureExtractor()
    features = extractor.extract_features(windows[0]['windows'], fs=64)

    # Analyze data
    analyzer = DaphnetVisualizationAnalyzer()
    analysis = analyzer.analyze(data)

    return {
        'data': data,
        'names': names,
        'windows': windows,
        'features': features,
        'analysis': analysis,
        'loader': loader,
        'extractor': extractor,
        'analyzer': analyzer
    }


def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100) -> dict:
    """
    Complete workflow for loading PhysioNet VGRF data and extracting features.

    Args:
        data_dir: Directory to store/find the PhysioNet dataset
        window_size: Size of sliding windows for feature extraction (default: 600)
        step_size: Step size for sliding windows (default: 100)

    Returns:
        Dictionary with the keys 'data', 'names', 'windows', 'features',
        'labels', 'loader' and 'extractor'. 'features' is a list holding one
        ``{'name', 'features', 'metadata'}`` dictionary per windows entry that
        has a 'windows' key. No separate analysis step is run.
    """
    # Load dataset
    loader = PhysioNetLoader()
    data, names = loader.load_data(data_dir)

    # Create sliding windows
    windows = loader.create_sliding_windows(data, names, window_size=window_size, step_size=step_size)

    # Extract PhysioNet-specific features
    extractor = PhysioNetFeatureExtractor()
    all_features = []

    for window_dict in windows:
        # Entries without a 'windows' key are skipped rather than treated as errors.
        if 'windows' in window_dict:
            features = extractor.extract_features(window_dict['windows'], fs=100)
            all_features.append({
                'name': window_dict['name'],
                'features': features,
                'metadata': window_dict.get('metadata', {})
            })

    return {
        'data': data,
        'names': names,
        'windows': windows,
        'features': all_features,
        'labels': loader.get_labels(),
        'loader': loader,
        'extractor': extractor
    }


def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs):
    """
    Train a gait classification model.

    Args:
        features: List of feature dictionaries
        model_type: Type of model to train; only 'random_forest' is handled
        **kwargs: Additional arguments, forwarded unchanged to both the model
            constructor and its ``train`` method

    Returns:
        Trained model instance

    Raises:
        ValueError: If ``model_type`` is not 'random_forest'.
    """
    if model_type != 'random_forest':
        raise ValueError(f"Model type '{model_type}' not supported")

    # NOTE(review): the same kwargs go to the constructor and to train();
    # confirm both accept every key callers are expected to pass.
    model = RandomForestModel(**kwargs)
    model.train(features, **kwargs)
    return model


__all__ = [
    # Core architecture
    'BaseDatasetLoader',
    'BaseFeatureExtractor',
    'BasePreprocessor',
    'BaseEDAAnalyzer',
    'BaseClassificationModel',
    'DatasetManager',
    'FeatureManager',
    'PreprocessingManager',
    'EDAManager',
    'ClassificationManager',

    # New class-based API
    'DaphnetLoader',
    'MobiFallLoader',
    'ArduousLoader',
    'PhysioNetLoader',
    'HARUPLoader',
    'GaitFeatureExtractor',
    'LBPFeatureExtractor',
    'FourierSeriesFeatureExtractor',
    'PhysioNetFeatureExtractor',
    'ClippingPreprocessor',
    'NoiseRemovalPreprocessor',
    'OutlierRemovalPreprocessor',
    'BaselineRemovalPreprocessor',
    'DriftRemovalPreprocessor',
    'HighFrequencyNoiseRemovalPreprocessor',
    'LowFrequencyNoiseRemovalPreprocessor',
    'ArtifactRemovalPreprocessor',
    'TrendRemovalPreprocessor',
    'DCOffsetRemovalPreprocessor',
    'DaphnetVisualizationAnalyzer',
    'SensorStatisticsAnalyzer',
    'RandomForestModel',

    # Manager access functions
    'get_dataset_manager',
    'get_feature_manager',
    'get_preprocessing_manager',
    'get_eda_manager',
    'get_classification_manager',
    'get_all_managers',

    # Utility functions
    'get_available_datasets',
    'get_available_extractors',
    'get_available_preprocessors',
    'get_available_analyzers',
    'get_available_models',
    'get_system_info',

    # Convenience functions of the class-based API
    'load_dataset',
    'extract_features',
    'preprocess_data',
    'create_preprocessing_pipeline',
    'analyze_data',
    'visualize_data',
    'plot_daphnet_data',
    'analyze_sensor_statistics',
    'plot_sensor_features',
    'train_model',
    'predict',
    'evaluate_model_performance',
    'create_random_forest',
    'train_random_forest',

    # Workflow functions
    'load_and_analyze_daphnet',
    'load_and_analyze_physionet',
    'train_gait_classifier',

    # Legacy dataset functions
    'load_daphnet_data',
    'create_sliding_windows',
    'load_mobifall_data',
    'load_arduous_data',
    'load_physionet_data',
    'create_physionet_windows',
    'load_harup_data',
    'create_harup_windows',
    'extract_harup_features',
    'download_dataset',
    'extract_dataset',
    'sliding_window',

    # Legacy feature functions
    'calculate_mean',
    'calculate_standard_deviation',
    'calculate_variance',
    'calculate_skewness',
    'calculate_kurtosis',
    'calculate_root_mean_square',
    'calculate_range',
    'calculate_median',
    'calculate_mode',
    'calculate_mean_absolute_value',
    'calculate_median_absolute_deviation',
    'calculate_peak_height',
    'calculate_stride_times',
    'calculate_step_time',
    'calculate_cadence',
    'calculate_freezing_index',
    'calculate_dominant_frequency',
    'calculate_peak_frequency',
    'calculate_power_spectral_entropy',
    'calculate_principal_harmonic_frequency',
    'calculate_entropy',
    'calculate_interquartile_range',
    'calculate_correlation',
    'calculate_auto_regression_coefficients',
    'get_mean_for_windows',
    'get_standard_deviation_for_windows',
    'get_variance_for_windows',

    # Legacy preprocessing functions
    'clip_sliding_windows',
    'remove_noise',
    'remove_outliers',
    'remove_baseline',
    'remove_drift',
    'remove_artifacts',
    'remove_trend',
    'remove_dc_offset',
    'remove_high_frequency_noise',
    'remove_low_frequency_noise',

    # Legacy EDA functions
    'plot_thigh_data',
    'plot_shank_data',
    'plot_trunk_data',
    'plot_all_data',
    'plot_all_thigh_data',
    'plot_all_shank_data',
    'plot_all_trunk_data',
    'plot_all_datasets',
    'plot_sensor_with_features',

    # Legacy classification functions
    'create_random_forest_model',
    'preprocess_features',
    'evaluate_model',
]
class BaseDatasetLoader(ABC):
    """
    Abstract foundation shared by every dataset loader.

    Concrete loaders subclass this and implement ``load_data``,
    ``create_sliding_windows`` and ``get_supported_formats``.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Record the loader's identity and start with empty state.

        Args:
            name: Name of the dataset
            description: Description of the dataset
        """
        self.name, self.description = name, description
        self.data = None
        self.metadata = dict()

    @abstractmethod
    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Read the dataset found under ``data_dir``.

        Args:
            data_dir: Directory containing the dataset
            **kwargs: Additional arguments specific to the dataset

        Returns:
            Tuple of (data_list, names_list)
        """

    @abstractmethod
    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Cut the loaded data into sliding windows.

        Args:
            data: List of DataFrames
            names: List of names corresponding to the data
            window_size: Size of each sliding window
            step_size: Step size for sliding windows

        Returns:
            List of dictionaries containing sliding windows
        """

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """
        Report which file formats the loader understands.

        Returns:
            List of supported file extensions
        """

    def get_info(self) -> Dict[str, Any]:
        """
        Summarise the dataset loader.

        Returns:
            Dictionary with the keys 'name', 'description', 'metadata' and
            'supported_formats'
        """
        info: Dict[str, Any] = dict(name=self.name, description=self.description)
        info['metadata'] = self.metadata
        info['supported_formats'] = self.get_supported_formats()
        return info
Base class for all dataset loaders.
All dataset loaders should inherit from this class and implement the required methods.
def __init__(self, name: str, description: str = ""):
    """
    Record the loader's identity and start with empty state.

    Args:
        name: Name of the dataset
        description: Description of the dataset
    """
    self.name, self.description = name, description
    self.data = None
    self.metadata = dict()
Initialize the dataset loader.
Args: name: Name of the dataset description: Description of the dataset
@abstractmethod
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Read the dataset found under ``data_dir``.

    Args:
        data_dir: Directory containing the dataset
        **kwargs: Additional arguments specific to the dataset

    Returns:
        Tuple of (data_list, names_list)
    """
Load dataset from the specified directory.
Args: data_dir: Directory containing the dataset **kwargs: Additional arguments specific to the dataset
Returns: Tuple of (data_list, names_list)
@abstractmethod
def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                           window_size: int = 192, step_size: int = 32) -> List[Dict]:
    """
    Cut the loaded data into sliding windows.

    Args:
        data: List of DataFrames
        names: List of names corresponding to the data
        window_size: Size of each sliding window
        step_size: Step size for sliding windows

    Returns:
        List of dictionaries containing sliding windows
    """
Create sliding windows from the loaded data.
Args: data: List of DataFrames names: List of names corresponding to the data window_size: Size of each sliding window step_size: Step size for sliding windows
Returns: List of dictionaries containing sliding windows
@abstractmethod
def get_supported_formats(self) -> List[str]:
    """
    Report which file formats the loader understands.

    Returns:
        List of supported file extensions
    """
Get list of supported file formats.
Returns: List of supported file extensions
def get_info(self) -> Dict[str, Any]:
    """
    Summarise the dataset loader.

    Returns:
        Dictionary with the keys 'name', 'description', 'metadata' and
        'supported_formats'
    """
    info: Dict[str, Any] = dict(name=self.name, description=self.description)
    info['metadata'] = self.metadata
    info['supported_formats'] = self.get_supported_formats()
    return info
Get information about the dataset.
Returns: Dictionary containing dataset information
class BaseFeatureExtractor(ABC):
    """
    Abstract foundation shared by every feature extractor.

    Concrete extractors subclass this and implement ``extract_features``
    and ``get_feature_names``.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Record the extractor's identity and start with an empty config.

        Args:
            name: Name of the feature extractor
            description: Description of the feature extractor
        """
        self.name, self.description = name, description
        self.config = dict()

    @abstractmethod
    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Compute features for the given sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: Additional arguments for feature extraction

        Returns:
            List of feature dictionaries
        """

    @abstractmethod
    def get_feature_names(self) -> List[str]:
        """
        List the features this extractor produces.

        Returns:
            List of feature names
        """

    def configure(self, config: Dict[str, Any]):
        """
        Merge ``config`` into the extractor's current configuration.

        Args:
            config: Configuration dictionary
        """
        settings = self.config
        settings.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Summarise the feature extractor.

        Returns:
            Dictionary with the keys 'name', 'description', 'config' and
            'feature_names'
        """
        info: Dict[str, Any] = dict(name=self.name, description=self.description)
        info['config'] = self.config
        info['feature_names'] = self.get_feature_names()
        return info
Base class for all feature extractors.
All feature extractors should inherit from this class and implement the required methods.
def __init__(self, name: str, description: str = ""):
    """
    Record the extractor's identity and start with an empty config.

    Args:
        name: Name of the feature extractor
        description: Description of the feature extractor
    """
    self.name, self.description = name, description
    self.config = dict()
Initialize the feature extractor.
Args: name: Name of the feature extractor description: Description of the feature extractor
@abstractmethod
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Compute features for the given sliding windows.

    Args:
        windows: List of sliding window dictionaries
        fs: Sampling frequency
        **kwargs: Additional arguments for feature extraction

    Returns:
        List of feature dictionaries
    """
Extract features from sliding windows.
Args: windows: List of sliding window dictionaries fs: Sampling frequency **kwargs: Additional arguments for feature extraction
Returns: List of feature dictionaries
@abstractmethod
def get_feature_names(self) -> List[str]:
    """
    List the features this extractor produces.

    Returns:
        List of feature names
    """
Get names of features extracted by this extractor.
Returns: List of feature names
def configure(self, config: Dict[str, Any]):
    """
    Merge ``config`` into the extractor's current configuration.

    Args:
        config: Configuration dictionary
    """
    settings = self.config
    settings.update(config)
Configure the feature extractor.
Args: config: Configuration dictionary
def get_info(self) -> Dict[str, Any]:
    """
    Summarise the feature extractor.

    Returns:
        Dictionary with the keys 'name', 'description', 'config' and
        'feature_names'
    """
    info: Dict[str, Any] = dict(name=self.name, description=self.description)
    info['config'] = self.config
    info['feature_names'] = self.get_feature_names()
    return info
Get information about the feature extractor.
Returns: Dictionary containing feature extractor information
class BasePreprocessor(ABC):
    """
    Abstract foundation shared by every preprocessor.

    Concrete preprocessors subclass this and implement ``fit`` and
    ``transform``; ``fit_transform`` chains the two.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Record the preprocessor's identity; it starts unconfigured and unfitted.

        Args:
            name: Name of the preprocessor
            description: Description of the preprocessor
        """
        self.name, self.description = name, description
        self.config = dict()
        self.fitted = False

    @abstractmethod
    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Learn whatever the preprocessor needs from ``data``.

        Args:
            data: Input data to fit on
            **kwargs: Additional arguments for fitting
        """

    @abstractmethod
    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Apply the fitted preprocessor to ``data``.

        Args:
            data: Input data to transform
            **kwargs: Additional arguments for transformation

        Returns:
            Transformed data
        """

    def fit_transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Run ``fit`` and then ``transform`` on the same data.

        The same keyword arguments are passed to both calls.

        Args:
            data: Input data to fit and transform
            **kwargs: Additional arguments

        Returns:
            Transformed data
        """
        self.fit(data, **kwargs)
        transformed = self.transform(data, **kwargs)
        return transformed

    def configure(self, config: Dict[str, Any]):
        """
        Merge ``config`` into the preprocessor's current configuration.

        Args:
            config: Configuration dictionary
        """
        settings = self.config
        settings.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Summarise the preprocessor.

        Returns:
            Dictionary with the keys 'name', 'description', 'config' and
            'fitted'
        """
        info: Dict[str, Any] = dict(name=self.name, description=self.description)
        info['config'] = self.config
        info['fitted'] = self.fitted
        return info
Base class for all preprocessors.
All preprocessors should inherit from this class and implement the required methods.
def __init__(self, name: str, description: str = ""):
    """
    Record the preprocessor's identity; it starts unconfigured and unfitted.

    Args:
        name: Name of the preprocessor
        description: Description of the preprocessor
    """
    self.name, self.description = name, description
    self.config = dict()
    self.fitted = False
Initialize the preprocessor.
Args: name: Name of the preprocessor description: Description of the preprocessor
@abstractmethod
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Learn whatever the preprocessor needs from ``data``.

    Args:
        data: Input data to fit on
        **kwargs: Additional arguments for fitting
    """
Fit the preprocessor to the data.
Args: data: Input data to fit on **kwargs: Additional arguments for fitting
@abstractmethod
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Apply the fitted preprocessor to ``data``.

    Args:
        data: Input data to transform
        **kwargs: Additional arguments for transformation

    Returns:
        Transformed data
    """
Transform the data using the fitted preprocessor.
Args: data: Input data to transform **kwargs: Additional arguments for transformation
Returns: Transformed data
def fit_transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Run ``fit`` and then ``transform`` on the same data.

    The same keyword arguments are passed to both calls.

    Args:
        data: Input data to fit and transform
        **kwargs: Additional arguments

    Returns:
        Transformed data
    """
    self.fit(data, **kwargs)
    transformed = self.transform(data, **kwargs)
    return transformed
Fit the preprocessor and transform the data.
Args: data: Input data to fit and transform **kwargs: Additional arguments
Returns: Transformed data
def configure(self, config: Dict[str, Any]):
    """
    Merge ``config`` into the preprocessor's current configuration.

    Args:
        config: Configuration dictionary
    """
    settings = self.config
    settings.update(config)
Configure the preprocessor.
Args: config: Configuration dictionary
def get_info(self) -> Dict[str, Any]:
    """
    Summarise the preprocessor.

    Returns:
        Dictionary with the keys 'name', 'description', 'config' and
        'fitted'
    """
    info: Dict[str, Any] = dict(name=self.name, description=self.description)
    info['config'] = self.config
    info['fitted'] = self.fitted
    return info
Get information about the preprocessor.
Returns: Dictionary containing preprocessor information
class BaseEDAAnalyzer(ABC):
    """
    Abstract foundation shared by every EDA analyzer.

    Concrete analyzers subclass this and implement ``analyze`` and
    ``visualize``.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Record the analyzer's identity and start with an empty config.

        Args:
            name: Name of the EDA analyzer
            description: Description of the EDA analyzer
        """
        self.name, self.description = name, description
        self.config = dict()

    @abstractmethod
    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        """
        Run the analysis over ``data``.

        Args:
            data: Input data to analyze
            **kwargs: Additional arguments for analysis

        Returns:
            Dictionary containing analysis results
        """

    @abstractmethod
    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
        """
        Produce visualizations of ``data``.

        Args:
            data: Input data to visualize
            **kwargs: Additional arguments for visualization
        """

    def configure(self, config: Dict[str, Any]):
        """
        Merge ``config`` into the analyzer's current configuration.

        Args:
            config: Configuration dictionary
        """
        settings = self.config
        settings.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Summarise the EDA analyzer.

        Returns:
            Dictionary with the keys 'name', 'description' and 'config'
        """
        info: Dict[str, Any] = dict(name=self.name, description=self.description)
        info['config'] = self.config
        return info
Base class for all EDA analyzers.
All EDA analyzers should inherit from this class and implement the required methods.
def __init__(self, name: str, description: str = ""):
    """
    Record the analyzer's identity and start with an empty config.

    Args:
        name: Name of the EDA analyzer
        description: Description of the EDA analyzer
    """
    self.name, self.description = name, description
    self.config = dict()
Initialize the EDA analyzer.
Args: name: Name of the EDA analyzer description: Description of the EDA analyzer
@abstractmethod
def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
    """
    Run the analysis over ``data``.

    Args:
        data: Input data to analyze
        **kwargs: Additional arguments for analysis

    Returns:
        Dictionary containing analysis results
    """
Perform analysis on the data.
Args: data: Input data to analyze **kwargs: Additional arguments for analysis
Returns: Dictionary containing analysis results
@abstractmethod
def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
    """
    Produce visualizations of ``data``.

    Args:
        data: Input data to visualize
        **kwargs: Additional arguments for visualization
    """
Create visualizations of the data.
Args: data: Input data to visualize **kwargs: Additional arguments for visualization
def configure(self, config: Dict[str, Any]):
    """
    Merge ``config`` into the analyzer's current configuration.

    Args:
        config: Configuration dictionary
    """
    settings = self.config
    settings.update(config)
Configure the EDA analyzer.
Args: config: Configuration dictionary
def get_info(self) -> Dict[str, Any]:
    """
    Summarise the EDA analyzer.

    Returns:
        Dictionary with the keys 'name', 'description' and 'config'
    """
    info: Dict[str, Any] = dict(name=self.name, description=self.description)
    info['config'] = self.config
    return info
Get information about the EDA analyzer.
Returns: Dictionary containing EDA analyzer information
class BaseClassificationModel(ABC):
    """
    Abstract foundation shared by every classification model.

    Concrete models subclass this and implement ``train``, ``predict``,
    ``evaluate``, ``save_model`` and ``load_model``.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Record the model's identity; it starts with no model object,
        an empty config and the ``trained`` flag cleared.

        Args:
            name: Name of the classification model
            description: Description of the classification model
        """
        self.name, self.description = name, description
        self.model = None
        self.config = dict()
        self.trained = False

    @abstractmethod
    def train(self, features: List[Dict], **kwargs):
        """
        Fit the classification model on ``features``.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for training
        """

    @abstractmethod
    def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
        """
        Produce predictions from the trained model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for prediction

        Returns:
            Array of predictions
        """

    @abstractmethod
    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Measure how well the model performs on ``features``.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for evaluation

        Returns:
            Dictionary containing evaluation metrics
        """

    @abstractmethod
    def save_model(self, filepath: str):
        """
        Persist the trained model to ``filepath``.

        Args:
            filepath: Path to save the model
        """

    @abstractmethod
    def load_model(self, filepath: str):
        """
        Restore a trained model from ``filepath``.

        Args:
            filepath: Path to the saved model
        """

    def configure(self, config: Dict[str, Any]):
        """
        Merge ``config`` into the model's current configuration.

        Args:
            config: Configuration dictionary
        """
        settings = self.config
        settings.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Summarise the classification model.

        Returns:
            Dictionary with the keys 'name', 'description', 'config' and
            'trained'
        """
        info: Dict[str, Any] = dict(name=self.name, description=self.description)
        info['config'] = self.config
        info['trained'] = self.trained
        return info
Base class for all classification models.
All classification models should inherit from this class and implement the required methods.
def __init__(self, name: str, description: str = ""):
    """
    Record the model's identity; it starts with no model object,
    an empty config and the ``trained`` flag cleared.

    Args:
        name: Name of the classification model
        description: Description of the classification model
    """
    self.name, self.description = name, description
    self.model = None
    self.config = dict()
    self.trained = False
Initialize the classification model.
Args: name: Name of the classification model description: Description of the classification model
@abstractmethod
def train(self, features: List[Dict], **kwargs):
    """
    Fit the model; subclasses must override this.

    Args:
        features: List of feature dictionaries
        **kwargs: Additional arguments for training
    """
Train the classification model.
Args: features: List of feature dictionaries **kwargs: Additional arguments for training
@abstractmethod
def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
    """
    Produce predictions; subclasses must override this.

    Args:
        features: List of feature dictionaries
        **kwargs: Additional arguments for prediction

    Returns:
        Array of predictions
    """
Make predictions using the trained model.
Args: features: List of feature dictionaries **kwargs: Additional arguments for prediction
Returns: Array of predictions
@abstractmethod
def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
    """
    Measure model performance; subclasses must override this.

    Args:
        features: List of feature dictionaries
        **kwargs: Additional arguments for evaluation

    Returns:
        Dictionary containing evaluation metrics
    """
Evaluate the model performance.
Args: features: List of feature dictionaries **kwargs: Additional arguments for evaluation
Returns: Dictionary containing evaluation metrics
@abstractmethod
def save_model(self, filepath: str):
    """
    Persist the trained model; subclasses must override this.

    Args:
        filepath: Path to save the model
    """
Save the trained model to a file.
Args: filepath: Path to save the model
@abstractmethod
def load_model(self, filepath: str):
    """
    Restore a trained model; subclasses must override this.

    Args:
        filepath: Path to the saved model
    """
Load a trained model from a file.
Args: filepath: Path to the saved model
def configure(self, config: Dict[str, Any]):
    """
    Merge new settings into the model configuration.

    Existing keys are overwritten, other keys are kept.

    Args:
        config: Configuration dictionary
    """
    current_settings = self.config
    current_settings.update(config)
Configure the classification model.
Args: config: Configuration dictionary
def get_info(self) -> Dict[str, Any]:
    """
    Summarise the model.

    Returns:
        Dictionary with the keys 'name', 'description', 'config' and
        'trained'. 'config' is the live configuration dictionary, not a
        copy.
    """
    return dict(
        name=self.name,
        description=self.description,
        config=self.config,
        trained=self.trained,
    )
Get information about the classification model.
Returns: Dictionary containing model information
class DatasetManager(BaseManager):
    """
    Singleton manager for dataset loaders.
    """

    def register_dataset(self, name: str, dataset_class: Type[BaseDatasetLoader]):
        """
        Validate a dataset loader class and hand it to ``self.register``.

        Args:
            name: Name to register the dataset under
            dataset_class: Dataset loader class

        Raises:
            ValueError: If dataset_class does not inherit from BaseDatasetLoader
        """
        if issubclass(dataset_class, BaseDatasetLoader):
            self.register(name, dataset_class)
            return
        raise ValueError("Dataset class must inherit from BaseDatasetLoader")

    def load_dataset(self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
        """
        Instantiate the loader registered as ``name`` and load its data.

        Args:
            name: Name of the dataset loader
            data_dir: Directory containing the dataset
            **kwargs: Additional arguments for the loader

        Returns:
            Dataset loader instance with loaded data
        """
        # NOTE(review): the name and a description are passed on as extra
        # positional arguments, presumably to the loader constructor.
        # DaphnetLoader and MobiFallLoader define __init__(self) with no
        # parameters - confirm how BaseManager.create_instance treats them.
        label = f"{name} dataset loader"
        instance = self.create_instance(name, name, label)
        instance.load_data(data_dir, **kwargs)
        return instance
Singleton manager for dataset loaders.
def register_dataset(self, name: str, dataset_class: Type[BaseDatasetLoader]):
    """
    Validate a dataset loader class and hand it to ``self.register``.

    Args:
        name: Name to register the dataset under
        dataset_class: Dataset loader class

    Raises:
        ValueError: If dataset_class does not inherit from BaseDatasetLoader
    """
    if issubclass(dataset_class, BaseDatasetLoader):
        self.register(name, dataset_class)
        return
    raise ValueError("Dataset class must inherit from BaseDatasetLoader")
Register a dataset loader.
Args: name: Name to register the dataset under dataset_class: Dataset loader class
def load_dataset(self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
    """
    Instantiate the loader registered as ``name`` and load its data.

    Args:
        name: Name of the dataset loader
        data_dir: Directory containing the dataset
        **kwargs: Additional arguments for the loader

    Returns:
        Dataset loader instance with loaded data
    """
    # NOTE(review): the name and a description are passed on as extra
    # positional arguments, presumably to the loader constructor - confirm
    # against BaseManager.create_instance and the loaders' __init__.
    label = f"{name} dataset loader"
    instance = self.create_instance(name, name, label)
    instance.load_data(data_dir, **kwargs)
    return instance
Load a dataset using the registered loader.
Args: name: Name of the dataset loader data_dir: Directory containing the dataset **kwargs: Additional arguments for the loader
Returns: Dataset loader instance with loaded data
class FeatureManager(BaseManager):
    """
    Singleton manager for feature extractors.
    """

    def register_extractor(self, name: str, extractor_class: Type[BaseFeatureExtractor]):
        """
        Validate a feature extractor class and hand it to ``self.register``.

        Args:
            name: Name to register the extractor under
            extractor_class: Feature extractor class

        Raises:
            ValueError: If extractor_class does not inherit from BaseFeatureExtractor
        """
        if issubclass(extractor_class, BaseFeatureExtractor):
            self.register(name, extractor_class)
            return
        raise ValueError("Extractor class must inherit from BaseFeatureExtractor")

    def extract_features(self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Run the named extractor over the given windows.

        The extractor is obtained through ``self.get_cached_instance``.

        Args:
            extractor_name: Name of the feature extractor
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: Additional arguments for feature extraction

        Returns:
            List of feature dictionaries
        """
        label = f"{extractor_name} feature extractor"
        worker = self.get_cached_instance(extractor_name, extractor_name, label)
        return worker.extract_features(windows, fs, **kwargs)
Singleton manager for feature extractors.
def register_extractor(self, name: str, extractor_class: Type[BaseFeatureExtractor]):
    """
    Validate a feature extractor class and hand it to ``self.register``.

    Args:
        name: Name to register the extractor under
        extractor_class: Feature extractor class

    Raises:
        ValueError: If extractor_class does not inherit from BaseFeatureExtractor
    """
    if issubclass(extractor_class, BaseFeatureExtractor):
        self.register(name, extractor_class)
        return
    raise ValueError("Extractor class must inherit from BaseFeatureExtractor")
Register a feature extractor.
Args: name: Name to register the extractor under extractor_class: Feature extractor class
def extract_features(self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Run the named extractor over the given windows.

    The extractor is obtained through ``self.get_cached_instance``.

    Args:
        extractor_name: Name of the feature extractor
        windows: List of sliding window dictionaries
        fs: Sampling frequency
        **kwargs: Additional arguments for feature extraction

    Returns:
        List of feature dictionaries
    """
    label = f"{extractor_name} feature extractor"
    worker = self.get_cached_instance(extractor_name, extractor_name, label)
    return worker.extract_features(windows, fs, **kwargs)
Extract features using the specified extractor.
Args: extractor_name: Name of the feature extractor windows: List of sliding window dictionaries fs: Sampling frequency **kwargs: Additional arguments for feature extraction
Returns: List of feature dictionaries
class PreprocessingManager(BaseManager):
    """
    Singleton manager for preprocessors.
    """

    def register_preprocessor(self, name: str, preprocessor_class: Type[BasePreprocessor]):
        """
        Validate a preprocessor class and hand it to ``self.register``.

        Args:
            name: Name to register the preprocessor under
            preprocessor_class: Preprocessor class

        Raises:
            ValueError: If preprocessor_class does not inherit from BasePreprocessor
        """
        if issubclass(preprocessor_class, BasePreprocessor):
            self.register(name, preprocessor_class)
            return
        raise ValueError("Preprocessor class must inherit from BasePreprocessor")

    def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
        """
        Run ``fit_transform`` of the named preprocessor on the data.

        The preprocessor is obtained through ``self.get_cached_instance``.

        Args:
            preprocessor_name: Name of the preprocessor
            data: Input data to preprocess
            **kwargs: Additional arguments for preprocessing

        Returns:
            Preprocessed data
        """
        label = f"{preprocessor_name} preprocessor"
        worker = self.get_cached_instance(preprocessor_name, preprocessor_name, label)
        return worker.fit_transform(data, **kwargs)
Singleton manager for preprocessors.
def register_preprocessor(self, name: str, preprocessor_class: Type[BasePreprocessor]):
    """
    Validate a preprocessor class and hand it to ``self.register``.

    Args:
        name: Name to register the preprocessor under
        preprocessor_class: Preprocessor class

    Raises:
        ValueError: If preprocessor_class does not inherit from BasePreprocessor
    """
    if issubclass(preprocessor_class, BasePreprocessor):
        self.register(name, preprocessor_class)
        return
    raise ValueError("Preprocessor class must inherit from BasePreprocessor")
Register a preprocessor.
Args: name: Name to register the preprocessor under preprocessor_class: Preprocessor class
def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
    """
    Run ``fit_transform`` of the named preprocessor on the data.

    The preprocessor is obtained through ``self.get_cached_instance``.

    Args:
        preprocessor_name: Name of the preprocessor
        data: Input data to preprocess
        **kwargs: Additional arguments for preprocessing

    Returns:
        Preprocessed data
    """
    label = f"{preprocessor_name} preprocessor"
    worker = self.get_cached_instance(preprocessor_name, preprocessor_name, label)
    return worker.fit_transform(data, **kwargs)
Preprocess data using the specified preprocessor.
Args: preprocessor_name: Name of the preprocessor data: Input data to preprocess **kwargs: Additional arguments for preprocessing
Returns: Preprocessed data
class EDAManager(BaseManager):
    """
    Singleton manager for EDA analyzers.
    """

    def register_analyzer(self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
        """
        Validate an EDA analyzer class and hand it to ``self.register``.

        Args:
            name: Name to register the analyzer under
            analyzer_class: EDA analyzer class

        Raises:
            ValueError: If analyzer_class does not inherit from BaseEDAAnalyzer
        """
        if issubclass(analyzer_class, BaseEDAAnalyzer):
            self.register(name, analyzer_class)
            return
        raise ValueError("Analyzer class must inherit from BaseEDAAnalyzer")

    def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
        """
        Run ``analyze`` of the named analyzer on the data.

        The analyzer is obtained through ``self.get_cached_instance``.

        Args:
            analyzer_name: Name of the EDA analyzer
            data: Input data to analyze
            **kwargs: Additional arguments for analysis

        Returns:
            Analysis results dictionary
        """
        label = f"{analyzer_name} analyzer"
        worker = self.get_cached_instance(analyzer_name, analyzer_name, label)
        return worker.analyze(data, **kwargs)

    def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
        """
        Run ``visualize`` of the named analyzer on the data.

        The analyzer is obtained through ``self.get_cached_instance``; the
        return value of ``visualize`` is discarded.

        Args:
            analyzer_name: Name of the EDA analyzer
            data: Input data to visualize
            **kwargs: Additional arguments for visualization
        """
        label = f"{analyzer_name} analyzer"
        worker = self.get_cached_instance(analyzer_name, analyzer_name, label)
        worker.visualize(data, **kwargs)
Singleton manager for EDA analyzers.
def register_analyzer(self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
    """
    Validate an EDA analyzer class and hand it to ``self.register``.

    Args:
        name: Name to register the analyzer under
        analyzer_class: EDA analyzer class

    Raises:
        ValueError: If analyzer_class does not inherit from BaseEDAAnalyzer
    """
    if issubclass(analyzer_class, BaseEDAAnalyzer):
        self.register(name, analyzer_class)
        return
    raise ValueError("Analyzer class must inherit from BaseEDAAnalyzer")
Register an EDA analyzer.
Args: name: Name to register the analyzer under analyzer_class: EDA analyzer class
def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
    """
    Run ``analyze`` of the named analyzer on the data.

    The analyzer is obtained through ``self.get_cached_instance``.

    Args:
        analyzer_name: Name of the EDA analyzer
        data: Input data to analyze
        **kwargs: Additional arguments for analysis

    Returns:
        Analysis results dictionary
    """
    label = f"{analyzer_name} analyzer"
    worker = self.get_cached_instance(analyzer_name, analyzer_name, label)
    return worker.analyze(data, **kwargs)
Analyze data using the specified analyzer.
Args: analyzer_name: Name of the EDA analyzer data: Input data to analyze **kwargs: Additional arguments for analysis
Returns: Analysis results dictionary
def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
    """
    Run ``visualize`` of the named analyzer on the data.

    The analyzer is obtained through ``self.get_cached_instance``; the
    return value of ``visualize`` is discarded.

    Args:
        analyzer_name: Name of the EDA analyzer
        data: Input data to visualize
        **kwargs: Additional arguments for visualization
    """
    label = f"{analyzer_name} analyzer"
    worker = self.get_cached_instance(analyzer_name, analyzer_name, label)
    worker.visualize(data, **kwargs)
Create visualizations using the specified analyzer.
Args: analyzer_name: Name of the EDA analyzer data: Input data to visualize **kwargs: Additional arguments for visualization
class ClassificationManager(BaseManager):
    """
    Singleton manager for classification models.
    """

    def register_model(self, name: str, model_class: Type[BaseClassificationModel]):
        """
        Validate a classification model class and hand it to ``self.register``.

        Args:
            name: Name to register the model under
            model_class: Classification model class

        Raises:
            ValueError: If model_class does not inherit from BaseClassificationModel
        """
        if issubclass(model_class, BaseClassificationModel):
            self.register(name, model_class)
            return
        raise ValueError("Model class must inherit from BaseClassificationModel")

    def train_model(self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
        """
        Create an instance of the named model and train it.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for training

        Returns:
            Trained model instance
        """
        # NOTE(review): training uses create_instance, whereas predict() and
        # evaluate_model() fetch their model via get_cached_instance. Whether
        # they then see the instance trained here depends on BaseManager -
        # confirm.
        label = f"{model_name} classification model"
        classifier = self.create_instance(model_name, model_name, label)
        classifier.train(features, **kwargs)
        return classifier

    def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
        """
        Run ``predict`` of the named model on the features.

        The model is obtained through ``self.get_cached_instance``.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for prediction

        Returns:
            Predictions array
        """
        label = f"{model_name} classification model"
        classifier = self.get_cached_instance(model_name, model_name, label)
        return classifier.predict(features, **kwargs)

    def evaluate_model(self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Run ``evaluate`` of the named model on the features.

        The model is obtained through ``self.get_cached_instance``.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for evaluation

        Returns:
            Evaluation metrics dictionary
        """
        label = f"{model_name} classification model"
        classifier = self.get_cached_instance(model_name, model_name, label)
        return classifier.evaluate(features, **kwargs)
Singleton manager for classification models.
def register_model(self, name: str, model_class: Type[BaseClassificationModel]):
    """
    Validate a classification model class and hand it to ``self.register``.

    Args:
        name: Name to register the model under
        model_class: Classification model class

    Raises:
        ValueError: If model_class does not inherit from BaseClassificationModel
    """
    if issubclass(model_class, BaseClassificationModel):
        self.register(name, model_class)
        return
    raise ValueError("Model class must inherit from BaseClassificationModel")
Register a classification model.
Args: name: Name to register the model under model_class: Classification model class
def train_model(self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
    """
    Create an instance of the named model and train it.

    Args:
        model_name: Name of the classification model
        features: List of feature dictionaries
        **kwargs: Additional arguments for training

    Returns:
        Trained model instance
    """
    # NOTE(review): training uses create_instance, whereas predict() and
    # evaluate_model() fetch their model via get_cached_instance. Whether
    # they then see the instance trained here depends on BaseManager -
    # confirm.
    label = f"{model_name} classification model"
    classifier = self.create_instance(model_name, model_name, label)
    classifier.train(features, **kwargs)
    return classifier
Train a classification model.
Args: model_name: Name of the classification model features: List of feature dictionaries **kwargs: Additional arguments for training
Returns: Trained model instance
def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
    """
    Run ``predict`` of the named model on the features.

    The model is obtained through ``self.get_cached_instance``.

    Args:
        model_name: Name of the classification model
        features: List of feature dictionaries
        **kwargs: Additional arguments for prediction

    Returns:
        Predictions array
    """
    label = f"{model_name} classification model"
    classifier = self.get_cached_instance(model_name, model_name, label)
    return classifier.predict(features, **kwargs)
Make predictions using a trained model.
Args: model_name: Name of the classification model features: List of feature dictionaries **kwargs: Additional arguments for prediction
Returns: Predictions array
def evaluate_model(self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
    """
    Run ``evaluate`` of the named model on the features.

    The model is obtained through ``self.get_cached_instance``.

    Args:
        model_name: Name of the classification model
        features: List of feature dictionaries
        **kwargs: Additional arguments for evaluation

    Returns:
        Evaluation metrics dictionary
    """
    label = f"{model_name} classification model"
    classifier = self.get_cached_instance(model_name, model_name, label)
    return classifier.evaluate(features, **kwargs)
Evaluate a classification model.
Args: model_name: Name of the classification model features: List of feature dictionaries **kwargs: Additional arguments for evaluation
Returns: Evaluation metrics dictionary
class DaphnetLoader(BaseDatasetLoader):
    """
    Daphnet dataset loader class.

    This class handles loading and processing of the Daphnet dataset for gait analysis.
    """

    def __init__(self):
        """
        Initialize the Daphnet loader.

        Takes no arguments: the name "daphnet" and a fixed description are
        passed to the base class, then the sensor metadata is filled in.
        """
        super().__init__(
            name="daphnet",
            description="Daphnet Freezing of Gait Dataset - Contains accelerometer data from subjects with Parkinson's disease"
        )
        self.metadata = {
            'sensors': ['shank', 'thigh', 'trunk'],
            'components': ['h_fd', 'v', 'h_l'],  # horizontal forward, vertical, horizontal lateral
            'sampling_frequency': 64,
            'annotations': {
                0: 'not_valid',
                1: 'no_freeze',
                2: 'freeze'
            }
        }

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load Daphnet dataset from the specified directory.

        Calls ``download_dataset`` and ``extract_dataset`` for "daphnet",
        then reads every ``S*.txt`` file under
        ``<data_dir>/dataset_fog_release/dataset`` in sorted order. Each
        file is parsed as space-separated values, indexed by its ``time``
        column and extended with one magnitude column per sensor (the
        Euclidean norm of that sensor's three components).

        Args:
            data_dir: Directory to store/find the dataset
            **kwargs: Additional arguments (unused for Daphnet)

        Returns:
            Tuple of (data_list, names_list): the DataFrames and the
            matching base file names. Both are also stored on
            ``self.data`` and ``self.names``.
        """
        # Download and extract if needed
        download_dataset("daphnet", data_dir)
        extract_dataset("daphnet", data_dir)

        file_path = os.path.join(data_dir, "dataset_fog_release/dataset")

        # The input layout and the output column order are the same for
        # every recording, so they are defined once rather than per file.
        column_names = [
            "time", "shank_h_fd", "shank_v", "shank_h_l",
            "thigh_h_fd", "thigh_v", "thigh_h_l",
            "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
        ]
        ordered_columns = [
            "shank", "shank_h_fd", "shank_v", "shank_h_l",
            "thigh", "thigh_h_fd", "thigh_v", "thigh_h_l",
            "trunk", "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
        ]

        daphnet_data = []
        daphnet_names = []

        # Load all subject files
        for file in sorted(glob(os.path.join(file_path, "S*.txt"))):
            df = pd.read_csv(file, sep=" ", names=column_names).set_index("time")

            # Magnitude per sensor; the summation order (lateral, vertical,
            # forward) is kept so the floating-point results do not change.
            for sensor in ("thigh", "shank", "trunk"):
                df[sensor] = np.sqrt(
                    df[f"{sensor}_h_l"]**2 + df[f"{sensor}_v"]**2 + df[f"{sensor}_h_fd"]**2
                )

            daphnet_names.append(os.path.basename(file))
            daphnet_data.append(df[ordered_columns])

        # Store loaded data
        self.data = daphnet_data
        self.names = daphnet_names

        return daphnet_data, daphnet_names

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Create sliding windows from the Daphnet dataset.

        Rows whose ``annotations`` value is not greater than 0 are removed
        first; a DataFrame with no remaining rows contributes nothing to
        the result.

        Args:
            data: List of DataFrames containing Daphnet data
            names: List of names corresponding to the data
            window_size: Size of the sliding window (default: 192)
            step_size: Step size for the sliding window (default: 32)

        Returns:
            List of dictionaries, one per retained DataFrame, with the keys
            "name" and "windows". "windows" holds one {"name", "data"}
            entry per column, with the annotations entry last.
        """
        windows_data = []

        for idx, df in enumerate(data):
            # Filter out invalid data (annotations == 0)
            df_filtered = df[df.annotations > 0]
            if df_filtered.empty:
                continue

            # dict.fromkeys keeps the first-seen order and drops repeated labels
            signal_columns = [col for col in dict.fromkeys(df_filtered.columns) if col != "annotations"]
            windows = [
                {"name": col, "data": sliding_window(df_filtered[col], window_size, step_size)}
                for col in signal_columns
            ]

            # Include annotations separately
            windows.append({
                "name": "annotations",
                "data": sliding_window(df_filtered["annotations"], window_size, step_size),
            })

            windows_data.append({"name": names[idx], "windows": windows})

        return windows_data

    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats for Daphnet dataset.

        Returns:
            List of supported file extensions
        """
        return ['.txt']

    def get_sensor_info(self) -> Dict[str, List[str]]:
        """
        Get information about sensors in the dataset.

        Returns:
            Dictionary with the 'sensors', 'components' and
            'sampling_frequency' entries of ``self.metadata``. The sampling
            frequency is the number set in ``__init__``, not a list.
        """
        return {key: self.metadata[key] for key in ('sensors', 'components', 'sampling_frequency')}

    def get_annotation_info(self) -> Dict[int, str]:
        """
        Get information about annotations in the dataset.

        Returns:
            Dictionary mapping annotation values to descriptions
        """
        return self.metadata['annotations']
Daphnet dataset loader class.
This class handles loading and processing of the Daphnet dataset for gait analysis.
def __init__(self):
    """
    Initialize the Daphnet loader.

    Takes no arguments: the name "daphnet" and a fixed description are
    passed to the base class, then the sensor metadata is filled in.
    """
    super().__init__(
        name="daphnet",
        description="Daphnet Freezing of Gait Dataset - Contains accelerometer data from subjects with Parkinson's disease",
    )
    annotation_labels = {0: 'not_valid', 1: 'no_freeze', 2: 'freeze'}
    self.metadata = {
        'sensors': ['shank', 'thigh', 'trunk'],
        # horizontal forward, vertical, horizontal lateral
        'components': ['h_fd', 'v', 'h_l'],
        'sampling_frequency': 64,
        'annotations': annotation_labels,
    }
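Initialize the Daphnet dataset loader.
Takes no arguments: the constructor passes the fixed name "daphnet" and its description to the base class, then sets the sensor, component, sampling-frequency (64) and annotation metadata.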
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Load Daphnet dataset from the specified directory.

    Calls ``download_dataset`` and ``extract_dataset`` for "daphnet", then
    reads every ``S*.txt`` file under
    ``<data_dir>/dataset_fog_release/dataset`` in sorted order. Each file
    is parsed as space-separated values, indexed by its ``time`` column and
    extended with one magnitude column per sensor (the Euclidean norm of
    that sensor's three components).

    Args:
        data_dir: Directory to store/find the dataset
        **kwargs: Additional arguments (unused for Daphnet)

    Returns:
        Tuple of (data_list, names_list): the DataFrames and the matching
        base file names. Both are also stored on ``self.data`` and
        ``self.names``.
    """
    # Download and extract if needed
    download_dataset("daphnet", data_dir)
    extract_dataset("daphnet", data_dir)

    file_path = os.path.join(data_dir, "dataset_fog_release/dataset")

    # The input layout and the output column order are the same for every
    # recording, so they are defined once rather than per file.
    column_names = [
        "time", "shank_h_fd", "shank_v", "shank_h_l",
        "thigh_h_fd", "thigh_v", "thigh_h_l",
        "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
    ]
    ordered_columns = [
        "shank", "shank_h_fd", "shank_v", "shank_h_l",
        "thigh", "thigh_h_fd", "thigh_v", "thigh_h_l",
        "trunk", "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
    ]

    daphnet_data = []
    daphnet_names = []

    # Load all subject files
    for file in sorted(glob(os.path.join(file_path, "S*.txt"))):
        df = pd.read_csv(file, sep=" ", names=column_names).set_index("time")

        # Magnitude per sensor; the summation order (lateral, vertical,
        # forward) is kept so the floating-point results do not change.
        for sensor in ("thigh", "shank", "trunk"):
            df[sensor] = np.sqrt(
                df[f"{sensor}_h_l"]**2 + df[f"{sensor}_v"]**2 + df[f"{sensor}_h_fd"]**2
            )

        daphnet_names.append(os.path.basename(file))
        daphnet_data.append(df[ordered_columns])

    # Store loaded data
    self.data = daphnet_data
    self.names = daphnet_names

    return daphnet_data, daphnet_names
Load Daphnet dataset from the specified directory.
Args: data_dir: Directory to store/find the dataset **kwargs: Additional arguments (unused for Daphnet)
Returns: Tuple of (data_list, names_list)
def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                           window_size: int = 192, step_size: int = 32) -> List[Dict]:
    """
    Create sliding windows from the Daphnet dataset.

    Rows whose ``annotations`` value is not greater than 0 are removed
    first; a DataFrame with no remaining rows contributes nothing to the
    result.

    Args:
        data: List of DataFrames containing Daphnet data
        names: List of names corresponding to the data
        window_size: Size of the sliding window (default: 192)
        step_size: Step size for the sliding window (default: 32)

    Returns:
        List of dictionaries, one per retained DataFrame, with the keys
        "name" and "windows". "windows" holds one {"name", "data"} entry
        per column, with the annotations entry last.
    """
    result = []

    for position, frame in enumerate(data):
        valid_rows = frame[frame.annotations > 0]
        if valid_rows.empty:
            continue

        # dict.fromkeys keeps the first-seen order and drops repeated labels
        signal_columns = [label for label in dict.fromkeys(valid_rows.columns) if label != "annotations"]
        per_column = [
            {"name": label, "data": sliding_window(valid_rows[label], window_size, step_size)}
            for label in signal_columns
        ]

        # The annotation windows always come last
        per_column.append({
            "name": "annotations",
            "data": sliding_window(valid_rows["annotations"], window_size, step_size),
        })

        result.append({"name": names[position], "windows": per_column})

    return result
Create sliding windows from the Daphnet dataset.
Args: data: List of DataFrames containing Daphnet data names: List of names corresponding to the data window_size: Size of the sliding window (default: 192) step_size: Step size for the sliding window (default: 32)
Returns: List of dictionaries containing sliding windows for each DataFrame
def get_supported_formats(self) -> List[str]:
    """
    Report the file extensions used by the Daphnet dataset.

    Returns:
        List of supported file extensions
    """
    extensions = ['.txt']
    return extensions
Get list of supported file formats for Daphnet dataset.
Returns: List of supported file extensions
def get_sensor_info(self) -> Dict[str, List[str]]:
    """
    Collect the sensor-related entries of the loader metadata.

    Returns:
        Dictionary with the 'sensors', 'components' and
        'sampling_frequency' entries of ``self.metadata``. The sampling
        frequency is the number set in ``__init__``, not a list.
    """
    wanted = ('sensors', 'components', 'sampling_frequency')
    return {key: self.metadata[key] for key in wanted}
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
def get_annotation_info(self) -> Dict[int, str]:
    """
    Return the annotation legend stored in the loader metadata.

    Returns:
        Dictionary mapping annotation values to descriptions (the same
        object held in ``self.metadata``, not a copy)
    """
    legend = self.metadata['annotations']
    return legend
Get information about annotations in the dataset.
Returns: Dictionary mapping annotation values to descriptions
Inherited Members
class MobiFallLoader(BaseDatasetLoader):
    """
    MobiFall dataset loader class.

    Loading and windowing are placeholders in this version: both methods
    print a notice and return empty results.
    """

    def __init__(self):
        """
        Initialize the MobiFall loader.

        Takes no arguments: the name "mobifall" and a fixed description are
        passed to the base class, then the sensor metadata is filled in.
        """
        super().__init__(
            name="mobifall",
            description="MobiFall Dataset - Contains accelerometer and gyroscope data for fall detection",
        )
        self.metadata = {
            'sensors': ['accelerometer', 'gyroscope'],
            'components': ['x', 'y', 'z'],
            # Typical for MobiFall
            'sampling_frequency': 100,
            # Activities of Daily Living and Falls
            'activities': ['ADL', 'FALL'],
        }

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Placeholder loader for the MobiFall dataset.

        Args:
            data_dir: Directory to store/find the dataset
            **kwargs: Additional arguments (unused for MobiFall)

        Returns:
            Tuple of (data_list, names_list); currently always two empty lists
        """
        # TODO: Implement MobiFall data loading
        notice = "MobiFall data loading is not yet implemented"
        print(notice)
        no_data, no_names = [], []
        return no_data, no_names

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Placeholder sliding-window builder for the MobiFall dataset.

        Args:
            data: List of DataFrames containing MobiFall data
            names: List of names corresponding to the data
            window_size: Size of the sliding window (default: 192)
            step_size: Step size for the sliding window (default: 32)

        Returns:
            List of window dictionaries; currently always empty
        """
        # TODO: Implement MobiFall sliding window creation
        notice = "MobiFall sliding window creation is not yet implemented"
        print(notice)
        return list()

    def get_supported_formats(self) -> List[str]:
        """
        Report the file extensions used by the MobiFall dataset.

        Returns:
            List of supported file extensions
        """
        extensions = ['.csv', '.txt']
        return extensions

    def get_sensor_info(self) -> Dict[str, List[str]]:
        """
        Collect the sensor-related entries of the loader metadata.

        Returns:
            Dictionary with the 'sensors', 'components' and
            'sampling_frequency' entries of ``self.metadata``. The sampling
            frequency is the number set in ``__init__``, not a list.
        """
        wanted = ('sensors', 'components', 'sampling_frequency')
        return {key: self.metadata[key] for key in wanted}

    def get_activity_info(self) -> List[str]:
        """
        Return the activity types stored in the loader metadata.

        Returns:
            List of activity types
        """
        activities = self.metadata['activities']
        return activities
MobiFall dataset loader class.
This class handles loading and processing of the MobiFall dataset for gait analysis.
def __init__(self):
    """
    Initialize the MobiFall loader.

    Takes no arguments: the name "mobifall" and a fixed description are
    passed to the base class, then the sensor metadata is filled in.
    """
    super().__init__(
        name="mobifall",
        description="MobiFall Dataset - Contains accelerometer and gyroscope data for fall detection",
    )
    self.metadata = {
        'sensors': ['accelerometer', 'gyroscope'],
        'components': ['x', 'y', 'z'],
        # Typical for MobiFall
        'sampling_frequency': 100,
        # Activities of Daily Living and Falls
        'activities': ['ADL', 'FALL'],
    }
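Initialize the MobiFall dataset loader.
Takes no arguments: the constructor passes the fixed name "mobifall" and its description to the base class, then sets the sensor, component, sampling-frequency (100) and activity metadata.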
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Placeholder loader for the MobiFall dataset.

    Nothing is read from ``data_dir`` yet; a notice is printed instead.

    Args:
        data_dir: Directory to store/find the dataset
        **kwargs: Additional arguments (unused for MobiFall)

    Returns:
        Tuple of (data_list, names_list); currently always two empty lists
    """
    # TODO: Implement MobiFall data loading
    notice = "MobiFall data loading is not yet implemented"
    print(notice)
    no_data, no_names = [], []
    return no_data, no_names
Load MobiFall dataset from the specified directory.
Args: data_dir: Directory to store/find the dataset **kwargs: Additional arguments (unused for MobiFall)
Returns: Tuple of (data_list, names_list)
def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                           window_size: int = 192, step_size: int = 32) -> List[Dict]:
    """
    Placeholder sliding-window builder for the MobiFall dataset.

    The inputs are ignored for now; a notice is printed instead.

    Args:
        data: List of DataFrames containing MobiFall data
        names: List of names corresponding to the data
        window_size: Size of the sliding window (default: 192)
        step_size: Step size for the sliding window (default: 32)

    Returns:
        List of window dictionaries; currently always empty
    """
    # TODO: Implement MobiFall sliding window creation
    notice = "MobiFall sliding window creation is not yet implemented"
    print(notice)
    return list()
Create sliding windows from the MobiFall dataset.
Args: data: List of DataFrames containing MobiFall data names: List of names corresponding to the data window_size: Size of the sliding window (default: 192) step_size: Step size for the sliding window (default: 32)
Returns: List of dictionaries containing sliding windows for each DataFrame
def get_supported_formats(self) -> List[str]:
    """
    Report the file extensions this loader declares as supported.

    Returns:
        A new list containing '.csv' and '.txt'
    """
    extensions = ('.csv', '.txt')
    return list(extensions)
Get list of supported file formats for MobiFall dataset.
Returns: List of supported file extensions
def get_sensor_info(self) -> Dict[str, object]:
    """
    Get information about sensors in the dataset.

    The values are taken straight from ``self.metadata``; they are not all
    lists of strings (``sampling_frequency`` is a number), so the return
    type is a dictionary of heterogeneous values.

    Returns:
        Dictionary with the keys 'sensors', 'components' and
        'sampling_frequency'

    Raises:
        KeyError: If one of those keys is missing from ``self.metadata``
    """
    exposed_keys = ('sensors', 'components', 'sampling_frequency')
    return {key: self.metadata[key] for key in exposed_keys}
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
def get_activity_info(self) -> List[str]:
    """
    Return the activity labels recorded in the loader metadata.

    Returns:
        The list stored under ``self.metadata['activities']`` (the same
        object, not a copy)
    """
    activity_labels = self.metadata['activities']
    return activity_labels
Get information about activities in the dataset.
Returns: List of activity types
Inherited Members
class ArduousLoader(BaseDatasetLoader):
    """
    Arduous dataset loader class.

    This class handles loading and processing of the Arduous dataset for gait analysis.
    Data loading and sliding-window creation are placeholders for now: both
    print a notice and return empty results.
    """

    def __init__(self):
        """Pass the fixed name/description to the base class and set static metadata."""
        super().__init__(
            name="arduous",
            description="Arduous Dataset - Contains multi-sensor wearable data for daily activity recognition"
        )
        self.metadata = {
            'sensors': ['accelerometer', 'gyroscope', 'magnetometer'],
            'components': ['x', 'y', 'z'],
            'sampling_frequency': 50,  # Typical for Arduous
            'activities': ['walking', 'running', 'sitting', 'standing', 'lying']
        }

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load Arduous dataset from the specified directory.

        Not implemented yet: prints a notice and returns two empty lists.

        Args:
            data_dir: Directory to store/find the dataset
            **kwargs: Additional arguments (unused for Arduous)

        Returns:
            Tuple of (data_list, names_list)
        """
        # TODO: Implement Arduous data loading
        print("Arduous data loading is not yet implemented")
        return [], []

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Create sliding windows from the Arduous dataset.

        Not implemented yet: prints a notice and returns an empty list.

        Args:
            data: List of DataFrames containing Arduous data
            names: List of names corresponding to the data
            window_size: Size of the sliding window (default: 192)
            step_size: Step size for the sliding window (default: 32)

        Returns:
            List of dictionaries containing sliding windows for each DataFrame
        """
        # TODO: Implement Arduous sliding window creation
        print("Arduous sliding window creation is not yet implemented")
        return []

    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats for Arduous dataset.

        Returns:
            List of supported file extensions
        """
        return ['.csv', '.txt']

    def get_sensor_info(self) -> Dict[str, object]:
        """
        Get information about sensors in the dataset.

        The values are heterogeneous ('sampling_frequency' is a number, the
        others are lists), hence the ``Dict[str, object]`` return type.

        Returns:
            Dictionary with the keys 'sensors', 'components' and
            'sampling_frequency'
        """
        return {
            'sensors': self.metadata['sensors'],
            'components': self.metadata['components'],
            'sampling_frequency': self.metadata['sampling_frequency']
        }

    def get_activity_info(self) -> List[str]:
        """
        Get information about activities in the dataset.

        Returns:
            List of activity types
        """
        return self.metadata['activities']
Arduous dataset loader class.
This class handles loading and processing of the Arduous dataset for gait analysis.
def __init__(self):
    """
    Set up the Arduous loader.

    Passes the fixed name ("arduous") and a description to the base-class
    initialiser, then attaches a static metadata dictionary listing the
    sensors, axis components, sampling frequency and activity classes.
    """
    loader_description = (
        "Arduous Dataset - Contains multi-sensor wearable data for daily activity recognition"
    )
    super().__init__(name="arduous", description=loader_description)

    # Assigned after the base initialiser so this dictionary is what ends up
    # on the instance.
    self.metadata = dict(
        sensors=['accelerometer', 'gyroscope', 'magnetometer'],
        components=['x', 'y', 'z'],
        sampling_frequency=50,  # Typical for Arduous
        activities=['walking', 'running', 'sitting', 'standing', 'lying'],
    )
Initialize the Arduous dataset loader.
Takes no arguments: the name ("arduous") and the description are fixed and passed to the base-class initializer.
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Placeholder loader for the Arduous dataset.

    Loading is not implemented yet: the method prints a notice and returns
    two empty lists, whatever arguments it is given.

    Args:
        data_dir: Directory to store/find the dataset (currently ignored)
        **kwargs: Additional arguments (unused for Arduous)

    Returns:
        Tuple of (data_list, names_list), both empty
    """
    # TODO: Implement Arduous data loading
    print("Arduous data loading is not yet implemented")
    frames: List[pd.DataFrame] = []
    labels: List[str] = []
    return frames, labels
Load Arduous dataset from the specified directory.
Args: data_dir: Directory to store/find the dataset **kwargs: Additional arguments (unused for Arduous)
Returns: Tuple of (data_list, names_list)
def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                           window_size: int = 192, step_size: int = 32) -> List[Dict]:
    """
    Placeholder sliding-window builder for the Arduous dataset.

    Windowing is not implemented yet: the method prints a notice and returns
    an empty list without inspecting its arguments.

    Args:
        data: List of DataFrames containing Arduous data
        names: List of names corresponding to the data
        window_size: Size of the sliding window (default: 192)
        step_size: Step size for the sliding window (default: 32)

    Returns:
        An empty list
    """
    # TODO: Implement Arduous sliding window creation
    print("Arduous sliding window creation is not yet implemented")
    no_windows: List[Dict] = []
    return no_windows
Create sliding windows from the Arduous dataset.
Args: data: List of DataFrames containing Arduous data names: List of names corresponding to the data window_size: Size of the sliding window (default: 192) step_size: Step size for the sliding window (default: 32)
Returns: List of dictionaries containing sliding windows for each DataFrame
def get_supported_formats(self) -> List[str]:
    """
    Report the file extensions this loader declares as supported.

    Returns:
        A new list containing '.csv' and '.txt'
    """
    extensions = ('.csv', '.txt')
    return list(extensions)
Get list of supported file formats for Arduous dataset.
Returns: List of supported file extensions
def get_sensor_info(self) -> Dict[str, object]:
    """
    Get information about sensors in the dataset.

    The values are taken straight from ``self.metadata``; they are not all
    lists of strings (``sampling_frequency`` is a number), so the return
    type is a dictionary of heterogeneous values.

    Returns:
        Dictionary with the keys 'sensors', 'components' and
        'sampling_frequency'

    Raises:
        KeyError: If one of those keys is missing from ``self.metadata``
    """
    exposed_keys = ('sensors', 'components', 'sampling_frequency')
    return {key: self.metadata[key] for key in exposed_keys}
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
def get_activity_info(self) -> List[str]:
    """
    Return the activity labels recorded in the loader metadata.

    Returns:
        The list stored under ``self.metadata['activities']`` (the same
        object, not a copy)
    """
    activity_labels = self.metadata['activities']
    return activity_labels
Get information about activities in the dataset.
Returns: List of activity types
Inherited Members
class PhysioNetLoader(BaseDatasetLoader):
    """
    PhysioNet VGRF dataset loader class.

    This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset.
    The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's
    disease and healthy controls.
    """

    def __init__(self):
        """Pass the fixed name/description to the base class and set static metadata."""
        super().__init__(
            name="physionet",
            description="PhysioNet Gait in Parkinson's Disease Dataset - Contains VGRF data from subjects with Parkinson's disease and healthy controls"
        )
        self.metadata = {
            'sensors': ['VGRF_L1', 'VGRF_L2', 'VGRF_L3', 'VGRF_L4', 'VGRF_L5', 'VGRF_L6', 'VGRF_L7', 'VGRF_L8',
                        'VGRF_R1', 'VGRF_R2', 'VGRF_R3', 'VGRF_R4', 'VGRF_R5', 'VGRF_R6', 'VGRF_R7', 'VGRF_R8'],
            'sampling_frequency': 100,  # 100 Hz sampling frequency
            'subjects': {
                'Co': 'Control subjects',
                'Pt': 'Parkinson\'s disease patients'
            },
            'window_size': 600,  # 6 seconds at 100 Hz
            'url': 'https://physionet.org/files/gaitpdb/1.0.0/'
        }
        # Filled by load_data(), one entry per successfully loaded file.
        self.labels = []
        self.subject_types = []

    def _download_physionet_data(self, data_dir: str, timeout: float = 30.0) -> str:
        """
        Download PhysioNet dataset if not already present.

        If the target directory already exists and is non-empty it is used as
        is. Otherwise every candidate file name is requested in turn; files
        that cannot be fetched are reported and skipped (best effort).

        Args:
            data_dir: Directory to store the dataset
            timeout: Timeout in seconds handed to each HTTP request, so a
                stalled connection cannot hang the download forever

        Returns:
            Path to the downloaded/existing dataset directory
        """
        dataset_path = os.path.join(data_dir, "physionet_gaitpdb")

        if os.path.exists(dataset_path) and len(os.listdir(dataset_path)) > 0:
            print(f"PhysioNet dataset already exists at: {dataset_path}")
            return dataset_path

        os.makedirs(dataset_path, exist_ok=True)

        # Download the dataset files
        base_url = "https://physionet.org/files/gaitpdb/1.0.0/"

        # Candidate file names; names that do not exist on the server simply
        # produce a "Could not download" message below.
        file_patterns = [
            # Control subjects - Ga prefix
            *[f"GaCo{i:02d}_{j:02d}.txt" for i in range(1, 18) for j in range(1, 3)],
            "GaCo22_01.txt", "GaCo22_10.txt",

            # Parkinson's patients - Ga prefix
            *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(3, 10) for j in range(1, 3)],
            *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(12, 34) for j in range(1, 3)],
            *[f"GaPt{i:02d}_10.txt" for i in range(13, 34)],

            # Control subjects - Ju prefix
            *[f"JuCo{i:02d}_01.txt" for i in range(1, 27)],

            # Parkinson's patients - Ju prefix
            *[f"JuPt{i:02d}_{j:02d}.txt" for i in range(1, 30) for j in range(1, 8)],

            # Control subjects - Si prefix
            *[f"SiCo{i:02d}_01.txt" for i in range(1, 31)],

            # Parkinson's patients - Si prefix
            *[f"SiPt{i:02d}_01.txt" for i in range(2, 41)]
        ]

        print(f"Downloading PhysioNet dataset to {dataset_path}")
        for filename in tqdm(file_patterns, desc="Downloading files"):
            file_url = base_url + filename
            file_path = os.path.join(dataset_path, filename)

            if os.path.exists(file_path):
                continue

            # Stream into a temporary name and rename only on success, so an
            # interrupted transfer never leaves a truncated file that the
            # "already exists" check above would later accept as complete.
            partial_path = file_path + ".part"
            try:
                with requests.get(file_url, stream=True, timeout=timeout) as response:
                    if response.status_code != 200:
                        print(f"Could not download {filename} (status: {response.status_code})")
                        continue
                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial_path, file_path)
            except Exception as e:
                # Deliberately broad: one bad file must not abort the rest.
                print(f"Error downloading {filename}: {e}")
                if os.path.exists(partial_path):
                    os.remove(partial_path)

        return dataset_path

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load PhysioNet VGRF dataset from the specified directory.

        Args:
            data_dir: Directory to store/find the dataset
            **kwargs: Optional ``file_pattern`` (glob pattern, default
                ``"Ga*.txt"``) selecting which files inside the dataset
                directory are loaded; other keys are ignored

        Returns:
            Tuple of (data_list, names_list)
        """
        # Download dataset if needed
        dataset_path = self._download_physionet_data(data_dir)

        # The default keeps the historical behaviour (only "Ga" recordings).
        file_pattern = kwargs.get('file_pattern', 'Ga*.txt')

        physionet_data = []
        physionet_names = []
        self.labels = []
        self.subject_types = []

        # Load all matching files
        for filepath in sorted(glob(os.path.join(dataset_path, file_pattern))):
            filename = os.path.basename(filepath)

            # Extract subject type from filename
            if 'Co' in filename:
                subject_type = 'Control'
                label = 'Co'
            elif 'Pt' in filename:
                subject_type = 'Patient'
                label = 'Pt'
            else:
                continue  # Skip files that don't match expected pattern

            try:
                # Read the file - PhysioNet files are tab-delimited with variable columns
                # Column 0: time, Columns 1-16: VGRF sensors, additional columns may exist
                df = pd.read_csv(filepath, delimiter='\t', header=None)

                # Handle variable number of columns
                n_cols = min(df.shape[1], 19)  # Limit to 19 columns max
                df = df.iloc[:, :n_cols]

                # Create column names
                col_names = ['time']
                for i in range(1, n_cols):
                    if i <= 8:
                        col_names.append(f'VGRF_L{i}')
                    elif i <= 16:
                        col_names.append(f'VGRF_R{i-8}')
                    else:
                        col_names.append(f'sensor_{i}')

                df.columns = col_names

                # Set time as index
                df = df.set_index('time')

                # Add subject metadata
                df['subject_type'] = subject_type
                df['label'] = label

                physionet_data.append(df)
                physionet_names.append(filename)
                self.labels.append(label)
                self.subject_types.append(subject_type)

            except Exception as e:
                # Best effort: report the unreadable file and keep going.
                print(f"Error loading {filename}: {e}")
                continue

        # Store loaded data
        self.data = physionet_data
        self.names = physionet_names

        print(f"Loaded {len(physionet_data)} PhysioNet files")
        print(f"Subject distribution: {dict(zip(*np.unique(self.subject_types, return_counts=True)))}")

        return physionet_data, physionet_names

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 600, step_size: int = 100) -> List[Dict]:
        """
        Create sliding windows from the PhysioNet dataset.

        DataFrames with no sensor columns or fewer rows than ``window_size``
        are skipped.

        Args:
            data: List of DataFrames containing PhysioNet data
            names: List of names corresponding to the data
            window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz)
            step_size: Step size for the sliding window (default: 100)

        Returns:
            List of dictionaries containing sliding windows for each DataFrame
        """
        windows_data = []

        for idx, df in enumerate(data):
            # Remove metadata columns for windowing
            sensor_columns = [col for col in df.columns if col.startswith('VGRF_') or col.startswith('sensor_')]
            df_sensors = df[sensor_columns]

            if df_sensors.empty or len(df_sensors) < window_size:
                continue

            windows = []

            # Create windows for each sensor
            for col in sensor_columns:
                try:
                    window_data = sliding_window(df_sensors[col].values, window_size, step_size)
                    windows.append({"name": col, "data": window_data})
                except Exception as e:
                    print(f"Error creating windows for {col} in {names[idx]}: {e}")
                    continue

            if windows:
                windows_data.append({
                    "name": names[idx],
                    "windows": windows,
                    "metadata": {
                        "subject_type": df['subject_type'].iloc[0] if 'subject_type' in df.columns else 'Unknown',
                        "label": df['label'].iloc[0] if 'label' in df.columns else 'Unknown',
                        "window_size": window_size,
                        "step_size": step_size,
                        "num_windows": len(windows[0]["data"])
                    }
                })

        return windows_data

    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats for PhysioNet dataset.

        Returns:
            List of supported file extensions
        """
        return ['.txt']

    def get_sensor_info(self) -> Dict[str, object]:
        """
        Get information about sensors in the dataset.

        The values are heterogeneous (a list of sensor names and two
        numbers), hence the ``Dict[str, object]`` return type.

        Returns:
            Dictionary with the keys 'sensors', 'sampling_frequency' and
            'window_size'
        """
        return {
            'sensors': self.metadata['sensors'],
            'sampling_frequency': self.metadata['sampling_frequency'],
            'window_size': self.metadata['window_size']
        }

    def get_subject_info(self) -> Dict[str, str]:
        """
        Get information about subjects in the dataset.

        Returns:
            Dictionary containing subject information
        """
        return self.metadata['subjects']

    def get_labels(self) -> List[str]:
        """
        Get labels for loaded data.

        Returns:
            List of labels corresponding to loaded data
        """
        return self.labels

    def filter_by_subject_type(self, subject_type: str) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Filter loaded data by subject type.

        Args:
            subject_type: 'Control' or 'Patient'

        Returns:
            Tuple of (filtered_data, filtered_names)

        Raises:
            ValueError: If no data has been loaded
        """
        if not self.data:
            raise ValueError("No data loaded. Call load_data() first.")

        filtered_data = []
        filtered_names = []

        for i, df in enumerate(self.data):
            if df['subject_type'].iloc[0] == subject_type:
                filtered_data.append(df)
                filtered_names.append(self.names[i])

        return filtered_data, filtered_names
PhysioNet VGRF dataset loader class.
This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset. The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's disease and healthy controls.
def __init__(self):
    """
    Set up the PhysioNet loader.

    Passes the fixed name ("physionet") and a description to the base-class
    initialiser, attaches static metadata (sensor column names, sampling
    frequency, subject codes, default window size, download URL) and starts
    with empty label / subject-type lists.
    """
    loader_description = (
        "PhysioNet Gait in Parkinson's Disease Dataset - Contains VGRF data "
        "from subjects with Parkinson's disease and healthy controls"
    )
    super().__init__(name="physionet", description=loader_description)

    # Eight left-foot channels followed by eight right-foot channels.
    left_channels = [f'VGRF_L{n}' for n in range(1, 9)]
    right_channels = [f'VGRF_R{n}' for n in range(1, 9)]
    subject_codes = {
        'Co': 'Control subjects',
        'Pt': "Parkinson's disease patients",
    }

    self.metadata = {
        'sensors': left_channels + right_channels,
        'sampling_frequency': 100,  # 100 Hz sampling frequency
        'subjects': subject_codes,
        'window_size': 600,  # 6 seconds at 100 Hz
        'url': 'https://physionet.org/files/gaitpdb/1.0.0/',
    }
    self.labels = []
    self.subject_types = []
Initialize the PhysioNet dataset loader.
Takes no arguments: the name ("physionet") and the description are fixed and passed to the base-class initializer.
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Load PhysioNet VGRF dataset from the specified directory.

    Each matching file is read as a headerless tab-delimited table, trimmed
    to at most 19 columns, indexed by its first column ('time') and tagged
    with 'subject_type' / 'label' columns derived from the file name. Files
    that fail to load are reported and skipped.

    Args:
        data_dir: Directory to store/find the dataset
        **kwargs: Optional ``file_pattern`` (glob pattern, default
            ``"Ga*.txt"``) selecting which files inside the dataset
            directory are loaded; other keys are ignored

    Returns:
        Tuple of (data_list, names_list)
    """
    # Download dataset if needed
    dataset_path = self._download_physionet_data(data_dir)

    # The default keeps the historical behaviour (only "Ga" recordings).
    file_pattern = kwargs.get('file_pattern', 'Ga*.txt')

    physionet_data = []
    physionet_names = []
    self.labels = []
    self.subject_types = []

    # Load all matching files
    for filepath in sorted(glob(os.path.join(dataset_path, file_pattern))):
        filename = os.path.basename(filepath)

        # Extract subject type from filename
        if 'Co' in filename:
            subject_type = 'Control'
            label = 'Co'
        elif 'Pt' in filename:
            subject_type = 'Patient'
            label = 'Pt'
        else:
            continue  # Skip files that don't match expected pattern

        try:
            # Read the file - PhysioNet files are tab-delimited with variable columns
            # Column 0: time, Columns 1-16: VGRF sensors, additional columns may exist
            df = pd.read_csv(filepath, delimiter='\t', header=None)

            # Handle variable number of columns
            n_cols = min(df.shape[1], 19)  # Limit to 19 columns max
            df = df.iloc[:, :n_cols]

            # Create column names
            col_names = ['time']
            for i in range(1, n_cols):
                if i <= 8:
                    col_names.append(f'VGRF_L{i}')
                elif i <= 16:
                    col_names.append(f'VGRF_R{i-8}')
                else:
                    col_names.append(f'sensor_{i}')

            df.columns = col_names

            # Set time as index
            df = df.set_index('time')

            # Add subject metadata
            df['subject_type'] = subject_type
            df['label'] = label

            physionet_data.append(df)
            physionet_names.append(filename)
            self.labels.append(label)
            self.subject_types.append(subject_type)

        except Exception as e:
            # Best effort: report the unreadable file and keep going.
            print(f"Error loading {filename}: {e}")
            continue

    # Store loaded data
    self.data = physionet_data
    self.names = physionet_names

    print(f"Loaded {len(physionet_data)} PhysioNet files")
    print(f"Subject distribution: {dict(zip(*np.unique(self.subject_types, return_counts=True)))}")

    return physionet_data, physionet_names
Load PhysioNet VGRF dataset from the specified directory.
Args: data_dir: Directory to store/find the dataset **kwargs: Additional arguments (unused for PhysioNet)
Returns: Tuple of (data_list, names_list)
def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                           window_size: int = 600, step_size: int = 100) -> List[Dict]:
    """
    Cut every sensor column of each recording into sliding windows.

    Only columns whose name starts with 'VGRF_' or 'sensor_' are windowed.
    A recording is skipped when it has no such columns, has fewer rows than
    ``window_size``, or when windowing failed for all of its columns.

    Args:
        data: List of DataFrames containing PhysioNet data
        names: List of names corresponding to the data
        window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz)
        step_size: Step size for the sliding window (default: 100)

    Returns:
        One dictionary per retained recording with the keys 'name',
        'windows' (a list of {'name', 'data'} entries, one per sensor
        column) and 'metadata'
    """
    results = []

    for position, frame in enumerate(data):
        # Metadata columns (subject_type, label) are excluded from windowing.
        signal_cols = [c for c in frame.columns if c.startswith(('VGRF_', 'sensor_'))]
        signals = frame[signal_cols]

        if signals.empty or len(signals) < window_size:
            continue

        per_sensor = []
        for col in signal_cols:
            try:
                chunks = sliding_window(signals[col].values, window_size, step_size)
            except Exception as e:
                print(f"Error creating windows for {col} in {names[position]}: {e}")
                continue
            per_sensor.append({"name": col, "data": chunks})

        if not per_sensor:
            continue

        has_type = 'subject_type' in frame.columns
        has_label = 'label' in frame.columns
        results.append({
            "name": names[position],
            "windows": per_sensor,
            "metadata": {
                "subject_type": frame['subject_type'].iloc[0] if has_type else 'Unknown',
                "label": frame['label'].iloc[0] if has_label else 'Unknown',
                "window_size": window_size,
                "step_size": step_size,
                # Window count is taken from the first sensor that succeeded.
                "num_windows": len(per_sensor[0]["data"]),
            },
        })

    return results
Create sliding windows from the PhysioNet dataset.
Args: data: List of DataFrames containing PhysioNet data names: List of names corresponding to the data window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz) step_size: Step size for the sliding window (default: 100)
Returns: List of dictionaries containing sliding windows for each DataFrame
def get_supported_formats(self) -> List[str]:
    """
    Report the file extensions this loader declares as supported.

    Returns:
        A new single-element list containing '.txt'
    """
    extensions = ('.txt',)
    return list(extensions)
Get list of supported file formats for PhysioNet dataset.
Returns: List of supported file extensions
def get_sensor_info(self) -> Dict[str, object]:
    """
    Get information about sensors in the dataset.

    The values are taken straight from ``self.metadata``; they are not all
    lists of strings ('sampling_frequency' and 'window_size' are numbers),
    so the return type is a dictionary of heterogeneous values.

    Returns:
        Dictionary with the keys 'sensors', 'sampling_frequency' and
        'window_size'

    Raises:
        KeyError: If one of those keys is missing from ``self.metadata``
    """
    exposed_keys = ('sensors', 'sampling_frequency', 'window_size')
    return {key: self.metadata[key] for key in exposed_keys}
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
def get_subject_info(self) -> Dict[str, str]:
    """
    Return the subject-group mapping recorded in the loader metadata.

    Returns:
        The dictionary stored under ``self.metadata['subjects']`` (the same
        object, not a copy)
    """
    subject_groups = self.metadata['subjects']
    return subject_groups
Get information about subjects in the dataset.
Returns: Dictionary containing subject information
def get_labels(self) -> List[str]:
    """
    Return the labels collected for the currently loaded data.

    Returns:
        The ``self.labels`` list itself (not a copy)
    """
    current_labels = self.labels
    return current_labels
Get labels for loaded data.
Returns: List of labels corresponding to loaded data
def filter_by_subject_type(self, subject_type: str) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Select the loaded recordings belonging to one subject group.

    A recording matches when the first value of its 'subject_type' column
    equals ``subject_type``.

    Args:
        subject_type: 'Control' or 'Patient'

    Returns:
        Tuple of (filtered_data, filtered_names), in loading order

    Raises:
        ValueError: If ``self.data`` is empty
    """
    if not self.data:
        raise ValueError("No data loaded. Call load_data() first.")

    matching = [
        position
        for position, frame in enumerate(self.data)
        if frame['subject_type'].iloc[0] == subject_type
    ]
    selected_frames = [self.data[position] for position in matching]
    selected_names = [self.names[position] for position in matching]
    return selected_frames, selected_names
Filter loaded data by subject type.
Args: subject_type: 'Control' or 'Patient'
Returns: Tuple of (filtered_data, filtered_names)
Inherited Members
class GaitFeatureExtractor(BaseFeatureExtractor):
    """
    Comprehensive gait feature extractor class.

    This class extracts various time-domain, frequency-domain, and statistical features
    from gait data sliding windows. Auto-regression coefficients are always
    extracted, independently of the three feature-group flags.
    """

    def __init__(self, verbose: bool = True):
        """
        Args:
            verbose: When True, progress bars and status messages are printed
        """
        super().__init__(
            name="gait_features",
            description="Comprehensive gait feature extractor for time-domain, frequency-domain, and statistical features"
        )
        self.verbose = verbose
        self.config = {
            'time_domain': True,
            'frequency_domain': True,
            'statistical': True,
            'ar_order': 3  # Order for auto-regression coefficients
        }

        if self.verbose:
            print("🚀 GaitFeatureExtractor initialized successfully!")
            print(f"📊 Default configuration: {self.config}")

    @staticmethod
    def _count_values(feature_map: Dict) -> int:
        """Count feature values: a list contributes its length, anything else 1."""
        return sum(len(v) if isinstance(v, list) else 1 for v in feature_map.values())

    def _log_group_summary(self, label: str, group: Dict) -> None:
        """Print the per-group feature counts when verbose output is enabled."""
        if self.verbose:
            print(f" ✅ {label}: {len(group)} feature types, {self._count_values(group)} total features")

    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract gait features from sliding windows.

        Args:
            windows: List of sliding window dictionaries, each with a 'name'
                and a 'data' entry
            fs: Sampling frequency
            **kwargs: Additional arguments including time_domain, frequency_domain,
                statistical flags and ar_order; missing keys fall back to
                ``self.config``

        Returns:
            List of feature dictionaries for each sensor. The entry named
            'annotations' gets an 'annotations' list (one label per window)
            and an empty 'features' dictionary.
        """
        # Per-call overrides take precedence over the stored configuration
        time_domain = kwargs.get('time_domain', self.config['time_domain'])
        frequency_domain = kwargs.get('frequency_domain', self.config['frequency_domain'])
        statistical = kwargs.get('statistical', self.config['statistical'])
        ar_order = kwargs.get('ar_order', self.config['ar_order'])

        if self.verbose:
            print("\n" + "="*60)
            print("🔍 STARTING GAIT FEATURE EXTRACTION")
            print("="*60)
            print(f"📈 Total sensors/windows to process: {len(windows)}")
            print(f"🔊 Sampling frequency: {fs} Hz")
            print(f"⏱️ Time domain features: {'✅' if time_domain else '❌'}")
            print(f"🌊 Frequency domain features: {'✅' if frequency_domain else '❌'}")
            print(f"📊 Statistical features: {'✅' if statistical else '❌'}")
            print(f"🔄 Auto-regression order: {ar_order}")
            print("-"*60)

        features = []

        # Main progress bar for processing all windows
        main_pbar = tqdm(
            windows,
            desc="🔍 Processing Sensors",
            unit="sensor",
            disable=not self.verbose
        )

        for window_dict in main_pbar:
            sensor_name = window_dict['name']
            window_data = window_dict['data']

            if self.verbose:
                main_pbar.set_postfix({
                    'Current': sensor_name,
                    'Windows': len(window_data) if isinstance(window_data, list) else 1
                })

            # Annotation windows are reduced to one label each, no features
            if sensor_name == 'annotations':
                if self.verbose:
                    logger.info(f"📝 Processing annotation data for {sensor_name}")

                features.append({
                    'name': sensor_name,
                    'features': {},
                    'annotations': [self._extract_annotation_labels(window) for window in window_data]
                })
                continue

            if self.verbose:
                logger.info(f"🎯 Processing sensor: {sensor_name}")
                logger.info(f"📦 Number of windows: {len(window_data)}")

            sensor_features = {'name': sensor_name, 'features': {}}

            # Time domain features
            if time_domain:
                if self.verbose:
                    print(f" ⏱️ Extracting time domain features for {sensor_name}...")
                time_features = self._extract_time_domain_features(window_data)
                sensor_features['features'].update(time_features)
                self._log_group_summary("Time domain", time_features)

            # Frequency domain features
            if frequency_domain:
                if self.verbose:
                    print(f" 🌊 Extracting frequency domain features for {sensor_name}...")
                freq_features = self._extract_frequency_domain_features(window_data, fs)
                sensor_features['features'].update(freq_features)
                self._log_group_summary("Frequency domain", freq_features)

            # Statistical features
            if statistical:
                if self.verbose:
                    print(f" 📊 Extracting statistical features for {sensor_name}...")
                stat_features = self._extract_statistical_features(window_data)
                sensor_features['features'].update(stat_features)
                self._log_group_summary("Statistical", stat_features)

            # Auto-regression coefficients (not gated by any flag)
            if self.verbose:
                print(f" 🔄 Extracting auto-regression coefficients for {sensor_name}...")
            ar_features = self._extract_ar_coefficients(window_data, ar_order)
            sensor_features['features'].update(ar_features)
            self._log_group_summary("Auto-regression", ar_features)

            if self.verbose:
                total_features = self._count_values(sensor_features['features'])
                print(f" 🎯 Total features extracted for {sensor_name}: {total_features}")
                print(f" 📋 Feature types: {list(sensor_features['features'].keys())}")
                print("-"*40)

            features.append(sensor_features)

        if self.verbose:
            print("\n" + "="*60)
            print("🎉 FEATURE EXTRACTION COMPLETED!")
            print("="*60)
            print(f"📊 Total sensors processed: {len(features)}")

            # Calculate overall statistics
            total_feature_count = sum(
                self._count_values(feature_dict['features'])
                for feature_dict in features
                if 'features' in feature_dict
            )
            # Guard against an empty input list, which previously raised
            # ZeroDivisionError here.
            average = total_feature_count / len(features) if features else 0.0

            print(f"🔢 Total features extracted: {total_feature_count}")
            print(f"📈 Average features per sensor: {average:.1f}")
            print("="*60)

        return features

    def _apply_feature_funcs(self, funcs: Dict, windows: List, desc: str) -> Dict[str, List]:
        """Apply every function in ``funcs`` to every window, behind a progress bar."""
        feature_pbar = tqdm(
            funcs.items(),
            desc=desc,
            unit="feature",
            leave=False,
            disable=not self.verbose
        )

        results = {}
        for feature_name, func in feature_pbar:
            if self.verbose:
                feature_pbar.set_postfix({'Computing': feature_name})
            results[feature_name] = [
                func(self._ensure_numpy_array(window)) for window in windows
            ]
        return results

    def _extract_time_domain_features(self, windows: List) -> Dict[str, List]:
        """Extract time domain features from windows."""
        if self.verbose:
            print(" 🔍 Computing time domain features...")

        time_domain_funcs = {
            'mean': calculate_mean,
            'std': calculate_standard_deviation,
            'variance': calculate_variance,
            'rms': calculate_root_mean_square,
            'range': calculate_range,
            'median': calculate_median,
            'mode': calculate_mode,
            'mean_absolute_value': calculate_mean_absolute_value,
            'median_absolute_deviation': calculate_median_absolute_deviation,
            'peak_height': calculate_peak_height,
            'zero_crossing_rate': calculate_zero_crossing_rate,
            'energy': calculate_energy,
        }
        return self._apply_feature_funcs(time_domain_funcs, windows, " ⏱️ Time features")

    def _ensure_numpy_array(self, signal):
        """Return ``signal.values`` when the object has that attribute, else ``signal``."""
        if hasattr(signal, 'values'):
            return signal.values
        return signal

    def _extract_frequency_domain_features(self, windows: List, fs: int) -> Dict[str, List]:
        """Extract frequency domain features from windows."""
        if self.verbose:
            print(" 🔍 Computing frequency domain features...")

        freq_domain_funcs = {
            'dominant_frequency': lambda w: calculate_dominant_frequency(w, fs),
            'peak_frequency': lambda w: calculate_peak_frequency(w, fs),
            'power_spectral_entropy': lambda w: calculate_power_spectral_entropy(w, fs),
            'principal_harmonic_frequency': lambda w: calculate_principal_harmonic_frequency(w, fs),
            'stride_times': lambda w: calculate_stride_times(w, fs),
            'step_time': lambda w: calculate_step_time(w, fs),
            'cadence': lambda w: calculate_cadence(w, fs),
            'freezing_index': lambda w: calculate_freezing_index(w, fs),
        }
        return self._apply_feature_funcs(freq_domain_funcs, windows, " 🌊 Freq features")

    def _extract_statistical_features(self, windows: List) -> Dict[str, List]:
        """Extract statistical features from windows."""
        if self.verbose:
            print(" 🔍 Computing statistical features...")

        stat_funcs = {
            'skewness': calculate_skewness,
            'kurtosis': calculate_kurtosis,
            'entropy': calculate_entropy,
            'interquartile_range': calculate_interquartile_range,
        }
        stat_features = self._apply_feature_funcs(stat_funcs, windows, " 📊 Stat features")

        # Correlation needs two signals: each window is compared with itself
        # shifted by one sample; windows too short for that yield 0.
        if self.verbose:
            print(" 🔗 Computing correlation features...")

        correlations = []
        for window in windows:
            if len(window) > 1:
                samples = self._ensure_numpy_array(window)
                correlations.append(calculate_correlation(samples[:-1], samples[1:]))
            else:
                correlations.append(0)
        stat_features['correlation'] = correlations

        return stat_features

    def _extract_ar_coefficients(self, windows: List, order: int) -> Dict[str, List]:
        """Extract auto-regression coefficients from windows."""
        if self.verbose:
            print(f" 🔍 Computing auto-regression coefficients (order={order})...")

        # Progress bar for AR coefficients
        ar_pbar = tqdm(
            windows,
            desc=" 🔄 AR coeffs",
            unit="window",
            leave=False,
            disable=not self.verbose
        )

        ar_coeffs = [
            calculate_auto_regression_coefficients(self._ensure_numpy_array(window), order)
            for window in ar_pbar
        ]

        return {'ar_coefficients': ar_coeffs}

    def _extract_annotation_labels(self, window) -> int:
        """
        Extract the most common annotation label from a window.

        Returns 0 for an empty window, for both pandas and array inputs.
        """
        if hasattr(window, 'mode'):
            modes = window.mode()
            return modes.iloc[0] if len(modes) > 0 else 0

        # For numpy arrays or other types
        unique, counts = np.unique(window, return_counts=True)
        if unique.size == 0:
            # np.argmax would raise on an empty array.
            return 0
        return unique[np.argmax(counts)]

    def get_feature_names(self) -> List[str]:
        """
        Get names of all features that can be extracted.

        Returns:
            List of feature names
        """
        time_domain_features = [
            'mean', 'std', 'variance', 'rms', 'range', 'median', 'mode',
            'mean_absolute_value', 'median_absolute_deviation', 'peak_height',
            'zero_crossing_rate', 'energy'
        ]

        frequency_domain_features = [
            'dominant_frequency', 'peak_frequency', 'power_spectral_entropy',
            'principal_harmonic_frequency', 'stride_times', 'step_time',
            'cadence', 'freezing_index'
        ]

        statistical_features = [
            'skewness', 'kurtosis', 'entropy', 'interquartile_range', 'correlation'
        ]

        other_features = ['ar_coefficients']

        return time_domain_features + frequency_domain_features + statistical_features + other_features

    def print_extraction_summary(self, features: List[Dict]) -> None:
        """
        Print a detailed summary of extracted features.

        Args:
            features: List of feature dictionaries returned by extract_features
        """
        print("\n" + "="*80)
        print("📊 FEATURE EXTRACTION SUMMARY")
        print("="*80)

        for position, feature_dict in enumerate(features, start=1):
            sensor_name = feature_dict['name']
            print(f"\n🎯 Sensor {position}: {sensor_name}")
            print("-" * 40)

            if 'features' in feature_dict and feature_dict['features']:
                for feature_type, feature_values in feature_dict['features'].items():
                    if not isinstance(feature_values, list):
                        print(f" 📈 {feature_type}: {feature_values}")
                        continue

                    print(f" 📈 {feature_type}: {len(feature_values)} values")
                    if not feature_values:
                        continue

                    sample_value = feature_values[0]
                    if isinstance(sample_value, (list, np.ndarray)):
                        print(f" └── Shape per window: {np.array(sample_value).shape}")
                        continue
                    try:
                        print(f" └── Sample value: {sample_value:.4f}")
                    except (TypeError, ValueError):
                        # Non-numeric samples (e.g. None) cannot take the
                        # float format; show them unformatted instead.
                        print(f" └── Sample value: {sample_value}")

            if 'annotations' in feature_dict:
                print(f" 📝 Annotations: {len(feature_dict['annotations'])} windows")

        print("\n" + "="*80)
Comprehensive gait feature extractor class.
This class extracts various time-domain, frequency-domain, and statistical features from gait data sliding windows.
def __init__(self, verbose: bool = True):
    """
    Set up the gait feature extractor with its default configuration.

    Args:
        verbose: When True, print status messages (on construction and
            wherever other methods consult ``self.verbose``).
    """
    super().__init__(
        name="gait_features",
        description="Comprehensive gait feature extractor for time-domain, frequency-domain, and statistical features"
    )
    self.verbose = verbose

    # Defaults; extract_features() lets callers override each key per call.
    self.config = dict(
        time_domain=True,
        frequency_domain=True,
        statistical=True,
        ar_order=3,  # Order for auto-regression coefficients
    )

    if not self.verbose:
        return

    print("🚀 GaitFeatureExtractor initialized successfully!")
    print(f"📊 Default configuration: {self.config}")
Initialize the feature extractor.
Args: verbose: If True, print status messages during initialization and feature extraction (default: True)
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Extract gait features from sliding windows.

    Args:
        windows: List of sliding window dictionaries; each entry is read
            through its 'name' and 'data' keys
        fs: Sampling frequency
        **kwargs: Per-call overrides of ``self.config``: ``time_domain``,
            ``frequency_domain``, ``statistical`` flags and ``ar_order``

    Returns:
        List of feature dictionaries, one per entry in ``windows``. The
        entry named 'annotations' gets an empty 'features' dict plus an
        'annotations' list of per-window labels.
    """
    def count_values(feature_map: Dict) -> int:
        # A list contributes one value per window; anything else counts once.
        return sum(len(v) if isinstance(v, list) else 1 for v in feature_map.values())

    def mark(flag) -> str:
        return '✅' if flag else '❌'

    # Explicit keyword arguments win over the instance configuration
    time_domain = kwargs.get('time_domain', self.config['time_domain'])
    frequency_domain = kwargs.get('frequency_domain', self.config['frequency_domain'])
    statistical = kwargs.get('statistical', self.config['statistical'])
    ar_order = kwargs.get('ar_order', self.config['ar_order'])

    if self.verbose:
        print("\n" + "="*60)
        print("🔍 STARTING GAIT FEATURE EXTRACTION")
        print("="*60)
        print(f"📈 Total sensors/windows to process: {len(windows)}")
        print(f"🔊 Sampling frequency: {fs} Hz")
        print(f"⏱️ Time domain features: {mark(time_domain)}")
        print(f"🌊 Frequency domain features: {mark(frequency_domain)}")
        print(f"📊 Statistical features: {mark(statistical)}")
        print(f"🔄 Auto-regression order: {ar_order}")
        print("-"*60)

    features = []

    # Main progress bar for processing all windows
    main_pbar = tqdm(
        windows,
        desc="🔍 Processing Sensors",
        unit="sensor",
        disable=not self.verbose
    )

    for window_dict in main_pbar:
        sensor_name = window_dict['name']
        window_data = window_dict['data']

        if self.verbose:
            main_pbar.set_postfix({
                'Current': sensor_name,
                'Windows': len(window_data) if isinstance(window_data, list) else 1
            })

        # Annotation windows carry labels, not signals: reduce each window
        # to one label and skip feature computation.
        if sensor_name == 'annotations':
            if self.verbose:
                logger.info(f"📝 Processing annotation data for {sensor_name}")

            features.append({
                'name': sensor_name,
                'features': {},
                'annotations': [self._extract_annotation_labels(window) for window in window_data]
            })
            continue

        if self.verbose:
            logger.info(f"🎯 Processing sensor: {sensor_name}")
            logger.info(f"📦 Number of windows: {len(window_data)}")

        sensor_features = {'name': sensor_name, 'features': {}}

        # (enabled, start message, summary label, extractor) per feature family.
        # Auto-regression coefficients are always extracted.
        stages = (
            (
                time_domain,
                f" ⏱️ Extracting time domain features for {sensor_name}...",
                "Time domain",
                lambda: self._extract_time_domain_features(window_data),
            ),
            (
                frequency_domain,
                f" 🌊 Extracting frequency domain features for {sensor_name}...",
                "Frequency domain",
                lambda: self._extract_frequency_domain_features(window_data, fs),
            ),
            (
                statistical,
                f" 📊 Extracting statistical features for {sensor_name}...",
                "Statistical",
                lambda: self._extract_statistical_features(window_data),
            ),
            (
                True,
                f" 🔄 Extracting auto-regression coefficients for {sensor_name}...",
                "Auto-regression",
                lambda: self._extract_ar_coefficients(window_data, ar_order),
            ),
        )

        for enabled, start_message, label, run_stage in stages:
            if not enabled:
                continue

            if self.verbose:
                print(start_message)

            extracted = run_stage()
            sensor_features['features'].update(extracted)

            if self.verbose:
                print(f" ✅ {label}: {len(extracted)} feature types, {count_values(extracted)} total features")

        if self.verbose:
            total_features = count_values(sensor_features['features'])
            print(f" 🎯 Total features extracted for {sensor_name}: {total_features}")
            print(f" 📋 Feature types: {list(sensor_features['features'].keys())}")
            print("-"*40)

        features.append(sensor_features)

    if self.verbose:
        print("\n" + "="*60)
        print("🎉 FEATURE EXTRACTION COMPLETED!")
        print("="*60)
        print(f"📊 Total sensors processed: {len(features)}")

        # Calculate overall statistics
        total_feature_count = sum(
            count_values(feature_dict.get('features', {})) for feature_dict in features
        )
        # Empty input yields no sensors; report 0.0 instead of dividing by zero.
        average = total_feature_count / len(features) if features else 0.0

        print(f"🔢 Total features extracted: {total_feature_count}")
        print(f"📈 Average features per sensor: {average:.1f}")
        print("="*60)

    return features
Extract gait features from sliding windows.
Args: windows: List of sliding window dictionaries fs: Sampling frequency **kwargs: Additional arguments including time_domain, frequency_domain, statistical flags
Returns: List of feature dictionaries for each sensor
def get_feature_names(self) -> List[str]:
    """
    List the name of every feature this extractor can produce.

    Returns:
        Feature names grouped as time domain, frequency domain,
        statistical, then auto-regression.
    """
    feature_groups = (
        # Time domain
        (
            'mean', 'std', 'variance', 'rms', 'range', 'median', 'mode',
            'mean_absolute_value', 'median_absolute_deviation', 'peak_height',
            'zero_crossing_rate', 'energy',
        ),
        # Frequency domain
        (
            'dominant_frequency', 'peak_frequency', 'power_spectral_entropy',
            'principal_harmonic_frequency', 'stride_times', 'step_time',
            'cadence', 'freezing_index',
        ),
        # Statistical
        ('skewness', 'kurtosis', 'entropy', 'interquartile_range', 'correlation'),
        # Other
        ('ar_coefficients',),
    )

    return [name for group in feature_groups for name in group]
Get names of all features that can be extracted.
Returns: List of feature names
def print_extraction_summary(self, features: List[Dict]) -> None:
    """
    Print a detailed summary of extracted features.

    For every entry the sensor name is printed, followed by one line per
    feature type. List-valued features also show a sample taken from the
    first window: its shape when it is a sequence, otherwise its value.

    Args:
        features: List of feature dictionaries returned by extract_features
    """
    rule = "=" * 80
    print("\n" + rule)
    print("📊 FEATURE EXTRACTION SUMMARY")
    print(rule)

    for position, feature_dict in enumerate(features, start=1):
        sensor_name = feature_dict['name']
        print(f"\n🎯 Sensor {position}: {sensor_name}")
        print("-" * 40)

        # A missing or empty 'features' entry simply prints nothing here.
        for feature_type, feature_values in (feature_dict.get('features') or {}).items():
            if not isinstance(feature_values, list):
                print(f" 📈 {feature_type}: {feature_values}")
                continue

            print(f" 📈 {feature_type}: {len(feature_values)} values")
            if not feature_values:
                continue

            sample_value = feature_values[0]
            if isinstance(sample_value, (list, tuple, np.ndarray)):
                print(f" └── Shape per window: {np.array(sample_value).shape}")
                continue

            # Numeric samples keep the 4-decimal format; anything that
            # cannot be formatted that way (None, str, ...) is shown as-is
            # rather than aborting the summary.
            try:
                rendered = f"{sample_value:.4f}"
            except (TypeError, ValueError):
                rendered = str(sample_value)
            print(f" └── Sample value: {rendered}")

        if 'annotations' in feature_dict:
            print(f" 📝 Annotations: {len(feature_dict['annotations'])} windows")

    print("\n" + rule)
Print a detailed summary of extracted features.
Args: features: List of feature dictionaries returned by extract_features
Inherited Members
class LBPFeatureExtractor(BaseFeatureExtractor):
    """
    Local Binary Pattern (LBP) feature extractor for VGRF data.

    This extractor converts time-series data into LBP codes and extracts
    histogram features from the LBP representation.
    """

    def __init__(self, verbose: bool = True):
        """
        Set up the LBP extractor with its default configuration.

        Args:
            verbose: When True, print status messages and show progress bars
        """
        super().__init__(
            name="lbp_features",
            description="Local Binary Pattern feature extractor for VGRF time-series data"
        )
        self.verbose = verbose
        self.config = {
            'radius': 2,  # LBP radius (number of neighbors)
            'n_bins': 256,  # Number of histogram bins
            'normalize': True  # Normalize histogram
        }

        if self.verbose:
            print("🔍 LBP Feature Extractor initialized!")

    def lbp_1d(self, data: np.ndarray, radius: int = 2) -> str:
        """
        Compute 1D Local Binary Pattern for time-series data.

        Every sample is compared with the ``2 * radius + 1`` positions
        centred on it (itself included). A position contributes '1' when
        it lies inside the signal and its value is >= the centre sample,
        otherwise '0'.

        Args:
            data: Input time-series data (any 1-D sequence; it is read
                positionally, so a pandas Series index is ignored)
            radius: Radius for LBP computation

        Returns:
            LBP code as binary string
        """
        # Positional view of the input: label-based indexing on a Series
        # whose index does not start at 0 would otherwise fail.
        samples = np.asarray(data)
        n = len(samples)

        # Collect bits and join once instead of growing a string with +=.
        bits = []
        for i in range(n):
            center = samples[i]
            for j in range(i - radius, i + radius + 1):
                in_bounds = 0 <= j < n
                bits.append('1' if in_bounds and samples[j] >= center else '0')

        return ''.join(bits)

    def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> np.ndarray:
        """
        Convert LBP code to histogram features.

        Args:
            lbp_code: Binary LBP code string
            n_bins: Number of histogram bins
            normalize: Whether to normalize histogram

        Returns:
            Histogram features as numpy array
        """
        if len(lbp_code) == 0:
            return np.zeros(n_bins)

        # Read the code 8 bits at a time (the final chunk may be shorter)
        chunk_size = 8
        lbp_values = []
        for start in range(0, len(lbp_code), chunk_size):
            chunk = lbp_code[start:start + chunk_size]
            try:
                # Fold the chunk value into the available bin range
                lbp_values.append(int(chunk, 2) % n_bins)
            except ValueError:
                # Chunks that are not valid binary text are skipped
                continue

        if len(lbp_values) == 0:
            return np.zeros(n_bins)

        hist, _ = np.histogram(lbp_values, bins=n_bins, range=(0, n_bins))

        total = np.sum(hist)
        if normalize and total > 0:
            hist = hist / total

        return hist

    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract LBP features from sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency (unused for LBP)
            **kwargs: Per-call overrides of ``self.config``: ``radius``,
                ``n_bins`` and ``normalize``

        Returns:
            List of feature dictionaries (the 'annotations' entry is skipped)
        """
        # Explicit keyword arguments win over the instance configuration
        radius = kwargs.get('radius', self.config['radius'])
        n_bins = kwargs.get('n_bins', self.config['n_bins'])
        normalize = kwargs.get('normalize', self.config['normalize'])

        if self.verbose:
            print("\n🔍 LBP Feature Extraction")
            print(f"📊 Radius: {radius}, Bins: {n_bins}, Normalize: {normalize}")

        features = []

        for window_dict in tqdm(windows, desc="Processing LBP features", disable=not self.verbose):
            sensor_name = window_dict['name']
            window_data = window_dict['data']

            # Skip annotation windows
            if sensor_name == 'annotations':
                continue

            # One histogram per window; pandas objects are unwrapped first
            lbp_histograms = [
                self.lbp_to_histogram(
                    self.lbp_1d(getattr(window, 'values', window), radius),
                    n_bins,
                    normalize,
                )
                for window in window_data
            ]

            features.append({
                'name': sensor_name,
                'features': {
                    'lbp_histograms': lbp_histograms,
                    'lbp_mean': [np.mean(hist) for hist in lbp_histograms],
                    'lbp_std': [np.std(hist) for hist in lbp_histograms],
                    'lbp_energy': [np.sum(hist**2) for hist in lbp_histograms],
                    'lbp_entropy': [self._calculate_entropy(hist) for hist in lbp_histograms]
                }
            })

        return features

    def _calculate_entropy(self, hist: np.ndarray) -> float:
        """Calculate entropy of histogram."""
        # Avoid log(0) by adding small value
        smoothed = hist + 1e-10
        return -np.sum(smoothed * np.log2(smoothed))

    def get_feature_names(self) -> List[str]:
        """Get names of LBP features."""
        return [
            'lbp_histograms', 'lbp_mean', 'lbp_std',
            'lbp_energy', 'lbp_entropy'
        ]
Local Binary Pattern (LBP) feature extractor for VGRF data.
This extractor converts time-series data into LBP codes and extracts histogram features from the LBP representation.
def __init__(self, verbose: bool = True):
    """
    Set up the LBP extractor with its default configuration.

    Args:
        verbose: When True, print status messages (on construction and
            wherever other methods consult ``self.verbose``).
    """
    super().__init__(
        name="lbp_features",
        description="Local Binary Pattern feature extractor for VGRF time-series data"
    )
    self.verbose = verbose

    # Defaults; extract_features() lets callers override each key per call.
    self.config = dict(
        radius=2,        # LBP radius (number of neighbors)
        n_bins=256,      # Number of histogram bins
        normalize=True,  # Normalize histogram
    )

    if not self.verbose:
        return

    print("🔍 LBP Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print status messages during initialization and feature extraction (default: True)
def lbp_1d(self, data: np.ndarray, radius: int = 2) -> str:
    """
    Compute 1D Local Binary Pattern for time-series data.

    Every sample is compared with the ``2 * radius + 1`` positions centred
    on it (itself included). A position contributes '1' when it lies
    inside the signal and its value is >= the centre sample, otherwise '0'.

    Args:
        data: Input time-series data (any 1-D sequence; it is read
            positionally, so a pandas Series index is ignored)
        radius: Radius for LBP computation

    Returns:
        LBP code as binary string
    """
    # Positional view of the input: label-based indexing on a Series whose
    # index does not start at 0 would otherwise fail.
    samples = np.asarray(data)
    n = len(samples)

    # Collect bits and join once instead of growing a string with +=.
    bits = []
    for i in range(n):
        center = samples[i]
        for j in range(i - radius, i + radius + 1):
            in_bounds = 0 <= j < n
            bits.append('1' if in_bounds and samples[j] >= center else '0')

    return ''.join(bits)
Compute 1D Local Binary Pattern for time-series data.
Args: data: Input time-series data radius: Radius for LBP computation
Returns: LBP code as binary string
def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> np.ndarray:
    """
    Turn a binary LBP code string into a histogram.

    The code is read 8 characters at a time (the final chunk may be
    shorter); each chunk is parsed as a base-2 integer and reduced modulo
    ``n_bins`` before being counted.

    Args:
        lbp_code: Binary LBP code string
        n_bins: Number of histogram bins
        normalize: Whether to divide the counts by their sum

    Returns:
        Histogram as numpy array of length ``n_bins``; all zeros when no
        chunk could be parsed
    """
    if len(lbp_code) == 0:
        return np.zeros(n_bins)

    chunk_size = 8
    lbp_values = []
    for start in range(0, len(lbp_code), chunk_size):
        chunk = lbp_code[start:start + chunk_size]
        try:
            # Fold the chunk value into the available bin range
            lbp_values.append(int(chunk, 2) % n_bins)
        except ValueError:
            # Chunks that are not valid binary text are skipped
            continue

    if len(lbp_values) == 0:
        return np.zeros(n_bins)

    hist, _ = np.histogram(lbp_values, bins=n_bins, range=(0, n_bins))

    total = np.sum(hist)
    if normalize and total > 0:
        hist = hist / total

    return hist
Convert LBP code to histogram features.
Args: lbp_code: Binary LBP code string n_bins: Number of histogram bins normalize: Whether to normalize histogram
Returns: Histogram features as numpy array
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Compute LBP histogram features for every sensor's sliding windows.

    Args:
        windows: List of sliding window dictionaries; each entry is read
            through its 'name' and 'data' keys
        fs: Sampling frequency (unused for LBP)
        **kwargs: Per-call overrides of ``self.config``: ``radius``,
            ``n_bins`` and ``normalize``

    Returns:
        List of feature dictionaries (the 'annotations' entry is skipped)
    """
    # Explicit keyword arguments win over the instance configuration
    radius = kwargs.get('radius', self.config['radius'])
    n_bins = kwargs.get('n_bins', self.config['n_bins'])
    normalize = kwargs.get('normalize', self.config['normalize'])

    if self.verbose:
        print("\n🔍 LBP Feature Extraction")
        print(f"📊 Radius: {radius}, Bins: {n_bins}, Normalize: {normalize}")

    features = []

    for window_dict in tqdm(windows, desc="Processing LBP features", disable=not self.verbose):
        sensor_name = window_dict['name']
        window_data = window_dict['data']

        # Annotation windows hold labels, not signals
        if sensor_name == 'annotations':
            continue

        # One histogram per window; pandas objects are unwrapped first
        lbp_histograms = [
            self.lbp_to_histogram(
                self.lbp_1d(getattr(window, 'values', window), radius),
                n_bins,
                normalize,
            )
            for window in window_data
        ]

        features.append({
            'name': sensor_name,
            'features': {
                'lbp_histograms': lbp_histograms,
                'lbp_mean': [np.mean(hist) for hist in lbp_histograms],
                'lbp_std': [np.std(hist) for hist in lbp_histograms],
                'lbp_energy': [np.sum(hist**2) for hist in lbp_histograms],
                'lbp_entropy': [self._calculate_entropy(hist) for hist in lbp_histograms]
            }
        })

    return features
Extract LBP features from sliding windows.
Args: windows: List of sliding window dictionaries fs: Sampling frequency (unused for LBP) **kwargs: Additional arguments
Returns: List of feature dictionaries
def get_feature_names(self) -> List[str]:
    """Return the names of the features produced by the LBP extractor."""
    names = (
        'lbp_histograms',
        'lbp_mean',
        'lbp_std',
        'lbp_energy',
        'lbp_entropy',
    )
    return list(names)
Get names of LBP features.
Inherited Members
class FourierSeriesFeatureExtractor(BaseFeatureExtractor):
    """
    Fourier Series feature extractor for VGRF data.

    This extractor fits Fourier series to time-series data and extracts
    coefficients and reconstruction features.
    """

    def __init__(self, verbose: bool = True):
        """
        Set up the Fourier extractor with its default configuration.

        Args:
            verbose: When True, print status/error messages and show
                progress bars
        """
        super().__init__(
            name="fourier_features",
            description="Fourier series feature extractor for VGRF time-series data"
        )
        self.verbose = verbose
        self.config = {
            'n_terms': 10,  # Number of Fourier terms
            'period': 3.0,  # Period for Fourier series
            'extract_coefficients': True,
            'extract_reconstruction_error': True
        }

        if self.verbose:
            print("🌊 Fourier Series Feature Extractor initialized!")

    def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray,
                           period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]:
        """
        Fit Fourier series to signal.

        Coefficients are obtained by Simpson integration of the signal
        against cosine/sine harmonics of ``period``; the signal is then
        rebuilt from those coefficients to measure the fit quality.

        Args:
            signal: Input signal
            time_points: Time points
            period: Period of the Fourier series
            n_terms: Number of Fourier terms

        Returns:
            Dictionary with keys 'a0', 'an', 'bn', 'reconstructed',
            'reconstruction_error' (mean squared error) and
            'fourier_energy'. If anything fails, zero coefficients, a zero
            reconstruction and an infinite error are returned instead.
        """
        try:
            L = period
            scale = 2.0 / L

            # ``x`` is passed by keyword: the positional form is not
            # accepted uniformly across SciPy releases, and a failure here
            # would be hidden by the fallback below.
            a0 = scale * simpson(signal, x=time_points)

            an = []
            bn = []
            for n in range(1, n_terms + 1):
                angle = 2.0 * np.pi * n * time_points / L
                an.append(scale * simpson(signal * np.cos(angle), x=time_points))
                bn.append(scale * simpson(signal * np.sin(angle), x=time_points))

            # Rebuild the signal from the DC term plus each harmonic
            reconstructed = np.full_like(time_points, a0 / 2)
            for n, (a_n, b_n) in enumerate(zip(an, bn), start=1):
                angle = 2.0 * np.pi * n * time_points / L
                reconstructed += a_n * np.cos(angle)
                reconstructed += b_n * np.sin(angle)

            reconstruction_error = np.mean((signal - reconstructed)**2)

            return {
                'a0': a0,
                'an': an,
                'bn': bn,
                'reconstructed': reconstructed,
                'reconstruction_error': reconstruction_error,
                'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2)
            }

        except Exception as e:
            # Best-effort: one bad window must not abort the whole
            # extraction, so report (when verbose) and return neutral values.
            if self.verbose:
                print(f"Error in Fourier series fitting: {e}")
            return {
                'a0': 0,
                'an': [0] * n_terms,
                'bn': [0] * n_terms,
                'reconstructed': np.zeros_like(time_points),
                'reconstruction_error': float('inf'),
                'fourier_energy': 0
            }

    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract Fourier series features from sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency (not used by this method)
            **kwargs: Per-call overrides of ``self.config``: ``n_terms``
                and ``period``

        Returns:
            List of feature dictionaries (the 'annotations' entry is skipped)
        """
        # Explicit keyword arguments win over the instance configuration
        n_terms = kwargs.get('n_terms', self.config['n_terms'])
        period = kwargs.get('period', self.config['period'])

        if self.verbose:
            print("\n🌊 Fourier Series Feature Extraction")
            print(f"📊 Terms: {n_terms}, Period: {period}")

        features = []

        for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose):
            sensor_name = window_dict['name']
            window_data = window_dict['data']

            # Skip annotation windows
            if sensor_name == 'annotations':
                continue

            # Per-window results, keyed like the fit_fourier_series output
            columns = {
                'a0': [],
                'an': [],
                'bn': [],
                'reconstruction_error': [],
                'fourier_energy': [],
            }

            for window in window_data:
                # Ensure window is numpy array
                if hasattr(window, 'values'):
                    window = window.values

                # Each window is mapped onto one full period
                time_points = np.linspace(0, period, len(window))
                fit = self.fit_fourier_series(window, time_points, period, n_terms)

                for key, column in columns.items():
                    column.append(fit[key])

            an_values = columns['an']
            bn_values = columns['bn']

            features.append({
                'name': sensor_name,
                'features': {
                    'fourier_a0': columns['a0'],
                    'fourier_an': an_values,
                    'fourier_bn': bn_values,
                    'fourier_reconstruction_error': columns['reconstruction_error'],
                    'fourier_energy': columns['fourier_energy'],
                    'fourier_an_mean': [np.mean(an) for an in an_values],
                    'fourier_bn_mean': [np.mean(bn) for bn in bn_values],
                    'fourier_an_std': [np.std(an) for an in an_values],
                    'fourier_bn_std': [np.std(bn) for bn in bn_values]
                }
            })

        return features

    def get_feature_names(self) -> List[str]:
        """Get names of Fourier series features."""
        return [
            'fourier_a0', 'fourier_an', 'fourier_bn',
            'fourier_reconstruction_error', 'fourier_energy',
            'fourier_an_mean', 'fourier_bn_mean',
            'fourier_an_std', 'fourier_bn_std'
        ]
Fourier Series feature extractor for VGRF data.
This extractor fits Fourier series to time-series data and extracts coefficients and reconstruction features.
def __init__(self, verbose: bool = True):
    """
    Set up the Fourier series extractor with its default configuration.

    Args:
        verbose: When True, print status messages (on construction and
            wherever other methods consult ``self.verbose``).
    """
    super().__init__(
        name="fourier_features",
        description="Fourier series feature extractor for VGRF time-series data"
    )
    self.verbose = verbose

    # Defaults for this extractor.
    self.config = dict(
        n_terms=10,   # Number of Fourier terms
        period=3.0,   # Period for Fourier series
        extract_coefficients=True,
        extract_reconstruction_error=True,
    )

    if not self.verbose:
        return

    print("🌊 Fourier Series Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print status and error messages during initialization and feature extraction (default: True)
def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray,
                       period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]:
    """
    Fit Fourier series to signal.

    Coefficients are obtained by Simpson integration of the signal against
    cosine/sine harmonics of ``period``; the signal is then rebuilt from
    those coefficients to measure the fit quality.

    Args:
        signal: Input signal
        time_points: Time points
        period: Period of the Fourier series
        n_terms: Number of Fourier terms

    Returns:
        Dictionary with keys 'a0', 'an', 'bn', 'reconstructed',
        'reconstruction_error' (mean squared error) and 'fourier_energy'.
        If anything fails, zero coefficients, a zero reconstruction and an
        infinite error are returned instead.
    """
    try:
        L = period
        scale = 2.0 / L

        # ``x`` is passed by keyword: the positional form is not accepted
        # uniformly across SciPy releases, and a failure here would be
        # hidden by the fallback below.
        a0 = scale * simpson(signal, x=time_points)

        an = []
        bn = []
        for n in range(1, n_terms + 1):
            angle = 2.0 * np.pi * n * time_points / L
            an.append(scale * simpson(signal * np.cos(angle), x=time_points))
            bn.append(scale * simpson(signal * np.sin(angle), x=time_points))

        # Rebuild the signal from the DC term plus each harmonic
        reconstructed = np.full_like(time_points, a0 / 2)
        for n, (a_n, b_n) in enumerate(zip(an, bn), start=1):
            angle = 2.0 * np.pi * n * time_points / L
            reconstructed += a_n * np.cos(angle)
            reconstructed += b_n * np.sin(angle)

        reconstruction_error = np.mean((signal - reconstructed)**2)

        return {
            'a0': a0,
            'an': an,
            'bn': bn,
            'reconstructed': reconstructed,
            'reconstruction_error': reconstruction_error,
            'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2)
        }

    except Exception as e:
        # Best-effort: one bad window must not abort the whole extraction,
        # so report (when verbose) and return neutral values.
        if self.verbose:
            print(f"Error in Fourier series fitting: {e}")
        return {
            'a0': 0,
            'an': [0] * n_terms,
            'bn': [0] * n_terms,
            'reconstructed': np.zeros_like(time_points),
            'reconstruction_error': float('inf'),
            'fourier_energy': 0
        }
Fit Fourier series to signal.
Args: signal: Input signal time_points: Time points period: Period of the Fourier series n_terms: Number of Fourier terms
Returns: Dictionary containing Fourier series parameters
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Fit a Fourier series to every window of every sensor and collect the results.

    Args:
        windows: List of sliding window dictionaries; each entry is read
            through its 'name' and 'data' keys
        fs: Sampling frequency (not used by this method)
        **kwargs: Per-call overrides of ``self.config``: ``n_terms`` and
            ``period``

    Returns:
        List of feature dictionaries (the 'annotations' entry is skipped)
    """
    # Explicit keyword arguments win over the instance configuration
    n_terms = kwargs.get('n_terms', self.config['n_terms'])
    period = kwargs.get('period', self.config['period'])

    if self.verbose:
        print("\n🌊 Fourier Series Feature Extraction")
        print(f"📊 Terms: {n_terms}, Period: {period}")

    features = []

    for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose):
        sensor_name = window_dict['name']
        window_data = window_dict['data']

        # Annotation windows hold labels, not signals
        if sensor_name == 'annotations':
            continue

        # Per-window results, keyed like the fit_fourier_series output
        columns = {
            'a0': [],
            'an': [],
            'bn': [],
            'reconstruction_error': [],
            'fourier_energy': [],
        }

        for window in window_data:
            # pandas objects are unwrapped to their underlying array
            samples = window.values if hasattr(window, 'values') else window

            # Each window is mapped onto one full period
            time_points = np.linspace(0, period, len(samples))
            fit = self.fit_fourier_series(samples, time_points, period, n_terms)

            for key, column in columns.items():
                column.append(fit[key])

        an_values = columns['an']
        bn_values = columns['bn']

        features.append({
            'name': sensor_name,
            'features': {
                'fourier_a0': columns['a0'],
                'fourier_an': an_values,
                'fourier_bn': bn_values,
                'fourier_reconstruction_error': columns['reconstruction_error'],
                'fourier_energy': columns['fourier_energy'],
                'fourier_an_mean': [np.mean(an) for an in an_values],
                'fourier_bn_mean': [np.mean(bn) for bn in bn_values],
                'fourier_an_std': [np.std(an) for an in an_values],
                'fourier_bn_std': [np.std(bn) for bn in bn_values]
            }
        })

    return features
Extract Fourier series features from sliding windows.
Args: windows: List of sliding window dictionaries fs: Sampling frequency **kwargs: Additional arguments
Returns: List of feature dictionaries
def get_feature_names(self) -> List[str]:
    """Return the names of the features produced by the Fourier series extractor."""
    raw_outputs = ('fourier_a0', 'fourier_an', 'fourier_bn')
    fit_quality = ('fourier_reconstruction_error', 'fourier_energy')
    coefficient_stats = (
        'fourier_an_mean', 'fourier_bn_mean',
        'fourier_an_std', 'fourier_bn_std',
    )
    return [*raw_outputs, *fit_quality, *coefficient_stats]
Get names of Fourier series features.
Inherited Members
class PhysioNetFeatureExtractor(BaseFeatureExtractor):
    """
    Combined feature extractor for PhysioNet VGRF data.

    This extractor combines LBP and Fourier series features along with
    basic statistical features specific to VGRF data.
    """

    def __init__(self, verbose: bool = True):
        """
        Create the combined extractor and its two sub-extractors.

        Args:
            verbose: If True, print progress messages. The sub-extractors
                are always created with verbose=False.
        """
        super().__init__(
            name="physionet_features",
            description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features"
        )
        self.verbose = verbose
        self.lbp_extractor = LBPFeatureExtractor(verbose=False)
        self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False)

        if self.verbose:
            print("🚀 PhysioNet Feature Extractor initialized!")

    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract combined features from sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: 'extract_lbp', 'extract_fourier' and
                'extract_statistical' (all default True) switch the feature
                groups on or off; every keyword is also forwarded to the
                LBP and Fourier extractors.

        Returns:
            List of feature dictionaries, one per non-annotation window
        """
        extract_lbp = kwargs.get('extract_lbp', True)
        extract_fourier = kwargs.get('extract_fourier', True)
        extract_statistical = kwargs.get('extract_statistical', True)

        if self.verbose:
            print(f"\n🔍 PhysioNet Feature Extraction")
            print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}")

        # A disabled group contributes an empty list, so it adds nothing below.
        lbp_features = (
            self.lbp_extractor.extract_features(windows, fs, **kwargs) if extract_lbp else []
        )
        fourier_features = (
            self.fourier_extractor.extract_features(windows, fs, **kwargs) if extract_fourier else []
        )
        statistical_features = (
            self._extract_statistical_features(windows) if extract_statistical else []
        )
        sources = (lbp_features, fourier_features, statistical_features)

        features = []
        # The per-group results omit 'annotations' entries, so they must be
        # indexed by position among the sensor windows, not by position in
        # `windows`. Indexing with the raw position paired sensors with the
        # wrong features whenever annotations was not the last entry.
        # NOTE(review): assumes the LBP extractor skips annotations the same
        # way the Fourier and statistical paths do; confirm.
        slot = 0
        for window_dict in windows:
            sensor_name = window_dict['name']

            if sensor_name == 'annotations':
                continue

            combined_features = {'name': sensor_name, 'features': {}}
            for extracted in sources:
                if slot < len(extracted):
                    combined_features['features'].update(extracted[slot]['features'])

            features.append(combined_features)
            slot += 1

        return features

    def _extract_statistical_features(self, windows: List[Dict]) -> List[Dict]:
        """Extract per-window mean, std, max, min and range for each sensor."""
        features = []

        for window_dict in windows:
            sensor_name = window_dict['name']
            window_data = window_dict['data']

            # Skip annotation windows
            if sensor_name == 'annotations':
                continue

            means = []
            stds = []
            maxs = []
            mins = []
            ranges = []

            for window in window_data:
                # pandas objects expose the underlying array through .values
                if hasattr(window, 'values'):
                    window = window.values

                means.append(np.mean(window))
                stds.append(np.std(window))
                maxs.append(np.max(window))
                mins.append(np.min(window))
                ranges.append(np.max(window) - np.min(window))

            features.append({
                'name': sensor_name,
                'features': {
                    'vgrf_mean': means,
                    'vgrf_std': stds,
                    'vgrf_max': maxs,
                    'vgrf_min': mins,
                    'vgrf_range': ranges
                }
            })

        return features

    def get_feature_names(self) -> List[str]:
        """Get names of all features (LBP, then Fourier, then statistical)."""
        feature_names = []
        feature_names.extend(self.lbp_extractor.get_feature_names())
        feature_names.extend(self.fourier_extractor.get_feature_names())
        feature_names.extend(['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range'])
        return feature_names
Combined feature extractor for PhysioNet VGRF data.
This extractor combines LBP and Fourier series features along with basic statistical features specific to VGRF data.
def __init__(self, verbose: bool = True):
    """
    Set up the combined extractor together with its LBP and Fourier helpers.

    Args:
        verbose: If True, print a confirmation once initialization is done.
            The helper extractors are always created with verbose=False.
    """
    super().__init__(
        name="physionet_features",
        description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features",
    )
    self.verbose = verbose

    # Helpers stay silent so only this extractor reports progress.
    self.lbp_extractor = LBPFeatureExtractor(verbose=False)
    self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False)

    if not self.verbose:
        return
    print("🚀 PhysioNet Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print a confirmation message once the extractor has been initialized (default: True)
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
    """
    Extract combined features from sliding windows.

    Args:
        windows: List of sliding window dictionaries
        fs: Sampling frequency
        **kwargs: 'extract_lbp', 'extract_fourier' and
            'extract_statistical' (all default True) switch the feature
            groups on or off; every keyword is also forwarded to the LBP
            and Fourier extractors.

    Returns:
        List of feature dictionaries, one per non-annotation window
    """
    extract_lbp = kwargs.get('extract_lbp', True)
    extract_fourier = kwargs.get('extract_fourier', True)
    extract_statistical = kwargs.get('extract_statistical', True)

    if self.verbose:
        print(f"\n🔍 PhysioNet Feature Extraction")
        print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}")

    # A disabled group contributes an empty list, so it adds nothing below.
    lbp_features = (
        self.lbp_extractor.extract_features(windows, fs, **kwargs) if extract_lbp else []
    )
    fourier_features = (
        self.fourier_extractor.extract_features(windows, fs, **kwargs) if extract_fourier else []
    )
    statistical_features = (
        self._extract_statistical_features(windows) if extract_statistical else []
    )
    sources = (lbp_features, fourier_features, statistical_features)

    features = []
    # The per-group results omit 'annotations' entries, so they must be
    # indexed by position among the sensor windows, not by position in
    # `windows`. Indexing with the raw position paired sensors with the
    # wrong features whenever annotations was not the last entry.
    # NOTE(review): assumes the LBP extractor skips annotations the same
    # way the Fourier and statistical paths do; confirm.
    slot = 0
    for window_dict in windows:
        sensor_name = window_dict['name']

        if sensor_name == 'annotations':
            continue

        combined_features = {'name': sensor_name, 'features': {}}
        for extracted in sources:
            if slot < len(extracted):
                combined_features['features'].update(extracted[slot]['features'])

        features.append(combined_features)
        slot += 1

    return features
Extract combined features from sliding windows.
Args: windows: List of sliding window dictionaries fs: Sampling frequency **kwargs: Additional arguments
Returns: List of feature dictionaries
def get_feature_names(self) -> List[str]:
    """Return LBP names, then Fourier names, then the VGRF statistic names."""
    vgrf_names = ['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range']
    lbp_names = list(self.lbp_extractor.get_feature_names())
    fourier_names = list(self.fourier_extractor.get_feature_names())
    return lbp_names + fourier_names + vgrf_names
Get names of all features.
Inherited Members
class ClippingPreprocessor(BasePreprocessor):
    """
    Preprocessor that limits every value to a configured [min_val, max_val] range.
    """

    def __init__(self, min_val: float = -1, max_val: float = 1):
        """
        Args:
            min_val: Lower bound of the clipping range
            max_val: Upper bound of the clipping range
        """
        super().__init__(
            name="clipping",
            description="Clips values in the data to be within a specified range",
        )
        self.config = dict(min_val=min_val, max_val=max_val)

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Record optional bound overrides; clipping learns nothing from data.

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'min_val' / 'max_val' replacing the stored bounds
        """
        for key in ('min_val', 'max_val'):
            if key in kwargs:
                self.config[key] = kwargs[key]
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Clip the data to the configured range.

        Args:
            data: Input data to transform
            **kwargs: Optional 'min_val' / 'max_val' used for this call only

        Returns:
            Clipped data
        """
        lower = kwargs.get('min_val', self.config['min_val'])
        upper = kwargs.get('max_val', self.config['max_val'])
        clipped = np.clip(data, lower, upper)
        return clipped
Preprocessor for clipping values to a specified range.
def __init__(self, min_val: float = -1, max_val: float = 1):
    """
    Create a clipping preprocessor.

    Args:
        min_val: Lower bound of the clipping range
        max_val: Upper bound of the clipping range
    """
    super().__init__(
        name="clipping",
        description="Clips values in the data to be within a specified range",
    )
    self.config = dict(min_val=min_val, max_val=max_val)
Initialize the preprocessor.
Args: min_val: Lower bound of the clipping range (default: -1) max_val: Upper bound of the clipping range (default: 1)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record optional bound overrides; clipping learns nothing from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'min_val' / 'max_val' replacing the stored bounds;
            any other keyword is ignored
    """
    for key in ('min_val', 'max_val'):
        if key in kwargs:
            self.config[key] = kwargs[key]
    self.fitted = True
Fit the preprocessor (no fitting needed for clipping).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Clip the data to the configured range.

    Args:
        data: Input data to transform
        **kwargs: Optional 'min_val' / 'max_val' used for this call only

    Returns:
        Clipped data
    """
    lower = kwargs.get('min_val', self.config['min_val'])
    upper = kwargs.get('max_val', self.config['max_val'])
    clipped = np.clip(data, lower, upper)
    return clipped
Clip values in the data to be within the specified range.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Clipped data
Inherited Members
class NoiseRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor that smooths a signal with a moving-average filter.
    """

    def __init__(self, window_size: int = 5):
        """
        Args:
            window_size: Width of the moving-average window
        """
        super().__init__(
            name="noise_removal",
            description="Applies a moving average filter to reduce noise",
        )
        self.config = dict(window_size=window_size)

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Record an optional window-size override; nothing is learned from data.

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'window_size' replacing the stored value
        """
        if 'window_size' in kwargs:
            self.config['window_size'] = kwargs['window_size']
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Smooth the data with a moving average.

        Args:
            data: Input data to transform
            **kwargs: Optional 'window_size' used for this call only

        Returns:
            Noise-reduced data
        """
        width = kwargs.get('window_size', self.config['window_size'])

        if isinstance(data, (pd.DataFrame, pd.Series)):
            # A centered rolling mean leaves NaN at both ends; fill them
            # from the nearest computed value.
            averaged = data.rolling(window=width, center=True).mean()
            return averaged.bfill().ffill()

        # For numpy arrays, use uniform filter
        from scipy.ndimage import uniform_filter1d
        return uniform_filter1d(data, size=width, mode='nearest')
Preprocessor for removing noise using moving average filter.
def __init__(self, window_size: int = 5):
    """
    Create a moving-average noise-removal preprocessor.

    Args:
        window_size: Width of the moving-average window
    """
    super().__init__(
        name="noise_removal",
        description="Applies a moving average filter to reduce noise",
    )
    self.config = dict(window_size=window_size)
Initialize the preprocessor.
Args: window_size: Width of the moving-average window (default: 5)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record an optional window-size override; nothing is learned from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'window_size' replacing the stored value; any
            other keyword is ignored
    """
    if 'window_size' in kwargs:
        self.config['window_size'] = kwargs['window_size']
    self.fitted = True
Fit the preprocessor (no fitting needed for noise removal).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Smooth the data with a moving average.

    Args:
        data: Input data to transform
        **kwargs: Optional 'window_size' used for this call only

    Returns:
        Noise-reduced data
    """
    width = kwargs.get('window_size', self.config['window_size'])

    if isinstance(data, (pd.DataFrame, pd.Series)):
        # A centered rolling mean leaves NaN at both ends; fill them from
        # the nearest computed value.
        averaged = data.rolling(window=width, center=True).mean()
        return averaged.bfill().ffill()

    # For numpy arrays, use uniform filter
    from scipy.ndimage import uniform_filter1d
    return uniform_filter1d(data, size=width, mode='nearest')
Apply a moving average filter to reduce noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Noise-reduced data
Inherited Members
class OutlierRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing outliers using Z-score method.
    """

    def __init__(self, threshold: float = 3):
        """
        Args:
            threshold: Largest absolute Z-score a value may have and still
                be kept
        """
        super().__init__(
            name="outlier_removal",
            description="Removes outliers beyond a given threshold using the Z-score method"
        )
        self.config = {
            'threshold': threshold
        }
        # Populated by fit().
        self.mean_ = None
        self.std_ = None

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor by computing mean and standard deviation.

        Args:
            data: Input data to fit on
            **kwargs: Optional 'threshold' replacing the stored value
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['threshold']})

        if isinstance(data, (pd.DataFrame, pd.Series)):
            self.mean_ = data.mean()
            self.std_ = data.std()
        else:
            self.mean_ = np.mean(data)
            self.std_ = np.std(data)

        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Remove outliers beyond the threshold using Z-score method.

        Values equal to the fitted mean are always kept, so a zero-variance
        (constant) signal is returned intact instead of being discarded.

        Args:
            data: Input data to transform
            **kwargs: Optional 'threshold' used for this call only

        Returns:
            Data with outliers removed
        """
        threshold = kwargs.get('threshold', self.config['threshold'])

        if isinstance(data, (pd.DataFrame, pd.Series)):
            deviation = (data - self.mean_).abs()
            z_scores = deviation / self.std_
        else:
            deviation = np.abs(data - self.mean_)
            # A zero std yields 0/0 or x/0; silence the warning and let the
            # mask below decide what to keep.
            with np.errstate(divide='ignore', invalid='ignore'):
                z_scores = deviation / self.std_

        # 0/0 is NaN and fails the threshold test, so keep zero deviations
        # explicitly; they cannot be outliers.
        keep = (z_scores <= threshold) | (deviation == 0)
        return data[keep]
Preprocessor for removing outliers using Z-score method.
def __init__(self, threshold: float = 3):
    """
    Create a Z-score outlier-removal preprocessor.

    Args:
        threshold: Largest absolute Z-score a value may have and still be kept
    """
    super().__init__(
        name="outlier_removal",
        description="Removes outliers beyond a given threshold using the Z-score method",
    )
    self.config = dict(threshold=threshold)
    # Statistics are unknown until fit() runs.
    self.mean_ = self.std_ = None
Initialize the preprocessor.
Args: threshold: Largest absolute Z-score a value may have before it is removed as an outlier (default: 3)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Learn the mean and standard deviation used to compute Z-scores.

    Args:
        data: Input data to fit on
        **kwargs: Optional 'threshold' replacing the stored value; any other
            keyword is ignored
    """
    if 'threshold' in kwargs:
        self.config['threshold'] = kwargs['threshold']

    is_pandas = isinstance(data, (pd.DataFrame, pd.Series))
    # pandas objects use their own reductions; everything else goes through NumPy.
    self.mean_ = data.mean() if is_pandas else np.mean(data)
    self.std_ = data.std() if is_pandas else np.std(data)

    self.fitted = True
Fit the preprocessor by computing mean and standard deviation.
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Remove outliers beyond the threshold using Z-score method.

    Values equal to the fitted mean are always kept, so a zero-variance
    (constant) signal is returned intact instead of being discarded.

    Args:
        data: Input data to transform
        **kwargs: Optional 'threshold' used for this call only

    Returns:
        Data with outliers removed
    """
    threshold = kwargs.get('threshold', self.config['threshold'])

    if isinstance(data, (pd.DataFrame, pd.Series)):
        deviation = (data - self.mean_).abs()
        z_scores = deviation / self.std_
    else:
        deviation = np.abs(data - self.mean_)
        # A zero std yields 0/0 or x/0; silence the warning and let the mask
        # below decide what to keep.
        with np.errstate(divide='ignore', invalid='ignore'):
            z_scores = deviation / self.std_

    # 0/0 is NaN and fails the threshold test, so keep zero deviations
    # explicitly; they cannot be outliers.
    keep = (z_scores <= threshold) | (deviation == 0)
    return data[keep]
Remove outliers beyond the threshold using Z-score method.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Data with outliers removed
Inherited Members
class BaselineRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor that removes the baseline by subtracting the fitted mean.
    """

    def __init__(self):
        """Create the preprocessor; the mean stays unset until fit() runs."""
        super().__init__(
            name="baseline_removal",
            description="Removes baseline by subtracting the mean",
        )
        self.mean_ = None

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Learn the mean that transform() will subtract.

        Args:
            data: Input data to fit on
            **kwargs: Additional arguments (unused)
        """
        is_pandas = isinstance(data, (pd.DataFrame, pd.Series))
        self.mean_ = data.mean() if is_pandas else np.mean(data)
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Subtract the fitted mean from the data.

        Args:
            data: Input data to transform
            **kwargs: Additional arguments (unused)

        Returns:
            Baseline-corrected data
        """
        baseline = self.mean_
        return data - baseline
Preprocessor for removing baseline by subtracting the mean.
def __init__(self):
    """Create the preprocessor; the mean stays unset until fit() runs."""
    super().__init__(
        name="baseline_removal",
        description="Removes baseline by subtracting the mean",
    )
    self.mean_ = None
Initialize the preprocessor.
Takes no arguments: the name ("baseline_removal") and description are fixed, and the mean is left unset until fit() is called.
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Learn the mean that transform() will subtract.

    Args:
        data: Input data to fit on
        **kwargs: Additional arguments (unused)
    """
    is_pandas = isinstance(data, (pd.DataFrame, pd.Series))
    # pandas objects use their own reduction; everything else goes through NumPy.
    self.mean_ = data.mean() if is_pandas else np.mean(data)
    self.fitted = True
Fit the preprocessor by computing the mean.
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Subtract the fitted mean from the data.

    Args:
        data: Input data to transform
        **kwargs: Additional arguments (unused)

    Returns:
        Baseline-corrected data
    """
    baseline = self.mean_
    return data - baseline
Remove baseline by subtracting the mean.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Baseline-corrected data
Inherited Members
class DriftRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing low-frequency drift using high-pass filter.
    """

    def __init__(self, cutoff: float = 0.01, fs: int = 100):
        """
        Args:
            cutoff: High-pass cutoff frequency, in the same units as fs
            fs: Sampling frequency
        """
        super().__init__(
            name="drift_removal",
            description="Removes low-frequency drift using a high-pass filter"
        )
        self.config = {
            'cutoff': cutoff,
            'fs': fs
        }

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor (no fitting needed for drift removal).

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'cutoff' / 'fs' replacing the stored values
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Remove low-frequency drift using a first-order zero-phase high-pass filter.

        Args:
            data: Input data to transform
            **kwargs: Optional 'cutoff' / 'fs' used for this call only

        Returns:
            Drift-corrected data. A DataFrame is filtered column by column
            and returned as a DataFrame with the same index and columns; a
            Series is returned as a Series with the same index; anything
            else is returned as produced by filtfilt.
        """
        cutoff = kwargs.get('cutoff', self.config['cutoff'])
        fs = kwargs.get('fs', self.config['fs'])

        # butter() expects the cutoff normalized by the Nyquist frequency.
        b, a = butter(1, cutoff / (fs / 2), btype='highpass')

        if isinstance(data, pd.DataFrame):
            # Filter down the rows (axis=0). The default axis=-1 would run
            # across the columns and the 2-D result cannot be a Series.
            filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
            return pd.DataFrame(filtered, index=data.index, columns=data.columns)
        if isinstance(data, pd.Series):
            return pd.Series(filtfilt(b, a, data), index=data.index)
        return filtfilt(b, a, data)
Preprocessor for removing low-frequency drift using high-pass filter.
def __init__(self, cutoff: float = 0.01, fs: int = 100):
    """
    Create a high-pass drift-removal preprocessor.

    Args:
        cutoff: High-pass cutoff frequency, in the same units as fs
        fs: Sampling frequency
    """
    super().__init__(
        name="drift_removal",
        description="Removes low-frequency drift using a high-pass filter",
    )
    self.config = dict(cutoff=cutoff, fs=fs)
Initialize the preprocessor.
Args: cutoff: Cutoff frequency of the high-pass filter (default: 0.01) fs: Sampling frequency (default: 100)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record optional filter-setting overrides; nothing is learned from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'cutoff' / 'fs' replacing the stored values; any
            other keyword is ignored
    """
    for key in ('cutoff', 'fs'):
        if key in kwargs:
            self.config[key] = kwargs[key]
    self.fitted = True
Fit the preprocessor (no fitting needed for drift removal).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Remove low-frequency drift using a first-order zero-phase high-pass filter.

    Args:
        data: Input data to transform
        **kwargs: Optional 'cutoff' / 'fs' used for this call only

    Returns:
        Drift-corrected data. A DataFrame is filtered column by column and
        returned as a DataFrame with the same index and columns; a Series is
        returned as a Series with the same index; anything else is returned
        as produced by filtfilt.
    """
    cutoff = kwargs.get('cutoff', self.config['cutoff'])
    fs = kwargs.get('fs', self.config['fs'])

    # butter() expects the cutoff normalized by the Nyquist frequency.
    b, a = butter(1, cutoff / (fs / 2), btype='highpass')

    if isinstance(data, pd.DataFrame):
        # Filter down the rows (axis=0). The default axis=-1 would run
        # across the columns and the 2-D result cannot be a Series.
        filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
        return pd.DataFrame(filtered, index=data.index, columns=data.columns)
    if isinstance(data, pd.Series):
        return pd.Series(filtfilt(b, a, data), index=data.index)
    return filtfilt(b, a, data)
Remove low-frequency drift using a high-pass filter.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Drift-corrected data
Inherited Members
class HighFrequencyNoiseRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing high-frequency noise using low-pass filter.
    """

    def __init__(self, cutoff: float = 10, fs: int = 100):
        """
        Args:
            cutoff: Low-pass cutoff frequency, in the same units as fs
            fs: Sampling frequency
        """
        super().__init__(
            name="high_frequency_noise_removal",
            description="Applies a low-pass filter to remove high-frequency noise"
        )
        self.config = {
            'cutoff': cutoff,
            'fs': fs
        }

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor (no fitting needed for filtering).

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'cutoff' / 'fs' replacing the stored values
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Apply a first-order zero-phase low-pass filter to remove high-frequency noise.

        Args:
            data: Input data to transform
            **kwargs: Optional 'cutoff' / 'fs' used for this call only

        Returns:
            Filtered data. A DataFrame is filtered column by column and
            returned as a DataFrame with the same index and columns; a
            Series is returned as a Series with the same index; anything
            else is returned as produced by filtfilt.
        """
        cutoff = kwargs.get('cutoff', self.config['cutoff'])
        fs = kwargs.get('fs', self.config['fs'])

        # butter() expects the cutoff normalized by the Nyquist frequency.
        b, a = butter(1, cutoff / (fs / 2), btype='lowpass')

        if isinstance(data, pd.DataFrame):
            # Filter down the rows (axis=0). The default axis=-1 would run
            # across the columns and the 2-D result cannot be a Series.
            filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
            return pd.DataFrame(filtered, index=data.index, columns=data.columns)
        if isinstance(data, pd.Series):
            return pd.Series(filtfilt(b, a, data), index=data.index)
        return filtfilt(b, a, data)
Preprocessor for removing high-frequency noise using low-pass filter.
def __init__(self, cutoff: float = 10, fs: int = 100):
    """
    Create a low-pass preprocessor for high-frequency noise.

    Args:
        cutoff: Low-pass cutoff frequency, in the same units as fs
        fs: Sampling frequency
    """
    super().__init__(
        name="high_frequency_noise_removal",
        description="Applies a low-pass filter to remove high-frequency noise",
    )
    self.config = dict(cutoff=cutoff, fs=fs)
Initialize the preprocessor.
Args: cutoff: Cutoff frequency of the low-pass filter (default: 10) fs: Sampling frequency (default: 100)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record optional filter-setting overrides; nothing is learned from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'cutoff' / 'fs' replacing the stored values; any
            other keyword is ignored
    """
    for key in ('cutoff', 'fs'):
        if key in kwargs:
            self.config[key] = kwargs[key]
    self.fitted = True
Fit the preprocessor (no fitting needed for filtering).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Apply a first-order zero-phase low-pass filter to remove high-frequency noise.

    Args:
        data: Input data to transform
        **kwargs: Optional 'cutoff' / 'fs' used for this call only

    Returns:
        Filtered data. A DataFrame is filtered column by column and returned
        as a DataFrame with the same index and columns; a Series is returned
        as a Series with the same index; anything else is returned as
        produced by filtfilt.
    """
    cutoff = kwargs.get('cutoff', self.config['cutoff'])
    fs = kwargs.get('fs', self.config['fs'])

    # butter() expects the cutoff normalized by the Nyquist frequency.
    b, a = butter(1, cutoff / (fs / 2), btype='lowpass')

    if isinstance(data, pd.DataFrame):
        # Filter down the rows (axis=0). The default axis=-1 would run
        # across the columns and the 2-D result cannot be a Series.
        filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
        return pd.DataFrame(filtered, index=data.index, columns=data.columns)
    if isinstance(data, pd.Series):
        return pd.Series(filtfilt(b, a, data), index=data.index)
    return filtfilt(b, a, data)
Apply a low-pass filter to remove high-frequency noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Filtered data
Inherited Members
class LowFrequencyNoiseRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing low-frequency noise using high-pass filter.
    """

    def __init__(self, cutoff: float = 0.5, fs: int = 100):
        """
        Args:
            cutoff: High-pass cutoff frequency, in the same units as fs
            fs: Sampling frequency
        """
        super().__init__(
            name="low_frequency_noise_removal",
            description="Applies a high-pass filter to remove low-frequency noise"
        )
        self.config = {
            'cutoff': cutoff,
            'fs': fs
        }

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor (no fitting needed for filtering).

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'cutoff' / 'fs' replacing the stored values
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Apply a first-order zero-phase high-pass filter to remove low-frequency noise.

        Args:
            data: Input data to transform
            **kwargs: Optional 'cutoff' / 'fs' used for this call only

        Returns:
            Filtered data. A DataFrame is filtered column by column and
            returned as a DataFrame with the same index and columns; a
            Series is returned as a Series with the same index; anything
            else is returned as produced by filtfilt.
        """
        cutoff = kwargs.get('cutoff', self.config['cutoff'])
        fs = kwargs.get('fs', self.config['fs'])

        # butter() expects the cutoff normalized by the Nyquist frequency.
        b, a = butter(1, cutoff / (fs / 2), btype='highpass')

        if isinstance(data, pd.DataFrame):
            # Filter down the rows (axis=0). The default axis=-1 would run
            # across the columns and the 2-D result cannot be a Series.
            filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
            return pd.DataFrame(filtered, index=data.index, columns=data.columns)
        if isinstance(data, pd.Series):
            return pd.Series(filtfilt(b, a, data), index=data.index)
        return filtfilt(b, a, data)
Preprocessor for removing low-frequency noise using high-pass filter.
def __init__(self, cutoff: float = 0.5, fs: int = 100):
    """
    Create a high-pass preprocessor for low-frequency noise.

    Args:
        cutoff: High-pass cutoff frequency, in the same units as fs
        fs: Sampling frequency
    """
    super().__init__(
        name="low_frequency_noise_removal",
        description="Applies a high-pass filter to remove low-frequency noise",
    )
    self.config = dict(cutoff=cutoff, fs=fs)
Initialize the preprocessor.
Args: cutoff: Cutoff frequency of the high-pass filter (default: 0.5) fs: Sampling frequency (default: 100)
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record optional filter-setting overrides; nothing is learned from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'cutoff' / 'fs' replacing the stored values; any
            other keyword is ignored
    """
    for key in ('cutoff', 'fs'):
        if key in kwargs:
            self.config[key] = kwargs[key]
    self.fitted = True
Fit the preprocessor (no fitting needed for filtering).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Apply a first-order zero-phase high-pass filter to remove low-frequency noise.

    Args:
        data: Input data to transform
        **kwargs: Optional 'cutoff' / 'fs' used for this call only

    Returns:
        Filtered data. A DataFrame is filtered column by column and returned
        as a DataFrame with the same index and columns; a Series is returned
        as a Series with the same index; anything else is returned as
        produced by filtfilt.
    """
    cutoff = kwargs.get('cutoff', self.config['cutoff'])
    fs = kwargs.get('fs', self.config['fs'])

    # butter() expects the cutoff normalized by the Nyquist frequency.
    b, a = butter(1, cutoff / (fs / 2), btype='highpass')

    if isinstance(data, pd.DataFrame):
        # Filter down the rows (axis=0). The default axis=-1 would run
        # across the columns and the 2-D result cannot be a Series.
        filtered = filtfilt(b, a, data.to_numpy(dtype=float), axis=0)
        return pd.DataFrame(filtered, index=data.index, columns=data.columns)
    if isinstance(data, pd.Series):
        return pd.Series(filtfilt(b, a, data), index=data.index)
    return filtfilt(b, a, data)
Apply a high-pass filter to remove low-frequency noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Filtered data
Inherited Members
class ArtifactRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing artifacts by interpolating missing values.
    """

    def __init__(self, method: str = "linear"):
        """
        Args:
            method: Interpolation method passed to pandas' interpolate() for
                DataFrame/Series input
        """
        super().__init__(
            name="artifact_removal",
            description="Removes artifacts by interpolating missing values"
        )
        self.config = {
            'method': method
        }

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor (no fitting needed for interpolation).

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'method' replacing the stored value
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['method']})
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Remove artifacts by interpolating missing (NaN) values.

        Args:
            data: Input data to transform
            **kwargs: Optional 'method' used for this call only (pandas
                input only; the NumPy path always interpolates linearly)

        Returns:
            Artifact-free data. NumPy input with no valid sample is returned
            unchanged; with exactly one valid sample every position takes
            that value.
        """
        method = kwargs.get('method', self.config['method'])

        if isinstance(data, (pd.DataFrame, pd.Series)):
            return data.interpolate(method=method).bfill().ffill()

        # For numpy arrays, use linear interpolation
        from scipy.interpolate import interp1d
        x = np.arange(len(data))
        valid_mask = ~np.isnan(data)
        n_valid = np.count_nonzero(valid_mask)

        if n_valid == 0:
            # Nothing to interpolate from.
            return data
        if n_valid == 1:
            # A line needs two points; with one, hold that value everywhere.
            return np.full(len(data), data[valid_mask][0], dtype=float)

        f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate')
        return f(x)
Preprocessor for removing artifacts by interpolating missing values.
def __init__(self, method: str = "linear"):
    """
    Create an interpolation-based artifact-removal preprocessor.

    Args:
        method: Interpolation method stored in the configuration
    """
    super().__init__(
        name="artifact_removal",
        description="Removes artifacts by interpolating missing values",
    )
    self.config = dict(method=method)
Initialize the preprocessor.
Args: method: Interpolation method applied to DataFrame/Series input (default: "linear")
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Record an optional method override; nothing is learned from data.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Optional 'method' replacing the stored value; any other
            keyword is ignored
    """
    if 'method' in kwargs:
        self.config['method'] = kwargs['method']
    self.fitted = True
Fit the preprocessor (no fitting needed for interpolation).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Remove artifacts by interpolating missing (NaN) values.

    Args:
        data: Input data to transform
        **kwargs: Optional 'method' used for this call only (pandas input
            only; the NumPy path always interpolates linearly)

    Returns:
        Artifact-free data. NumPy input with no valid sample is returned
        unchanged; with exactly one valid sample every position takes that
        value.
    """
    method = kwargs.get('method', self.config['method'])

    if isinstance(data, (pd.DataFrame, pd.Series)):
        return data.interpolate(method=method).bfill().ffill()

    # For numpy arrays, use linear interpolation
    from scipy.interpolate import interp1d
    x = np.arange(len(data))
    valid_mask = ~np.isnan(data)
    n_valid = np.count_nonzero(valid_mask)

    if n_valid == 0:
        # Nothing to interpolate from.
        return data
    if n_valid == 1:
        # A line needs two points; with one, hold that value everywhere.
        return np.full(len(data), data[valid_mask][0], dtype=float)

    f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate')
    return f(x)
Remove artifacts by interpolating missing values.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Artifact-free data
Inherited Members
class TrendRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing trends using polynomial fitting.
    """

    def __init__(self, order: int = 2):
        """
        Args:
            order: Degree of the polynomial fitted to the data
        """
        super().__init__(
            name="trend_removal",
            description="Removes trends using polynomial fitting"
        )
        self.config = {
            'order': order
        }

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor (no fitting needed for detrending).

        Args:
            data: Input data to fit on (not inspected)
            **kwargs: Optional 'order' replacing the stored value
        """
        self.config.update({k: v for k, v in kwargs.items() if k in ['order']})
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Remove trends using polynomial fitting.

        A polynomial is fitted against the sample index and subtracted from
        the data. For 2-D input (DataFrame or array) each column is
        detrended independently.

        Args:
            data: Input data to transform
            **kwargs: Optional 'order' used for this call only

        Returns:
            Detrended data
        """
        order = kwargs.get('order', self.config['order'])

        x = np.arange(len(data))
        poly_coeffs = np.polyfit(x, data, order)

        if poly_coeffs.ndim == 1:
            trend = np.polyval(poly_coeffs, x)
        else:
            # For 2-D data polyfit returns one coefficient column per data
            # column, which polyval cannot broadcast. The Vandermonde matrix
            # uses the same highest-power-first ordering, so one product
            # evaluates every column's polynomial.
            trend = np.vander(x.astype(float), order + 1) @ poly_coeffs

        return data - trend
Preprocessor for removing trends using polynomial fitting.
def __init__(self, order: int = 2):
    """
    Create a trend-removal preprocessor.

    Args:
        order: Polynomial order stored in the configuration
    """
    super().__init__(
        name="trend_removal",
        description="Removes trends using polynomial fitting"
    )
    self.config = dict(order=order)
Initialize the preprocessor.
Args: name: Name of the preprocessor description: Description of the preprocessor
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Mark the preprocessor as fitted; detrending has nothing to learn.

    Args:
        data: Input data to fit on (not inspected)
        **kwargs: Additional arguments; only ``order`` is copied into the
            configuration, everything else is ignored
    """
    if 'order' in kwargs:
        self.config.update(order=kwargs['order'])
    self.fitted = True
Fit the preprocessor (no fitting needed for detrending).
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Remove trends using polynomial fitting.

    A polynomial is least-squares fitted against the sample index and
    subtracted from the data. Multi-column input (DataFrame or 2-D array)
    is detrended column by column.

    Args:
        data: Input data to transform; the first axis is the sample axis
        **kwargs: Additional arguments; ``order`` overrides the configured
            polynomial order for this call

    Returns:
        Detrended data, same container type and shape as the input
    """
    order = kwargs.get('order', self.config['order'])

    x = np.arange(len(data), dtype=float)
    poly_coeffs = np.polyfit(x, data, order)
    # np.polyval cannot broadcast the (order + 1, K) coefficient matrix
    # returned for multi-column input, so evaluate the polynomial via the
    # Vandermonde matrix (same highest-power-first coefficient order).
    trend = np.vander(x, order + 1) @ poly_coeffs
    return data - trend
Remove trends using polynomial fitting.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Detrended data
Inherited Members
class DCOffsetRemovalPreprocessor(BasePreprocessor):
    """
    Preprocessor for removing DC offset by subtracting the mean.

    The mean is learned in ``fit`` and subtracted in ``transform``.
    """

    def __init__(self):
        super().__init__(
            name="dc_offset_removal",
            description="Removes DC offset by subtracting the mean"
        )
        # Learned offset; stays None until fit() has been called.
        self.mean_ = None

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor by computing the mean.

        Args:
            data: Input data to fit on. pandas objects use ``data.mean()``
                (per column for a DataFrame); anything else uses
                ``np.mean`` over all elements.
            **kwargs: Additional arguments (unused)
        """
        if isinstance(data, (pd.DataFrame, pd.Series)):
            self.mean_ = data.mean()
        else:
            self.mean_ = np.mean(data)

        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Remove DC offset by subtracting the mean.

        Args:
            data: Input data to transform
            **kwargs: Additional arguments (unused)

        Returns:
            DC-corrected data

        Raises:
            ValueError: If called before ``fit`` has computed the mean
        """
        if self.mean_ is None:
            # Without this guard the subtraction below fails with an
            # opaque "unsupported operand" TypeError.
            raise ValueError("Preprocessor must be fitted before transform")
        return data - self.mean_
Preprocessor for removing DC offset by subtracting the mean.
def __init__(self):
    """Create a DC-offset remover; the offset is unknown until ``fit`` runs."""
    identity = {
        'name': "dc_offset_removal",
        'description': "Removes DC offset by subtracting the mean",
    }
    super().__init__(**identity)
    self.mean_ = None
Initialize the preprocessor.
Args: name: Name of the preprocessor description: Description of the preprocessor
def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
    """
    Learn the offset to subtract later.

    Args:
        data: Input data to fit on; pandas objects use their own ``mean()``,
            everything else goes through ``np.mean``
        **kwargs: Additional arguments (unused)
    """
    is_pandas = isinstance(data, (pd.DataFrame, pd.Series))
    self.mean_ = data.mean() if is_pandas else np.mean(data)
    self.fitted = True
Fit the preprocessor by computing the mean.
Args: data: Input data to fit on **kwargs: Additional arguments
def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
    """
    Remove DC offset by subtracting the mean.

    Args:
        data: Input data to transform
        **kwargs: Additional arguments (unused)

    Returns:
        DC-corrected data

    Raises:
        ValueError: If called before ``fit`` has computed the mean
    """
    if self.mean_ is None:
        # Without this guard the subtraction below fails with an opaque
        # "unsupported operand" TypeError.
        raise ValueError("Preprocessor must be fitted before transform")
    return data - self.mean_
Remove DC offset by subtracting the mean.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: DC-corrected data
Inherited Members
class DaphnetVisualizationAnalyzer(BaseEDAAnalyzer):
    """
    EDA analyzer for Daphnet dataset visualization.

    This analyzer provides comprehensive visualization capabilities for Daphnet dataset
    including thigh, shank, and trunk sensor data.
    """

    def __init__(self):
        super().__init__(
            name="daphnet_visualization",
            description="Comprehensive visualization analyzer for Daphnet dataset sensor data"
        )
        self.config = {
            'figsize': (20, 16),
            'colors': {
                'no_freeze': 'orange',
                'freeze': 'purple'
            },
            'alpha': 0.6
        }

    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        """
        Analyze the data and return statistical summaries.

        Args:
            data: A single DataFrame or a list of DataFrames
            **kwargs: Additional arguments (unused)

        Returns:
            The summary of the single dataset, or a dictionary keyed
            ``dataset_<i>`` with one summary per list entry
        """
        if isinstance(data, list):
            return {
                f'dataset_{i}': self._analyze_single_dataset(df)
                for i, df in enumerate(data)
            }
        return self._analyze_single_dataset(data)

    def _analyze_single_dataset(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze a single dataset."""
        numeric = df.select_dtypes(include=[np.number])

        # Basic statistics
        stats = {
            'shape': df.shape,
            'columns': df.columns.tolist(),
            'annotation_distribution': df['annotations'].value_counts().to_dict() if 'annotations' in df.columns else {},
            'missing_values': df.isnull().sum().to_dict(),
            'data_range': {
                'min': numeric.min().to_dict(),
                'max': numeric.max().to_dict()
            }
        }

        # Sensor-specific statistics
        sensor_stats = {}
        for sensor in ['thigh', 'shank', 'trunk']:
            if sensor in df.columns:
                sensor_stats[sensor] = {
                    'mean': df[sensor].mean(),
                    'std': df[sensor].std(),
                    'min': df[sensor].min(),
                    'max': df[sensor].max()
                }

        stats['sensor_statistics'] = sensor_stats
        return stats

    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
        """
        Create visualizations of the data.

        Args:
            data: Input data to visualize
            **kwargs: Additional arguments including sensor_type
                ('all', 'thigh', 'shank' or 'trunk'), dataset_index, names
        """
        sensor_type = kwargs.get('sensor_type', 'all')
        dataset_index = kwargs.get('dataset_index', 0)
        names = kwargs.get('names', [])

        if isinstance(data, list):
            if dataset_index < len(data):
                df = data[dataset_index]
                dataset_name = names[dataset_index] if dataset_index < len(names) else f"Dataset {dataset_index}"
            else:
                print(f"Dataset index {dataset_index} out of range")
                return
        else:
            df = data
            dataset_name = names[0] if names else "Dataset"

        if sensor_type == 'all':
            self._plot_all_sensors(df, dataset_name)
        elif sensor_type == 'thigh':
            self._plot_thigh_data(df, dataset_name)
        elif sensor_type == 'shank':
            self._plot_shank_data(df, dataset_name)
        elif sensor_type == 'trunk':
            self._plot_trunk_data(df, dataset_name)
        else:
            print(f"Unknown sensor type: {sensor_type}")

    def _plot_sensor_components(self, df: pd.DataFrame, dataset_name: str, sensor: str):
        """Plot the four component columns of one sensor, one subplot each."""
        title = sensor.capitalize()
        print(f"Plotting {sensor} data for {dataset_name}")

        # Keep only rows with a positive annotation when annotations exist
        has_annotations = 'annotations' in df.columns
        df_filtered = df[df['annotations'] > 0] if has_annotations else df

        if df_filtered.empty:
            print("No valid data to plot")
            return

        fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize'])
        fig.suptitle(f"{title} Data from {dataset_name}")

        # Separate freeze and no-freeze data
        if has_annotations:
            neg = df_filtered[df_filtered['annotations'] == 1]  # No freeze
            pos = df_filtered[df_filtered['annotations'] == 2]  # Freeze
        else:
            neg = df_filtered
            pos = pd.DataFrame()

        components = [f'{sensor}_h_fd', f'{sensor}_v', f'{sensor}_h_l', sensor]
        labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall']

        for ax, component, label in zip(axes, components, labels):
            if component not in df_filtered.columns:
                continue

            # Plot main signal
            ax.plot(df_filtered.index, df_filtered[component])
            ax.set_ylabel(f"{label} {title} Acceleration")

            # Overlay the annotated samples
            if not neg.empty:
                ax.scatter(neg.index, neg[component],
                           c=self.config['colors']['no_freeze'],
                           label="no freeze", alpha=self.config['alpha'])
            if not pos.empty:
                ax.scatter(pos.index, pos[component],
                           c=self.config['colors']['freeze'],
                           label="freeze", alpha=self.config['alpha'])

            ax.legend()

        plt.xlabel("Time")
        plt.tight_layout()
        plt.show()

    def _plot_thigh_data(self, df: pd.DataFrame, dataset_name: str):
        """Plot thigh sensor data."""
        self._plot_sensor_components(df, dataset_name, 'thigh')

    def _plot_shank_data(self, df: pd.DataFrame, dataset_name: str):
        """Plot shank sensor data."""
        self._plot_sensor_components(df, dataset_name, 'shank')

    def _plot_trunk_data(self, df: pd.DataFrame, dataset_name: str):
        """Plot trunk sensor data."""
        self._plot_sensor_components(df, dataset_name, 'trunk')

    def _plot_all_sensors(self, df: pd.DataFrame, dataset_name: str):
        """Plot all sensor data in a combined view."""
        print(f"Plotting all sensor data for {dataset_name}")

        # Filter before creating the figure so an early return does not
        # leave an empty figure open.
        df_filtered = df[df['annotations'] > 0] if 'annotations' in df.columns else df

        if df_filtered.empty:
            print("No valid data to plot")
            return

        # Create figure with subplots for each sensor
        fig, axes = plt.subplots(3, 1, sharex=True, figsize=self.config['figsize'])
        fig.suptitle(f"All Sensor Data from {dataset_name}")

        sensors = ['thigh', 'shank', 'trunk']
        for ax, sensor in zip(axes, sensors):
            if sensor not in df_filtered.columns:
                continue

            ax.plot(df_filtered.index, df_filtered[sensor])
            ax.set_ylabel(f"{sensor.capitalize()} Acceleration")

            # Add annotations if available
            if 'annotations' in df_filtered.columns:
                neg = df_filtered[df_filtered['annotations'] == 1]
                pos = df_filtered[df_filtered['annotations'] == 2]

                if not neg.empty:
                    ax.scatter(neg.index, neg[sensor],
                               c=self.config['colors']['no_freeze'],
                               label="no freeze", alpha=self.config['alpha'])
                if not pos.empty:
                    ax.scatter(pos.index, pos[sensor],
                               c=self.config['colors']['freeze'],
                               label="freeze", alpha=self.config['alpha'])

                ax.legend()

        plt.xlabel("Time")
        plt.tight_layout()
        plt.show()
EDA analyzer for Daphnet dataset visualization.
This analyzer provides comprehensive visualization capabilities for Daphnet dataset including thigh, shank, and trunk sensor data.
def __init__(self):
    """Register the analyzer and set its default figure size, colors and alpha."""
    super().__init__(
        name="daphnet_visualization",
        description="Comprehensive visualization analyzer for Daphnet dataset sensor data"
    )
    annotation_colors = {'no_freeze': 'orange', 'freeze': 'purple'}
    self.config = {
        'figsize': (20, 16),
        'colors': annotation_colors,
        'alpha': 0.6,
    }
Initialize the EDA analyzer.
Args: name: Name of the EDA analyzer description: Description of the EDA analyzer
def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
    """
    Summarize one dataset or a list of datasets.

    Args:
        data: A single DataFrame or a list of DataFrames
        **kwargs: Additional arguments (unused)

    Returns:
        The summary of the single dataset, or a dictionary keyed
        ``dataset_<i>`` with one summary per list entry
    """
    if not isinstance(data, list):
        return self._analyze_single_dataset(data)

    return {
        f'dataset_{position}': self._analyze_single_dataset(frame)
        for position, frame in enumerate(data)
    }
Analyze the data and return statistical summaries.
Args: data: Input data to analyze **kwargs: Additional arguments
Returns: Dictionary containing analysis results
def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
    """
    Pick one dataset and hand it to the plot routine for the requested sensor.

    Args:
        data: A single DataFrame or a list of DataFrames
        **kwargs: Additional arguments including sensor_type
            ('all', 'thigh', 'shank' or 'trunk'), dataset_index, names
    """
    sensor_type = kwargs.get('sensor_type', 'all')
    dataset_index = kwargs.get('dataset_index', 0)
    names = kwargs.get('names', [])

    if not isinstance(data, list):
        df = data
        dataset_name = names[0] if names else "Dataset"
    elif dataset_index < len(data):
        df = data[dataset_index]
        if dataset_index < len(names):
            dataset_name = names[dataset_index]
        else:
            dataset_name = f"Dataset {dataset_index}"
    else:
        print(f"Dataset index {dataset_index} out of range")
        return

    # Sensor type -> name of the plotting method that handles it
    plotters = (
        ('all', '_plot_all_sensors'),
        ('thigh', '_plot_thigh_data'),
        ('shank', '_plot_shank_data'),
        ('trunk', '_plot_trunk_data'),
    )
    for key, method_name in plotters:
        if sensor_type == key:
            getattr(self, method_name)(df, dataset_name)
            break
    else:
        print(f"Unknown sensor type: {sensor_type}")
Create visualizations of the data.
Args: data: Input data to visualize **kwargs: Additional arguments including sensor_type, dataset_index, names
Inherited Members
class SensorStatisticsAnalyzer(BaseEDAAnalyzer):
    """
    EDA analyzer for sensor data statistics and feature visualization.

    This analyzer provides statistical analysis and feature visualization capabilities
    for sensor data including sliding windows and extracted features.
    """

    def __init__(self):
        super().__init__(
            name="sensor_statistics",
            description="Statistical analysis and feature visualization for sensor data"
        )
        self.config = {
            'figsize': (20, 10),
            'feature_markers': {
                'mean': 'x',
                'rms': 'o',
                'peak_height': 'v',
                'mode': '<',
                'median': '^'
            }
        }

    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        """
        Analyze sensor data and return statistical summaries.

        Args:
            data: A single DataFrame or a list of DataFrames
            **kwargs: Additional arguments (unused)

        Returns:
            The statistics of the single dataset, or a dictionary keyed
            ``dataset_<i>`` with one entry per list element
        """
        if isinstance(data, list):
            return {
                f'dataset_{i}': self._compute_statistics(df)
                for i, df in enumerate(data)
            }
        return self._compute_statistics(data)

    def _compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Compute comprehensive statistics for a dataset."""
        # corr/skew/kurtosis raise on non-numeric columns in pandas >= 2.0
        # (numeric_only now defaults to False), so restrict them to the
        # numeric columns explicitly.
        numeric_df = df.select_dtypes(include=[np.number])

        stats = {
            'basic_stats': df.describe().to_dict(),
            'correlation_matrix': numeric_df.corr().to_dict() if len(numeric_df.columns) > 1 else {},
            'skewness': numeric_df.skew().to_dict(),
            'kurtosis': numeric_df.kurtosis().to_dict()
        }

        # Add sensor-specific statistics
        sensor_stats = {}
        for sensor in ['thigh', 'shank', 'trunk']:
            if sensor in df.columns:
                sensor_data = df[sensor].dropna()
                q25 = sensor_data.quantile(0.25)
                q75 = sensor_data.quantile(0.75)
                low, high = sensor_data.min(), sensor_data.max()
                sensor_stats[sensor] = {
                    'mean': sensor_data.mean(),
                    'std': sensor_data.std(),
                    'variance': sensor_data.var(),
                    'min': low,
                    'max': high,
                    'range': high - low,
                    'median': sensor_data.median(),
                    'q25': q25,
                    'q75': q75,
                    'iqr': q75 - q25
                }

        stats['sensor_statistics'] = sensor_stats
        return stats

    def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs):
        """
        Create visualizations of sensor data with overlaid features.

        Args:
            sliding_windows: List of sliding window dictionaries
            features: List of feature dictionaries
            **kwargs: Additional arguments including sensor_name, start_idx,
                end_idx, num_windows, save
        """
        sensor_name = kwargs.get('sensor_name', 'shank')
        start_idx = kwargs.get('start_idx', 0)
        end_idx = kwargs.get('end_idx', 1000)
        num_windows = kwargs.get('num_windows', 10)
        save = kwargs.get('save', False)

        self._plot_sensor_with_features(sliding_windows, features, start_idx, end_idx,
                                        sensor_name, num_windows, save)

    def _plot_sensor_with_features(self, sliding_windows: List[Dict], features: List[Dict],
                                   start_idx: int, end_idx: int, sensor_name: str = "shank",
                                   num_windows: int = 10, save: bool = False):
        """
        Plot sliding windows of sensor data with overlaid statistical features.

        Args:
            sliding_windows: List of sliding window dictionaries; the entry
                whose ``'name'`` equals ``sensor_name`` supplies the windows
                under ``'data'``
            features: List of feature dictionaries; the entry whose ``'name'``
                equals ``sensor_name`` supplies the values under ``'features'``
            start_idx: Start index of the time window
            end_idx: End index of the time window
            sensor_name: Name of the sensor to plot
            num_windows: Number of sliding windows to plot
            save: Whether to save the plot (prompts for a file path) instead
                of showing it
        """
        # Extract sensor windows
        sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None)
        if sensor_windows is None:
            print(f"Sensor '{sensor_name}' not found in sliding_windows.")
            return

        # Extract corresponding features
        sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None)
        if sensor_features is None:
            print(f"Sensor '{sensor_name}' not found in features.")
            return

        # Filter windows based on start_idx and end_idx
        filtered_windows = [series for series in sensor_windows
                            if start_idx <= series.index[0] and series.index[-1] <= end_idx]

        if not filtered_windows:
            print(f"No windows found in the specified index range ({start_idx} - {end_idx}).")
            return

        # Create the figure only after validation so the early returns above
        # do not leave an empty figure open.
        fig, axes = plt.subplots(2, 1, figsize=self.config['figsize'],
                                 gridspec_kw={'height_ratios': [3, 1]})

        # Store entropy & frequency features for separate plotting
        entropy_values = []
        dominant_frequencies = []

        # Plot first num_windows windows
        for i, series in enumerate(filtered_windows[:max(num_windows, 0)]):
            # Extract time and signal values
            time_values = series.index.to_numpy()
            signal_values = series.values

            # Determine actual start and end indices for this window
            window_start, window_end = time_values[0], time_values[-1]

            # Plot time series data
            axes[0].plot(time_values, signal_values, alpha=0.6)

            # Mark start and end of each window with vertical dotted lines
            axes[0].axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7)
            axes[0].axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7)

            # Overlay statistical features
            for feature_name, marker in self.config['feature_markers'].items():
                if feature_name in sensor_features and len(sensor_features[feature_name]) > i:
                    feature_value = sensor_features[feature_name][i]
                    if feature_value != 0:  # Skip zero values
                        # Place the marker at the sample closest in value
                        closest_index = np.argmin(np.abs(signal_values - feature_value))
                        closest_time = time_values[closest_index]
                        axes[0].scatter(closest_time, feature_value, color='red',
                                        marker=marker, s=100, label=feature_name if i == 0 else "")

            # Store entropy & frequency features for separate plotting
            if 'entropy' in sensor_features and len(sensor_features['entropy']) > i:
                entropy_values.append(sensor_features['entropy'][i])
            if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i:
                dominant_frequencies.append(sensor_features['dominant_frequency'][i])

        # Labels and title for time-series plot
        axes[0].set_xlabel('Time')
        axes[0].set_ylabel(f'{sensor_name} Signal')
        axes[0].set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features')
        axes[0].legend()

        # Frequency-domain & entropy plot. Each series gets its own x
        # positions: they can differ in length, and entropy may be present
        # without any dominant-frequency values.
        if dominant_frequencies:
            axes[1].plot(list(range(len(dominant_frequencies))), dominant_frequencies,
                         label="Dominant Frequency", marker="o", linestyle="dashed", color="blue")

        if entropy_values:
            axes[1].bar(list(range(len(entropy_values))), entropy_values,
                        alpha=0.6, label="Entropy", color="green")

        axes[1].set_xlabel("Window Index")
        axes[1].set_ylabel("Feature Value")
        axes[1].set_title("Frequency & Entropy Features")
        axes[1].legend()

        plt.tight_layout()

        # Save or show plot
        if save:
            file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ")
            plt.savefig(file_path, dpi=300)
            print(f"Plot saved at {file_path}")
        else:
            plt.show()
EDA analyzer for sensor data statistics and feature visualization.
This analyzer provides statistical analysis and feature visualization capabilities for sensor data including sliding windows and extracted features.
def __init__(self):
    """Register the analyzer and set its default figure size and feature markers."""
    super().__init__(
        name="sensor_statistics",
        description="Statistical analysis and feature visualization for sensor data"
    )
    # Marker style used for each overlaid feature
    markers = {
        'mean': 'x',
        'rms': 'o',
        'peak_height': 'v',
        'mode': '<',
        'median': '^',
    }
    self.config = {'figsize': (20, 10), 'feature_markers': markers}
Initialize the EDA analyzer.
Args: name: Name of the EDA analyzer description: Description of the EDA analyzer
def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
    """
    Compute statistics for one dataset or for every dataset in a list.

    Args:
        data: A single DataFrame or a list of DataFrames
        **kwargs: Additional arguments (unused)

    Returns:
        The statistics of the single dataset, or a dictionary keyed
        ``dataset_<i>`` with one entry per list element
    """
    if not isinstance(data, list):
        return self._compute_statistics(data)

    return {
        f'dataset_{position}': self._compute_statistics(frame)
        for position, frame in enumerate(data)
    }
Analyze sensor data and return statistical summaries.
Args: data: Input data to analyze **kwargs: Additional arguments
Returns: Dictionary containing analysis results
def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs):
    """
    Plot sensor windows with their features overlaid.

    Args:
        sliding_windows: List of sliding window dictionaries
        features: List of feature dictionaries
        **kwargs: Additional arguments including sensor_name (default 'shank'),
            start_idx (0), end_idx (1000), num_windows (10), save (False)
    """
    options = {
        'sensor_name': 'shank',
        'start_idx': 0,
        'end_idx': 1000,
        'num_windows': 10,
        'save': False,
    }
    # Caller-supplied values win over the defaults; unknown keys are ignored.
    options.update({key: kwargs[key] for key in options if key in kwargs})

    self._plot_sensor_with_features(
        sliding_windows,
        features,
        options['start_idx'],
        options['end_idx'],
        options['sensor_name'],
        options['num_windows'],
        options['save'],
    )
Create visualizations of sensor data with overlaid features.
Args: sliding_windows: List of sliding window dictionaries features: List of feature dictionaries **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows
Inherited Members
class RandomForestModel(BaseClassificationModel):
    """
    Random Forest classification model.

    This class provides Random Forest classification functionality with
    comprehensive training, prediction, and evaluation capabilities.
    """

    def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None):
        super().__init__(
            name="random_forest",
            description="Random Forest classifier for gait data classification"
        )
        self.config = {
            'n_estimators': n_estimators,
            'random_state': random_state,
            'max_depth': max_depth
        }
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            random_state=random_state,
            max_depth=max_depth
        )
        self.feature_names = []
        self.class_names = []

    def _discard_validation_split(self):
        """Forget a validation split stored by an earlier training run."""
        for attr in ('X_test', 'y_test'):
            vars(self).pop(attr, None)

    def train(self, features: List[Dict], **kwargs):
        """
        Train the Random Forest model on the given features.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments including test_size (default 0.2)
                and validation_split (default True). With validation_split
                the held-out part is stored for later use by ``evaluate``.
        """
        # Preprocess features
        X, y = preprocess_features(features)

        # Store feature and class information
        self.feature_names = [f"feature_{i}" for i in range(X.shape[1])]
        self.class_names = list(set(y))

        test_size = kwargs.get('test_size', 0.2)
        validation_split = kwargs.get('validation_split', True)

        if validation_split:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=test_size, random_state=self.config['random_state']
            )

            # Train model
            self.model.fit(X_train, y_train)

            # Store validation data for later evaluation
            self.X_test = X_test
            self.y_test = y_test

            train_accuracy = self.model.score(X_train, y_train)
            test_accuracy = self.model.score(X_test, y_test)

            print(f"Training accuracy: {train_accuracy:.4f}")
            print(f"Validation accuracy: {test_accuracy:.4f}")
        else:
            # A split kept from a previous train() call belongs to another
            # fit; drop it so evaluate() does not score against stale data.
            self._discard_validation_split()

            # Train on all data
            self.model.fit(X, y)
            train_accuracy = self.model.score(X, y)
            print(f"Training accuracy: {train_accuracy:.4f}")

        self.trained = True
        print("Random Forest model trained successfully.")

    def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]:
        """
        Make predictions using the trained Random Forest model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments including return_probabilities

        Returns:
            Array of predictions or probabilities

        Raises:
            ValueError: If the model has not been trained
        """
        if not self.trained:
            raise ValueError("Model must be trained before making predictions")

        # Preprocess features
        X, _ = preprocess_features(features)

        if kwargs.get('return_probabilities', False):
            return self.model.predict_proba(X)
        return self.model.predict(X)

    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Evaluate the Random Forest model performance.

        Args:
            features: List of feature dictionaries. Only used when no
                validation split from training is stored on the instance;
                a stored split takes precedence.
            **kwargs: Additional arguments including detailed_report

        Returns:
            Dictionary containing evaluation metrics

        Raises:
            ValueError: If the model has not been trained
        """
        if not self.trained:
            raise ValueError("Model must be trained before evaluation")

        # Use validation data if available, otherwise use provided features
        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
            X_test, y_test = self.X_test, self.y_test
        else:
            X_test, y_test = preprocess_features(features)

        # Make predictions
        y_pred = self.model.predict(X_test)

        # Basic metrics
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'confusion_matrix': confusion_matrix(y_test, y_pred).tolist()
        }

        # Detailed report if requested
        if kwargs.get('detailed_report', False):
            metrics['classification_report'] = classification_report(y_test, y_pred, output_dict=True)

        # Feature importance
        if hasattr(self.model, 'feature_importances_'):
            metrics['feature_importance'] = dict(zip(self.feature_names, self.model.feature_importances_))

        return metrics

    def save_model(self, filepath: str):
        """
        Save the trained Random Forest model to a file.

        Args:
            filepath: Path to save the model

        Raises:
            ValueError: If the model has not been trained
        """
        if not self.trained:
            raise ValueError("Model must be trained before saving")

        # Save model with additional metadata
        model_data = {
            'model': self.model,
            'config': self.config,
            'feature_names': self.feature_names,
            'class_names': self.class_names,
            'trained': self.trained
        }

        joblib.dump(model_data, filepath)
        print(f"Random Forest model saved to {filepath}")

    def load_model(self, filepath: str):
        """
        Load a trained Random Forest model from a file.

        Accepts both the dictionary format written by ``save_model`` and a
        legacy file that contains only the estimator.

        Args:
            filepath: Path to the saved model
        """
        try:
            # SECURITY: joblib.load unpickles the file and can execute
            # arbitrary code; only load model files from trusted sources.
            model_data = joblib.load(filepath)

            if isinstance(model_data, dict):
                self.model = model_data['model']
                self.config = model_data.get('config', self.config)
                self.feature_names = model_data.get('feature_names', [])
                self.class_names = model_data.get('class_names', [])
                self.trained = model_data.get('trained', True)
            else:
                # Legacy format - just the model
                self.model = model_data
                self.trained = True

            # Without names, feature importances would zip to an empty dict;
            # rebuild the generic names from the fitted estimator if it
            # exposes its input width.
            if not self.feature_names:
                n_features = getattr(self.model, 'n_features_in_', None)
                if n_features is not None:
                    self.feature_names = [f"feature_{i}" for i in range(n_features)]

            # A validation split kept from earlier training on this instance
            # does not belong to the loaded estimator.
            self._discard_validation_split()

            print(f"Random Forest model loaded from {filepath}")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    def get_feature_importance(self) -> Dict[str, float]:
        """
        Get feature importance scores.

        Returns:
            Dictionary mapping feature names to importance scores

        Raises:
            ValueError: If the model has not been trained
        """
        if not self.trained:
            raise ValueError("Model must be trained to get feature importance")

        if hasattr(self.model, 'feature_importances_'):
            return dict(zip(self.feature_names, self.model.feature_importances_))
        return {}

    def predict_single(self, single_features: Dict) -> int:
        """
        Make prediction for a single feature vector.

        Args:
            single_features: Dictionary containing features for a single sample

        Returns:
            Predicted class

        Raises:
            ValueError: If the model has not been trained
        """
        if not self.trained:
            raise ValueError("Model must be trained before making predictions")

        # Wrap the sample in a list, the format preprocess_features expects
        X, _ = preprocess_features([single_features])

        return self.model.predict(X)[0]
Random Forest classification model.
This class provides Random Forest classification functionality with comprehensive training, prediction, and evaluation capabilities.
def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None):
    """
    Set up an untrained Random Forest classifier wrapper.

    Args:
        n_estimators: Forwarded to RandomForestClassifier (default: 100)
        random_state: Forwarded to RandomForestClassifier (default: 42)
        max_depth: Forwarded to RandomForestClassifier (default: None)
    """
    super().__init__(
        name="random_forest",
        description="Random Forest classifier for gait data classification",
    )
    # The same hyperparameters are stored for later persistence and used
    # to build the underlying estimator.
    hyperparameters = {
        'n_estimators': n_estimators,
        'random_state': random_state,
        'max_depth': max_depth,
    }
    self.config = hyperparameters
    self.model = RandomForestClassifier(**hyperparameters)
    self.feature_names = []
    self.class_names = []
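Initialize the Random Forest classification model.
Args: n_estimators: Number of trees, forwarded to the classifier (default: 100) random_state: Random seed, forwarded to the classifier (default: 42) max_depth: Maximum tree depth, forwarded to the classifier (default: None)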
def train(self, features: List[Dict], **kwargs):
    """
    Train the Random Forest model on the given features.

    Args:
        features: List of feature dictionaries; converted to a feature
            matrix and label vector by preprocess_features
        **kwargs: Optional settings:
            test_size: Forwarded to train_test_split (default: 0.2)
            validation_split: When truthy (default: True) a validation
                split is held out and stored on the instance; otherwise
                the model is fitted on all samples
    """
    X, y = preprocess_features(features)

    # Store feature and class information
    self.feature_names = [f"feature_{i}" for i in range(X.shape[1])]
    try:
        # Sorted so the stored class order is reproducible between runs.
        self.class_names = sorted(set(y))
    except TypeError:
        # Labels that cannot be ordered keep the original behaviour.
        self.class_names = list(set(y))

    test_size = kwargs.get('test_size', 0.2)
    validation_split = kwargs.get('validation_split', True)

    if validation_split:
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=self.config['random_state']
        )

        self.model.fit(X_train, y_train)

        # Store validation data for later evaluation
        self.X_test = X_test
        self.y_test = y_test

        train_accuracy = self.model.score(X_train, y_train)
        test_accuracy = self.model.score(X_test, y_test)

        print(f"Training accuracy: {train_accuracy:.4f}")
        print(f"Validation accuracy: {test_accuracy:.4f}")
    else:
        self.model.fit(X, y)

        # Drop a validation split left over from an earlier train() call so
        # evaluate() does not score this model against stale data.
        vars(self).pop('X_test', None)
        vars(self).pop('y_test', None)

        train_accuracy = self.model.score(X, y)
        print(f"Training accuracy: {train_accuracy:.4f}")

    self.trained = True
    print("Random Forest model trained successfully.")
Train the Random Forest model on the given features.
Args: features: List of feature dictionaries **kwargs: Additional arguments including test_size, validation_split
def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]:
    """
    Run the trained Random Forest model on a batch of feature dictionaries.

    Args:
        features: List of feature dictionaries
        **kwargs: Optional settings; return_probabilities (default: False)
            selects predict_proba instead of predict

    Returns:
        Output of the underlying model's predict or predict_proba call

    Raises:
        ValueError: If the model has not been trained
    """
    if not self.trained:
        raise ValueError("Model must be trained before making predictions")

    # Labels returned by preprocessing are not needed for inference.
    X, _ = preprocess_features(features)

    wants_probabilities = kwargs.get('return_probabilities', False)
    infer = self.model.predict_proba if wants_probabilities else self.model.predict
    return infer(X)
Make predictions using the trained Random Forest model.
Args: features: List of feature dictionaries **kwargs: Additional arguments including return_probabilities
Returns: Array of predictions or probabilities
 118 def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]: 119 """ 120 Evaluate the Random Forest model, using the validation split stored on the instance when present and the provided features otherwise. 121 122 Args: 123 features: List of feature dictionaries (only read when no stored validation data exists) 124 **kwargs: Additional arguments including detailed_report (adds a classification report when truthy) 125 126 Returns: 127 Dictionary containing evaluation metrics: 'accuracy', 'confusion_matrix' (nested list) and optional report entries 128 """ 129 if not self.trained: 130 raise ValueError("Model must be trained before evaluation") 131 132 # Use validation data if available, otherwise use provided features 133 if hasattr(self, 'X_test') and hasattr(self, 'y_test'): 134 X_test, y_test = self.X_test, self.y_test 135 else: 136 X_test, y_test = preprocess_features(features) 137 138 # Make predictions 139 y_pred = self.model.predict(X_test) 140 141 # Calculate metrics 142 accuracy = accuracy_score(y_test, y_pred) 143 conf_matrix = confusion_matrix(y_test, y_pred) 144 145 # Basic metrics 146 metrics = { 147 'accuracy': accuracy, 148 'confusion_matrix': conf_matrix.tolist() 149 } 150 151 # Detailed report if requested 152 detailed_report = kwargs.get('detailed_report', False) 153 if detailed_report: 154 class_report = classification_report(y_test, y_pred, output_dict=True) 155 metrics['classification_report'] = class_report 156 157 # Feature importance 158 if hasattr(self.model, 'feature_importances_'): 159 feature_importance = dict(zip(self.feature_names, self.model.feature_importances_)) 160 metrics['feature_importance'] = feature_importance 161 162 return metrics
Evaluate the Random Forest model performance.
Args: features: List of feature dictionaries **kwargs: Additional arguments including detailed_report
Returns: Dictionary containing evaluation metrics
def save_model(self, filepath: str):
    """
    Persist the trained model together with its metadata.

    Args:
        filepath: Path to save the model

    Raises:
        ValueError: If the model has not been trained
    """
    if not self.trained:
        raise ValueError("Model must be trained before saving")

    # Bundle the estimator with the metadata that load_model reads back.
    payload = dict(
        model=self.model,
        config=self.config,
        feature_names=self.feature_names,
        class_names=self.class_names,
        trained=self.trained,
    )

    joblib.dump(payload, filepath)
    print(f"Random Forest model saved to {filepath}")
Save the trained Random Forest model to a file.
Args: filepath: Path to save the model
def load_model(self, filepath: str):
    """
    Load a trained Random Forest model from a file.

    Two on-disk layouts are accepted: the dictionary written by save_model
    (estimator plus metadata) and a legacy file holding only the estimator.

    Args:
        filepath: Path to the saved model

    Raises:
        Exception: Any error raised while loading is printed and re-raised
    """
    # NOTE: joblib.load unpickles arbitrary objects; only load model files
    # that come from a trusted source.
    try:
        model_data = joblib.load(filepath)

        if isinstance(model_data, dict):
            # Current format: estimator plus metadata
            self.model = model_data['model']
            self.config = model_data.get('config', self.config)
            self.feature_names = model_data.get('feature_names', [])
            self.class_names = model_data.get('class_names', [])
            self.trained = model_data.get('trained', True)
        else:
            # Legacy format - just the model. Rebuild the metadata from the
            # estimator so names left over from a previously held model are
            # not paired with the new model's importances.
            self.model = model_data
            n_features = getattr(model_data, 'n_features_in_', 0)
            self.feature_names = [f"feature_{i}" for i in range(n_features)]
            self.class_names = list(getattr(model_data, 'classes_', []))
            self.trained = True

        print(f"Random Forest model loaded from {filepath}")
    except Exception as e:
        print(f"Error loading model: {e}")
        raise
Load a trained Random Forest model from a file.
Args: filepath: Path to the saved model
def get_feature_importance(self) -> Dict[str, float]:
    """
    Pair each stored feature name with the model's importance score.

    Returns:
        Dictionary mapping feature names to importance scores; empty when
        the model exposes no feature_importances_ attribute

    Raises:
        ValueError: If the model has not been trained
    """
    if not self.trained:
        raise ValueError("Model must be trained to get feature importance")

    if not hasattr(self.model, 'feature_importances_'):
        return {}

    name_score_pairs = zip(self.feature_names, self.model.feature_importances_)
    return dict(name_score_pairs)
Get feature importance scores.
Returns: Dictionary mapping feature names to importance scores
def predict_single(self, single_features: Dict) -> int:
    """
    Predict the class of one sample.

    Args:
        single_features: Dictionary containing features for a single sample

    Returns:
        First element of the model's prediction for that sample

    Raises:
        ValueError: If the model has not been trained
    """
    if not self.trained:
        raise ValueError("Model must be trained before making predictions")

    # preprocess_features works on a list, so wrap the single sample.
    X, _ = preprocess_features([single_features])

    predictions = self.model.predict(X)
    return predictions[0]
Make prediction for a single feature vector.
Args: single_features: Dictionary containing features for a single sample
Returns: Predicted class
Inherited Members
def get_dataset_manager():
    """Return the singleton DatasetManager instance."""
    manager = DatasetManager()
    return manager
Get the singleton DatasetManager instance.
def get_feature_manager():
    """Return the singleton FeatureManager instance."""
    manager = FeatureManager()
    return manager
Get the singleton FeatureManager instance.
def get_preprocessing_manager():
    """Return the singleton PreprocessingManager instance."""
    manager = PreprocessingManager()
    return manager
Get the singleton PreprocessingManager instance.
Get the singleton EDAManager instance.
def get_classification_manager():
    """Return the singleton ClassificationManager instance."""
    manager = ClassificationManager()
    return manager
Get the singleton ClassificationManager instance.
def get_all_managers():
    """
    Collect every manager instance in one dictionary.

    Returns:
        Dictionary with the keys 'dataset', 'feature', 'preprocessing',
        'eda' and 'classification', each mapped to its manager instance
    """
    managers = {}
    managers['dataset'] = DatasetManager()
    managers['feature'] = FeatureManager()
    managers['preprocessing'] = PreprocessingManager()
    managers['eda'] = EDAManager()
    managers['classification'] = ClassificationManager()
    return managers
Get all singleton managers.
Returns: Dictionary containing all manager instances
def get_available_datasets():
    """Return the component names reported by the DatasetManager."""
    manager = DatasetManager()
    return manager.get_available_components()
Get list of available dataset names.
def get_available_extractors():
    """Return the component names reported by the FeatureManager."""
    manager = FeatureManager()
    return manager.get_available_components()
Get list of available feature extractor names.
def get_available_preprocessors():
    """Return the component names reported by the PreprocessingManager."""
    manager = PreprocessingManager()
    return manager.get_available_components()
Get list of available preprocessor names.
def get_available_analyzers():
    """Return the component names reported by the EDAManager."""
    manager = EDAManager()
    return manager.get_available_components()
Get list of available EDA analyzer names.
def get_available_models():
    """Return the component names reported by the ClassificationManager."""
    manager = ClassificationManager()
    return manager.get_available_components()
Get list of available classification model names.
def get_system_info():
    """
    Summarize the package metadata and the registered components.

    Returns:
        Dictionary with the package version and author, the available
        datasets, extractors, preprocessors, analyzers and models, and a
        short architecture description
    """
    info = {'version': __version__, 'author': __author__}
    info['available_datasets'] = get_available_datasets()
    info['available_extractors'] = get_available_extractors()
    info['available_preprocessors'] = get_available_preprocessors()
    info['available_analyzers'] = get_available_analyzers()
    info['available_models'] = get_available_models()
    info['architecture'] = 'Modular with singleton design pattern'
    return info
Get information about the available components in the system.
Returns: Dictionary containing system information
def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192):
    """
    Complete workflow for loading and analyzing Daphnet data.

    Args:
        data_dir: Directory containing the Daphnet dataset
        sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk').
            NOTE(review): accepted for API compatibility but not read by
            this function.
        window_size: Size of sliding windows for feature extraction

    Returns:
        Dictionary containing data, features, and analysis results, plus
        the loader, extractor and analyzer objects that produced them
    """
    # Load dataset
    loader = DaphnetLoader()
    data, names = loader.load_data(data_dir)

    # Create sliding windows
    windows = loader.create_sliding_windows(data, names, window_size=window_size)

    # Extract features from the first entry's windows only. An empty window
    # list yields no features instead of an IndexError.
    extractor = GaitFeatureExtractor()
    if len(windows) > 0:
        features = extractor.extract_features(windows[0]['windows'], fs=64)
    else:
        features = []

    # Analyze data
    analyzer = DaphnetVisualizationAnalyzer()
    analysis = analyzer.analyze(data)

    return {
        'data': data,
        'names': names,
        'windows': windows,
        'features': features,
        'analysis': analysis,
        'loader': loader,
        'extractor': extractor,
        'analyzer': analyzer
    }
Complete workflow for loading and analyzing Daphnet data.
Args: data_dir: Directory containing the Daphnet dataset sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk') window_size: Size of sliding windows for feature extraction
Returns: Dictionary containing data, features, and analysis results
def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100):
    """
    Load PhysioNet VGRF data, window it and extract features per entry.

    Args:
        data_dir: Directory to store/find the PhysioNet dataset
        window_size: Size of sliding windows for feature extraction (default: 600)
        step_size: Step size for sliding windows (default: 100)

    Returns:
        Dictionary containing data, names, windows, per-entry features,
        labels, and the loader and extractor objects
    """
    loader = PhysioNetLoader()
    data, names = loader.load_data(data_dir)

    windows = loader.create_sliding_windows(
        data, names, window_size=window_size, step_size=step_size
    )

    extractor = PhysioNetFeatureExtractor()

    # Entries without a 'windows' key are skipped.
    all_features = [
        {
            'name': entry['name'],
            'features': extractor.extract_features(entry['windows'], fs=100),
            'metadata': entry.get('metadata', {}),
        }
        for entry in windows
        if 'windows' in entry
    ]

    result = {
        'data': data,
        'names': names,
        'windows': windows,
        'features': all_features,
        'labels': loader.get_labels(),
        'loader': loader,
        'extractor': extractor,
    }
    return result
Complete workflow for loading and analyzing PhysioNet VGRF data.
Args: data_dir: Directory to store/find the PhysioNet dataset window_size: Size of sliding windows for feature extraction (default: 600) step_size: Step size for sliding windows (default: 100)
Returns: Dictionary containing data, features, and analysis results
def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs):
    """
    Train a gait classification model.

    Args:
        features: List of feature dictionaries
        model_type: Type of model to train; only 'random_forest' is supported
        **kwargs: Model and training options. 'n_estimators', 'random_state'
            and 'max_depth' configure the model; every other option
            (e.g. 'test_size', 'validation_split') is passed to train()

    Returns:
        Trained model instance

    Raises:
        ValueError: If model_type is not supported
    """
    if model_type != 'random_forest':
        raise ValueError(f"Model type '{model_type}' not supported")

    # The constructor accepts only its own hyperparameters, so training
    # options must not be forwarded to it (they would raise TypeError).
    constructor_keys = ('n_estimators', 'random_state', 'max_depth')
    init_kwargs = {key: value for key, value in kwargs.items() if key in constructor_keys}
    train_kwargs = {key: value for key, value in kwargs.items() if key not in constructor_keys}

    model = RandomForestModel(**init_kwargs)
    model.train(features, **train_kwargs)
    return model
Train a gait classification model.
Args: features: List of feature dictionaries model_type: Type of model to train ('random_forest', etc.) **kwargs: Additional arguments for model training
Returns: Trained model instance
def load_daphnet_data(data_dir: str):
    """
    Legacy wrapper that loads Daphnet data through DaphnetLoader.

    Args:
        data_dir: Directory passed to the loader

    Returns:
        Tuple of (data_list, names_list)
    """
    daphnet_loader = DaphnetLoader()
    loaded = daphnet_loader.load_data(data_dir)
    return loaded
Legacy function for loading Daphnet data.
Args: data_dir: Directory to store the dataset
Returns: Tuple of (data_list, names_list)
def create_sliding_windows(daphnet, daphnet_names, window_size=192, step_size=32):
    """
    Legacy wrapper that windows Daphnet data through DaphnetLoader.

    Args:
        daphnet: List of dataframes containing Daphnet data
        daphnet_names: List of names of the Daphnet dataframes
        window_size: Size of the sliding window
        step_size: Step size for the sliding window

    Returns:
        List of dictionaries containing sliding windows for each DataFrame
    """
    daphnet_loader = DaphnetLoader()
    windowed = daphnet_loader.create_sliding_windows(
        daphnet, daphnet_names, window_size, step_size
    )
    return windowed
Legacy function for creating sliding windows.
Args: daphnet: List of dataframes containing Daphnet data daphnet_names: List of names of the Daphnet dataframes window_size: Size of the sliding window step_size: Step size for the sliding window
Returns: List of dictionaries containing sliding windows for each DataFrame
def load_mobifall_data():
    """
    Legacy wrapper that loads MobiFall data through MobiFallLoader.

    The loader is called with an empty string as its data directory.

    Returns:
        Tuple of (data_list, names_list)
    """
    mobifall_loader = MobiFallLoader()
    loaded = mobifall_loader.load_data("")
    return loaded
Legacy function for loading MobiFall data.
Returns: Tuple of (data_list, names_list)
def load_arduous_data():
    """
    Legacy wrapper that loads Arduous data through ArduousLoader.

    The loader is called with an empty string as its data directory.

    Returns:
        Tuple of (data_list, names_list)
    """
    arduous_loader = ArduousLoader()
    loaded = arduous_loader.load_data("")
    return loaded
Legacy function for loading Arduous data.
Returns: Tuple of (data_list, names_list)
def load_physionet_data(data_dir: str) -> Tuple[List[pd.DataFrame], List[str]]:
    """
    Legacy wrapper that loads PhysioNet data through PhysioNetLoader.

    Args:
        data_dir: Directory containing the dataset

    Returns:
        Tuple of (data_list, names_list)
    """
    physionet_loader = PhysioNetLoader()
    loaded = physionet_loader.load_data(data_dir)
    return loaded
Legacy function to load PhysioNet data.
Args: data_dir: Directory containing the dataset
Returns: Tuple of (data_list, names_list)
def create_physionet_windows(data: List[pd.DataFrame], names: List[str],
                             window_size: int = 600, step_size: int = 100) -> List[Dict]:
    """
    Legacy wrapper that windows PhysioNet data through PhysioNetLoader.

    Args:
        data: List of DataFrames
        names: List of names
        window_size: Size of sliding window
        step_size: Step size for sliding window

    Returns:
        List of sliding window dictionaries
    """
    physionet_loader = PhysioNetLoader()
    windowed = physionet_loader.create_sliding_windows(data, names, window_size, step_size)
    return windowed
Legacy function to create sliding windows from PhysioNet data.
Args: data: List of DataFrames names: List of names window_size: Size of sliding window step_size: Step size for sliding window
Returns: List of sliding window dictionaries
def load_harup_data(data_dir: str, subjects=None, activities=None, trials=None):
    """
    Legacy wrapper that loads HAR-UP data through HARUPLoader.

    Args:
        data_dir: Directory containing the dataset
        subjects: List of subject IDs to load (default: all subjects)
        activities: List of activity IDs to load (default: all activities)
        trials: List of trial IDs to load (default: all trials)

    Returns:
        Tuple of (data_list, names_list)
    """
    harup_loader = HARUPLoader()
    loaded = harup_loader.load_data(data_dir, subjects, activities, trials)
    return loaded
Legacy function for loading HAR-UP data.
Args: data_dir: Directory containing the dataset subjects: List of subject IDs to load (default: all subjects) activities: List of activity IDs to load (default: all activities) trials: List of trial IDs to load (default: all trials)
Returns: Tuple of (data_list, names_list)
def create_harup_windows(harup_data, harup_names, window_size=100, step_size=50):
    """
    Legacy wrapper that windows HAR-UP data through HARUPLoader.

    Args:
        harup_data: List of dataframes containing HAR-UP data
        harup_names: List of names of the HAR-UP dataframes
        window_size: Size of the sliding window
        step_size: Step size for the sliding window

    Returns:
        List of dictionaries containing sliding windows for each DataFrame
    """
    harup_loader = HARUPLoader()
    windowed = harup_loader.create_sliding_windows(
        harup_data, harup_names, window_size, step_size
    )
    return windowed
Legacy function for creating sliding windows from HAR-UP data.
Args: harup_data: List of dataframes containing HAR-UP data harup_names: List of names of the HAR-UP dataframes window_size: Size of the sliding window step_size: Step size for the sliding window
Returns: List of dictionaries containing sliding windows for each DataFrame
def extract_harup_features(windows_data, time_domain=True, freq_domain=True):
    """
    Legacy wrapper that extracts HAR-UP features through HARUPLoader.

    Args:
        windows_data: List of dictionaries containing sliding windows
        time_domain: Whether to extract time domain features
        freq_domain: Whether to extract frequency domain features

    Returns:
        List of dictionaries containing extracted features
    """
    harup_loader = HARUPLoader()
    extracted = harup_loader.extract_features(windows_data, time_domain, freq_domain)
    return extracted
Legacy function for extracting features from HAR-UP windows.
Args: windows_data: List of dictionaries containing sliding windows time_domain: Whether to extract time domain features freq_domain: Whether to extract frequency domain features
Returns: List of dictionaries containing extracted features
def download_dataset(dataset_name, data_dir):
    """
    Download the named dataset into data_dir.

    Args:
        dataset_name: One of "daphnet", "mobifall", "arduous", "harup",
            "urfall" or "physionet"
        data_dir: Directory passed to the dataset-specific downloader

    Raises:
        ValueError: If dataset_name is not one of the supported names
    """
    if dataset_name == "daphnet":
        download_daphnet_data(data_dir)
        return
    if dataset_name == "mobifall":
        download_mobifall_data(data_dir)
        return
    if dataset_name == "arduous":
        download_arduous_data(data_dir)
        return
    if dataset_name == "harup":
        download_harup_data(data_dir)
        return
    if dataset_name == "urfall":
        download_urfall_data(data_dir)
        return
    if dataset_name == "physionet":
        # PhysioNet dataset is handled by the PhysioNetLoader itself
        return
    raise ValueError(f"Dataset {dataset_name} not supported.")
Download the dataset.
def extract_dataset(dataset_name, data_dir):
    """
    Extract the named dataset inside data_dir.

    Args:
        dataset_name: One of "daphnet", "mobifall", "arduous", "harup",
            "urfall" or "physionet"
        data_dir: Directory passed to the dataset-specific extractor

    Raises:
        ValueError: If dataset_name is not one of the supported names
    """
    if dataset_name == "daphnet":
        extract_daphnet_data(data_dir)
        return
    if dataset_name == "mobifall":
        extract_mobifall_data(data_dir)
        return
    if dataset_name == "arduous":
        extract_arduous_data(data_dir)
        return
    if dataset_name == "harup":
        extract_harup_data(data_dir)
        return
    if dataset_name == "urfall":
        extract_urfall_data(data_dir)
        return
    if dataset_name == "physionet":
        # PhysioNet dataset is handled by the PhysioNetLoader itself
        return
    raise ValueError(f"Dataset {dataset_name} not supported.")
Extract the dataset.
def calculate_mean(signal):
    """
    Calculate the mean of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        mean_value (float): Arithmetic mean as computed by np.mean.
    """
    mean_value = np.mean(signal)
    return mean_value
Calculate the mean of the signal.
def calculate_standard_deviation(signal):
    """
    Compute the standard deviation of a signal with np.std.
    Args:
        signal (np.array): Input signal.
    Returns:
        std_dev (float): Standard deviation.
    """
    std_dev = np.std(signal)
    return std_dev
Calculate the standard deviation of a signal. Args: signal (np.array): Input signal. Returns: std_dev (float): Standard deviation.
def calculate_variance(signal):
    """
    Compute the variance of a signal with np.var.
    Args:
        signal (np.array): Input signal.
    Returns:
        variance (float): Variance.
    """
    variance = np.var(signal)
    return variance
Calculate the variance of a signal. Args: signal (np.array): Input signal. Returns: variance (float): Variance.
def calculate_skewness(signal):
    """
    Calculate the skewness of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        skewness_value (float): Skewness, or 0 if the computation raised.
    """
    try:
        skewness_value = skew(signal)
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in skewness: {e}")
        return 0
    return skewness_value
Calculate the skewness of the signal.
def calculate_kurtosis(signal):
    """
    Compute the kurtosis of a signal (fisher=False).
    Args:
        signal (np.array): Input signal.
    Returns:
        kurtosis_value (float): Kurtosis, or 0 if the computation raised.
    """
    try:
        kurtosis_value = kurtosis(signal, fisher=False)
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'kurtosis': {e}")
        return 0
    return kurtosis_value
Calculate the kurtosis of a signal. Args: signal (np.array): Input signal. Returns: kurtosis_value (float): Kurtosis.
def calculate_root_mean_square(signal):
    """
    Calculate the root mean square of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        rms (float): Square root of the mean of the squared samples.
    """
    squared = np.square(signal)
    mean_of_squares = np.mean(squared)
    return np.sqrt(mean_of_squares)
Calculate the root mean square of the signal.
def calculate_range(signal):
    """
    Calculate the range of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        value_range (float): Maximum minus minimum.
    """
    highest = np.max(signal)
    lowest = np.min(signal)
    return highest - lowest
Calculate the range of the signal.
def calculate_median(signal):
    """
    Calculate the median of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        median_value (float): Median as computed by np.median.
    """
    median_value = np.median(signal)
    return median_value
Calculate the median of the signal.
def calculate_mode(signal):
    """
    Calculate the mode of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        mode_value: Most frequent value; on ties the first entry of the
            sorted unique values with the highest count.
    """
    unique_values, occurrences = np.unique(signal, return_counts=True)
    most_common_position = np.argmax(occurrences)
    return unique_values[most_common_position]
Calculate the mode of the signal.
def calculate_mean_absolute_value(signal):
    """
    Calculate the mean absolute value of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        mav (float): Mean of the absolute sample values.
    """
    magnitudes = np.abs(signal)
    return np.mean(magnitudes)
Calculate the mean absolute value of the signal.
def calculate_median_absolute_deviation(signal):
    """
    Calculate the median absolute deviation of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        mad (float): Median of the absolute deviations from the median.
    """
    center = np.median(signal)
    absolute_deviations = np.abs(signal - center)
    return np.median(absolute_deviations)
Calculate the median absolute deviation of the signal.
def calculate_peak_height(signal):
    """
    Calculate the height of the tallest peak in a signal.
    Args:
        signal (array-like): One-dimensional input signal.
    Returns:
        peak_height (float): Largest value among the peaks found by
            find_peaks, or 0 when no peak is found.
    """
    # find_peaks returns positions, so index a plain array: lists do not
    # accept array indices and a pandas Series would look them up by label.
    samples = np.asarray(signal)
    peaks, _ = find_peaks(samples)
    return np.max(samples[peaks]) if len(peaks) > 0 else 0
Calculate the peak height of the signal.
def calculate_stride_times(signal, fs):
    """
    Estimate the average stride time from the spacing of signal peaks.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        avg_stride_time (float): Mean time between consecutive peaks, or 0
            when fewer than two peaks are found.
    """
    peak_positions, _ = find_peaks(signal)
    intervals = np.diff(peak_positions) / fs
    if len(intervals) > 0:
        return np.mean(intervals)
    return 0
Calculate stride times from a signal using peak detection. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: avg_stride_time (float): Average stride time.
def calculate_step_time(signal, fs):
    """
    Compute the time between consecutive signal peaks.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        step_times (np.array): Array of step times.
    """
    peak_positions, _ = find_peaks(signal)
    samples_between_peaks = np.diff(peak_positions)
    return samples_between_peaks / fs
Calculate step times from a signal using peak detection. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: step_times (np.array): Array of step times.
def calculate_cadence(signal, fs):
    """
    Calculate the cadence (steps per minute) of the signal.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        cadence (float): Number of detected peaks per minute of signal, or
            0 for an empty signal.
    """
    sample_count = len(signal)
    if sample_count == 0:
        # An empty signal has zero duration; return 0 rather than dividing
        # by zero.
        return 0
    peaks, _ = find_peaks(signal)
    duration = sample_count / fs
    return (len(peaks) / duration) * 60
Calculate the cadence (steps per minute) of the signal.
def calculate_freezing_index(signal, fs):
    """
    Compute the ratio of 3-8 Hz band power to 0.5-3 Hz band power.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        freezing_index (float): Power ratio, or 0 when the 0.5-3 Hz power
            is zero.
    """
    upper_band_power = calculate_power(signal, fs, (3, 8))
    lower_band_power = calculate_power(signal, fs, (0.5, 3))
    if lower_band_power != 0:
        return upper_band_power / lower_band_power
    return 0
Calculate the freezing index of a signal. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: freezing_index (float): Freezing index.
def calculate_dominant_frequency(signal, fs):
    """
    Find the FFT bin frequency with the largest magnitude.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        dominant_freq (float): Frequency of the strongest FFT bin as given
            by np.fft.fftfreq, or 0 if the computation raised.
    """
    try:
        magnitudes = np.abs(fft(signal))
        bin_frequencies = np.fft.fftfreq(len(signal), 1 / fs)
        strongest_bin = np.argmax(magnitudes)
        return bin_frequencies[strongest_bin]
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred: {e}")
        return 0
Calculate the dominant frequency of the signal.
def calculate_peak_frequency(signal, fs):
    """
    Find the frequency with the highest Welch power spectral density.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        peak_freq (float): Frequency of the PSD maximum, or 0 if the
            computation raised.
    """
    try:
        # nperseg must not exceed the signal length
        segment_length = min(len(signal), 192)
        frequencies, psd = welch(signal, fs=fs, nperseg=segment_length)
        return frequencies[np.argmax(psd)]
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'peak_frequency': {e}")
        return 0
Calculate the peak frequency of the signal.
def calculate_power_spectral_entropy(signal, fs):
    """
    Compute the base-2 entropy of the normalized Welch power spectrum.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        spectral_entropy (float): Entropy of the normalized PSD, or 0 if
            the computation raised.
    """
    try:
        # nperseg must not exceed the signal length
        segment_length = min(len(signal), 192)
        _, psd = welch(signal, fs=fs, nperseg=segment_length)
        normalized_psd = psd / np.sum(psd)
        # Machine epsilon keeps log2 finite for zero-power bins.
        log_terms = np.log2(normalized_psd + np.finfo(float).eps)
        return -np.sum(normalized_psd * log_terms)
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'power spectral entropy': {e}")
        return 0
Calculate the power spectral entropy of the signal.
def calculate_principal_harmonic_frequency(signal, fs):
    """
    Find the FFT bin frequency with the largest magnitude.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        principal_freq (float): Frequency of the strongest FFT bin as given
            by np.fft.fftfreq, or 0 if the computation raised.
    """
    try:
        magnitudes = np.abs(fft(signal))
        bin_frequencies = np.fft.fftfreq(len(signal), 1 / fs)
        strongest_bin = np.argmax(magnitudes)
        return bin_frequencies[strongest_bin]
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'principal_harmonic_frequency': {e}")
        return 0
Calculate the principal harmonic frequency of the signal.
def calculate_entropy(signal):
    """
    Compute the base-2 entropy of the distribution of distinct values.
    Args:
        signal (np.array): Input signal.
    Returns:
        entropy_value (float): Entropy.
    """
    _, occurrences = np.unique(signal, return_counts=True)
    probabilities = occurrences / len(signal)
    entropy_value = entropy(probabilities, base=2)
    return entropy_value
Calculate the entropy of a signal. Args: signal (np.array): Input signal. Returns: entropy_value (float): Entropy.
def calculate_interquartile_range(signal):
    """
    Calculate the interquartile range of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        iqr (float): 75th percentile minus 25th percentile, or 0 if the
            computation raised.
    """
    try:
        upper_quartile, lower_quartile = np.percentile(signal, [75, 25])
        spread = upper_quartile - lower_quartile
        return spread
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'interquartile_range': {e}")
        return 0
Calculate the interquartile range of the signal.
def calculate_correlation(signal1, signal2):
    """
    Calculate the correlation between two signals.
    Args:
        signal1 (np.array): First signal.
        signal2 (np.array): Second signal.
    Returns:
        correlation (float): Off-diagonal entry of the np.corrcoef matrix.
    """
    correlation_matrix = np.corrcoef(signal1, signal2)
    return correlation_matrix[0, 1]
Calculate the correlation between two signals.
def calculate_auto_regression_coefficients(signal, order=3):
    """
    Fit an autoregressive model and return its parameters.
    Args:
        signal (np.array): Input signal.
        order (int): Number of lags passed to AutoReg.
    Returns:
        params: Fitted AutoReg parameters, or 0 if fitting raised.
    """
    try:
        fitted = AutoReg(signal, lags=order).fit()
        return fitted.params
    except Exception as e:
        # Best effort: report the problem and fall back to 0.
        print(f"An error occurred in feature 'auto_regression_coefficients': {e}")
        return 0
Calculate the auto-regression coefficients of the signal.
def clip_sliding_windows(data, min_val=-1, max_val=1):
    """
    Limit every value in the sliding windows to [min_val, max_val].
    Args:
        data: Values to clip.
        min_val: Lower bound (default: -1).
        max_val: Upper bound (default: 1).
    Returns:
        Clipped values as returned by np.clip.
    """
    clipped = np.clip(data, min_val, max_val)
    return clipped
Clip values in the sliding windows to be within a specified range.
def remove_noise(data, window_size=5):
    """
    Apply a centered moving average filter to reduce noise.
    Args:
        data: pandas Series or DataFrame to smooth.
        window_size (int): Number of samples in the moving window.
    Returns:
        Smoothed data. Edge positions where the centered window is
        incomplete are filled from the nearest computed value.
    """
    smoothed = data.rolling(window=window_size, center=True).mean()
    # .bfill()/.ffill() replace fillna(method=...), which newer pandas
    # versions deprecate.
    return smoothed.bfill().ffill()
Apply a moving average filter to reduce noise.
def remove_outliers(data, threshold=3):
    """
    Keep only values within `threshold` standard deviations of the mean.
    Args:
        data: pandas object providing mean(), std() and boolean indexing.
        threshold: Maximum allowed distance from the mean, in standard
            deviations (default: 3).
    Returns:
        data indexed by the boolean inlier mask.
    """
    center = data.mean()
    spread = data.std()
    is_inlier = (data - center).abs() <= threshold * spread
    return data[is_inlier]
Remove outliers beyond a given threshold using the Z-score method.
def remove_baseline(data):
    """
    Remove the baseline by subtracting the mean of the data.
    Args:
        data: Object providing mean() and subtraction.
    Returns:
        data minus its mean.
    """
    baseline = data.mean()
    return data - baseline
Remove baseline by subtracting the mean.
def remove_drift(data, cutoff=0.01, fs=100):
    """
    Remove low-frequency drift with a first-order Butterworth high-pass.
    Args:
        data: Signal to filter.
        cutoff: Cutoff frequency, in the same unit as fs (default: 0.01).
        fs: Sampling frequency (default: 100).
    Returns:
        Filtered signal from filtfilt.
    """
    # butter expects the cutoff as a fraction of the Nyquist frequency.
    normalized_cutoff = cutoff / (fs / 2)
    numerator, denominator = butter(1, normalized_cutoff, btype='highpass')
    return filtfilt(numerator, denominator, data)
Remove low-frequency drift using a high-pass filter.
def remove_artifacts(data, method="interpolate"):
    """
    Remove artifacts by interpolating missing values.
    Args:
        data: pandas Series or DataFrame with missing values.
        method: Kept for API compatibility; not read by this function,
            which always interpolates linearly.
    Returns:
        Data with missing values filled by linear interpolation; values
        still missing afterwards are filled from the nearest valid value.
    """
    interpolated = data.interpolate(method="linear")
    # .bfill()/.ffill() replace fillna(method=...), which newer pandas
    # versions deprecate.
    return interpolated.bfill().ffill()
Remove artifacts by interpolating missing values.
def remove_trend(data, order=2):
    """
    Subtract a fitted polynomial trend from the data.
    Args:
        data: One-dimensional signal.
        order: Degree of the fitted polynomial (default: 2).
    Returns:
        data minus the polynomial evaluated at each sample position.
    """
    positions = np.arange(len(data))
    coefficients = np.polyfit(positions, data, order)
    fitted_trend = np.polyval(coefficients, positions)
    return data - fitted_trend
Remove trends using polynomial fitting.
def remove_dc_offset(data):
    """
    Remove the DC offset by subtracting the mean of the data.
    Args:
        data: Object providing mean() and subtraction.
    Returns:
        data minus its mean.
    """
    dc_level = data.mean()
    return data - dc_level
Remove DC offset by subtracting the mean.
def remove_high_frequency_noise(data, cutoff=10, fs=100):
    """
    Remove high-frequency noise with a first-order Butterworth low-pass.
    Args:
        data: Signal to filter.
        cutoff: Cutoff frequency, in the same unit as fs (default: 10).
        fs: Sampling frequency (default: 100).
    Returns:
        Filtered signal from filtfilt.
    """
    # butter expects the cutoff as a fraction of the Nyquist frequency.
    normalized_cutoff = cutoff / (fs / 2)
    numerator, denominator = butter(1, normalized_cutoff, btype='lowpass')
    return filtfilt(numerator, denominator, data)
Apply a low-pass filter to remove high-frequency noise.
def remove_low_frequency_noise(data, cutoff=0.5, fs=100):
    """
    Remove low-frequency noise with a first-order Butterworth high-pass.
    Args:
        data: Signal to filter.
        cutoff: Cutoff frequency, in the same unit as fs (default: 0.5).
        fs: Sampling frequency (default: 100).
    Returns:
        Filtered signal from filtfilt.
    """
    # butter expects the cutoff as a fraction of the Nyquist frequency.
    normalized_cutoff = cutoff / (fs / 2)
    numerator, denominator = butter(1, normalized_cutoff, btype='highpass')
    return filtfilt(numerator, denominator, data)
Apply a high-pass filter to remove low-frequency noise.
def plot_thigh_data(daphnetThigh, daphnetNames, i):
    """
    Plot the four thigh acceleration channels of one dataset.

    Rows with annotations <= 0 are dropped; rows annotated 1 are marked as
    "no freeze" and rows annotated 2 as "freeze".
    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    dataset_name = daphnetNames[i]
    print(dataset_name)
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Thigh Data from " + dataset_name)
    plt.xlabel("Time")

    annotated = daphnetThigh[i]
    annotated = annotated[annotated.annotations > 0]  # Filter out rows with no annotations
    no_freeze = annotated[annotated.annotations == 1]
    freeze = annotated[annotated.annotations == 2]

    # One (column, y-axis label) pair per subplot, top to bottom.
    panels = (
        ("thigh_h_fd", "Horizontal Forward Thigh Acceleration"),
        ("thigh_v", "Vertical Thigh Acceleration"),
        ("thigh_h_l", "Horizontal Lateral Thigh Acceleration"),
        ("thigh", "Overall Thigh Acceleration"),
    )
    for ax, (column, y_label) in zip(axes, panels):
        ax.plot(annotated[column])
        ax.set_ylabel(y_label)
        ax.scatter(no_freeze.index, no_freeze[column], c='orange', label="no freeze")
        ax.scatter(freeze.index, freeze[column], c='purple', label="freeze")
        ax.legend()

    plt.tight_layout()
    plt.show()
Plot thigh acceleration data for a specific dataset. Args: daphnetThigh (list): List of DataFrames containing thigh acceleration data. daphnetNames (list): List of dataset names. i (int): Index of the dataset to plot.
def plot_shank_data(daphnetShank, daphnetNames, i):
    """
    Draw the four shank acceleration traces of one recording as stacked panels.

    The overall magnitude is written into a "shank" column on the selected
    DataFrame itself (the caller's object is modified). Rows whose annotation
    is 0 are then dropped; rows annotated 1 are highlighted as "no freeze" and
    rows annotated 2 as "freeze".

    Args:
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    dataset_name = daphnetNames[i]
    print(dataset_name)
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Shank Data from " + dataset_name)
    plt.xlabel("Time")

    frame = daphnetShank[i]
    # Euclidean norm of the three axes, stored on the input frame before filtering.
    frame["shank"] = np.sqrt(frame["shank_h_l"] ** 2 + frame["shank_v"] ** 2 + frame["shank_h_fd"] ** 2)
    annotated = frame[frame.annotations > 0]
    no_freeze = annotated[annotated.annotations == 1]
    freeze = annotated[annotated.annotations == 2]

    # One (column, y-axis label) pair per panel, top to bottom.
    panels = (
        ("shank_h_fd", "Horizontal Forward Shank Acceleration"),
        ("shank_v", "Vertical Shank Acceleration"),
        ("shank_h_l", "Horizontal Lateral Shank Acceleration"),
        ("shank", "Overall Shank Acceleration"),
    )
    for ax, (column, ylabel) in zip(axes, panels):
        ax.plot(annotated[column])
        ax.set_ylabel(ylabel)
        ax.scatter(no_freeze.index, no_freeze[column], c='orange', label="no freeze")
        ax.scatter(freeze.index, freeze[column], c='purple', label="freeze")
        ax.legend()

    plt.tight_layout()
    plt.show()
Plot shank acceleration data for a specific dataset. Args: daphnetShank (list): List of DataFrames containing shank acceleration data. daphnetNames (list): List of dataset names. i (int): Index of the dataset to plot.
def plot_trunk_data(daphnetTrunk, daphnetNames, i):
    """
    Draw the four trunk acceleration traces of one recording as stacked panels.

    The overall magnitude is written into a "trunk" column on the selected
    DataFrame itself (the caller's object is modified). Rows whose annotation
    is 0 are then dropped; rows annotated 1 are highlighted as "no freeze" and
    rows annotated 2 as "freeze".

    Args:
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    dataset_name = daphnetNames[i]
    print(dataset_name)
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Trunk Data from " + dataset_name)
    plt.xlabel("Time")

    frame = daphnetTrunk[i]
    # Euclidean norm of the three axes, stored on the input frame before filtering.
    frame["trunk"] = np.sqrt(frame["trunk_h_l"] ** 2 + frame["trunk_v"] ** 2 + frame["trunk_h_fd"] ** 2)
    annotated = frame[frame.annotations > 0]
    no_freeze = annotated[annotated.annotations == 1]
    freeze = annotated[annotated.annotations == 2]

    # One (column, y-axis label) pair per panel, top to bottom.
    panels = (
        ("trunk_h_fd", "Horizontal Forward Trunk Acceleration"),
        ("trunk_v", "Vertical Trunk Acceleration"),
        ("trunk_h_l", "Horizontal Lateral Trunk Acceleration"),
        ("trunk", "Overall Trunk Acceleration"),
    )
    for ax, (column, ylabel) in zip(axes, panels):
        ax.plot(annotated[column])
        ax.set_ylabel(ylabel)
        ax.scatter(no_freeze.index, no_freeze[column], c='orange', label="no freeze")
        ax.scatter(freeze.index, freeze[column], c='purple', label="freeze")
        ax.legend()

    plt.tight_layout()
    plt.show()
Plot trunk acceleration data for a specific dataset. Args: daphnetTrunk (list): List of DataFrames containing trunk acceleration data. daphnetNames (list): List of dataset names. i (int): Index of the dataset to plot.
def plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i):
    """
    Show the thigh, shank and trunk figures (in that order) for one recording.

    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    # Each body location is paired with the plotting routine that handles it.
    plot_jobs = (
        (plot_thigh_data, daphnetThigh),
        (plot_shank_data, daphnetShank),
        (plot_trunk_data, daphnetTrunk),
    )
    for plot_location, frames in plot_jobs:
        plot_location(frames, daphnetNames, i)
Plot thigh, shank, and trunk acceleration data for a specific dataset. Args: daphnetThigh (list): List of DataFrames containing thigh acceleration data. daphnetShank (list): List of DataFrames containing shank acceleration data. daphnetTrunk (list): List of DataFrames containing trunk acceleration data. daphnetNames (list): List of dataset names. i (int): Index of the dataset to plot.
def plot_all_thigh_data(daphnetThigh, daphnetNames):
    """
    Plot thigh acceleration data for every recording, one figure per entry.

    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetNames (list): List of dataset names.
    """
    for position, _ in enumerate(daphnetThigh):
        plot_thigh_data(daphnetThigh, daphnetNames, position)
Plot thigh acceleration data for all datasets.
def plot_all_shank_data(daphnetShank, daphnetNames):
    """
    Plot shank acceleration data for every recording, one figure per entry.

    Args:
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetNames (list): List of dataset names.
    """
    for position, _ in enumerate(daphnetShank):
        plot_shank_data(daphnetShank, daphnetNames, position)
Plot shank acceleration data for all datasets.
def plot_all_trunk_data(daphnetTrunk, daphnetNames):
    """
    Plot trunk acceleration data for every recording, one figure per entry.

    Args:
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
    """
    for position, _ in enumerate(daphnetTrunk):
        plot_trunk_data(daphnetTrunk, daphnetNames, position)
Plot trunk acceleration data for all datasets.
def plot_all_datasets(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames):
    """
    Plot thigh, shank and trunk acceleration data for every recording.

    The number of recordings is taken from ``daphnetThigh``.

    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
    """
    for position, _ in enumerate(daphnetThigh):
        plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, position)
Plot thigh, shank, and trunk acceleration data for all datasets.
def plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, sensor_name="shank", num_windows=10, save=False):
    """
    @brief Plots sliding windows of a sensor's time series data with overlaid statistical features.

    This function plots the first `num_windows` sliding windows that lie entirely within
    `start_idx` and `end_idx` for a specified sensor and overlays feature values at the time
    index whose signal value is closest to the feature value.
    It also displays entropy and dominant frequency in a separate plot.

    If the sensor is missing from either input, or no window falls inside the range,
    a message is printed and the function returns without creating a figure.

    @param[in] sliding_windows List of dictionaries, where each dictionary contains:
                               - 'name': sensor name (str)
                               - 'data': List of time-series windows (each as a Pandas Series)
    @param[in] features List of dictionaries, where each dictionary contains:
                        - 'name': sensor name (str)
                        - 'features': Dictionary of extracted feature lists
    @param[in] start_idx Start index of the time window to be plotted.
    @param[in] end_idx End index of the time window to be plotted.
    @param[in] sensor_name Name of the sensor to be plotted (default: "shank").
    @param[in] num_windows Number of sliding windows to plot (default: 10).
    @param[in] save If True, prompts for a file path and saves the plot there instead of displaying it.

    @return None
    """
    # Extract sensor windows
    sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None)
    if sensor_windows is None:
        print(f"Sensor '{sensor_name}' not found in sliding_windows.")
        return

    # Extract corresponding features
    sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None)
    if sensor_features is None:
        print(f"Sensor '{sensor_name}' not found in features.")
        return

    # Keep only windows that lie entirely inside [start_idx, end_idx]
    filtered_windows = [series for series in sensor_windows if start_idx <= series.index[0] and series.index[-1] <= end_idx]

    if not filtered_windows:
        print(f"No windows found in the specified index range ({start_idx} - {end_idx}).")
        return

    # The figure is created only after validation so that the early returns above
    # do not leave an empty figure open.
    fig, axes = plt.subplots(2, 1, figsize=(20, 10), gridspec_kw={'height_ratios': [3, 1]})
    signal_ax, feature_ax = axes

    # Entropy & frequency features are collected for the separate lower plot
    entropy_values = []
    dominant_frequencies = []

    overlay_markers = (('mean', 'x'), ('rms', 'o'), ('peak_height', 'v'), ('mode', '<'), ('median', '^'))

    # NOTE(review): features are looked up by position within the *filtered* windows;
    # if the feature lists are indexed by position in the unfiltered window list,
    # a range that skips leading windows would pair windows with the wrong features — confirm.
    for i, series in enumerate(filtered_windows[:max(num_windows, 0)]):
        # Time is the Series index, sensor readings are its values
        time_values = series.index.to_numpy()
        signal_values = series.values

        window_start, window_end = time_values[0], time_values[-1]

        # Plot time series data
        signal_ax.plot(time_values, signal_values, alpha=0.6)

        # Mark start and end of each window with vertical dotted lines
        signal_ax.axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7)
        signal_ax.axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7)

        # Overlay statistical features
        for feature, marker in overlay_markers:
            if feature in sensor_features and len(sensor_features[feature]) > i:
                feature_value = sensor_features[feature][i]
                if feature_value != 0:  # Skip zero values
                    # Place the marker at the sample whose value is closest to the feature value
                    closest_index = np.argmin(np.abs(signal_values - feature_value))
                    closest_time = time_values[closest_index]
                    signal_ax.scatter(closest_time, feature_value, color='red', marker=marker, s=100)

        if 'entropy' in sensor_features and len(sensor_features['entropy']) > i:
            entropy_values.append(sensor_features['entropy'][i])
        if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i:
            dominant_frequencies.append(sensor_features['dominant_frequency'][i])

    # Labels and title for time-series plot
    signal_ax.set_xlabel('Time')
    signal_ax.set_ylabel(f'{sensor_name} Signal')
    signal_ax.set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features')

    # Frequency-domain & entropy plot. Each series gets its own x positions so that
    # entropy can be drawn when dominant frequency is absent or has a different length.
    if dominant_frequencies:
        frequency_positions = list(range(len(dominant_frequencies)))
        feature_ax.plot(frequency_positions, dominant_frequencies, label="Dominant Frequency", marker="o", linestyle="dashed", color="blue")

    if entropy_values:
        entropy_positions = list(range(len(entropy_values)))
        feature_ax.bar(entropy_positions, entropy_values, alpha=0.6, label="Entropy", color="green")

    feature_ax.set_xlabel("Window Index")
    feature_ax.set_ylabel("Feature Value")
    feature_ax.set_title("Frequency & Entropy Features")
    # Only request a legend when there is at least one labelled artist to show
    if dominant_frequencies or entropy_values:
        feature_ax.legend()

    plt.tight_layout()

    # Save or show plot
    if save:
        file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ")
        plt.savefig(file_path, dpi=300)
        print(f"Plot saved at {file_path}")
    else:
        plt.show()
@brief Plots sliding windows of a sensor's time series data with overlaid statistical features.
This function plots the first num_windows sliding windows within the given start_idx and end_idx
for a specified sensor and overlays feature values at their corresponding time indices.
It also displays entropy and dominant frequency in a separate plot.
@param[in] sliding_windows List of dictionaries, where each dictionary contains: - 'name': sensor name (str) - 'data': List of time-series windows (each as a Pandas Series) @param[in] features List of dictionaries, where each dictionary contains: - 'name': sensor name (str) - 'features': Dictionary of extracted feature lists @param[in] start_idx Start index of the time window to be plotted. @param[in] end_idx End index of the time window to be plotted. @param[in] sensor_name Name of the sensor to be plotted (default: "shank"). @param[in] num_windows Number of sliding windows to plot (default: 10). @param[in] save If True, saves the plot to a file instead of displaying it.
@return None
def create_random_forest_model(n_estimators=100, random_state=42, max_depth=None):
    """
    Build a RandomForestModel configured with the given hyperparameters.

    Args:
        n_estimators: Number of trees in the forest
        random_state: Random state for reproducibility
        max_depth: Maximum depth of the tree

    Returns:
        RandomForestModel instance
    """
    hyperparameters = {
        "n_estimators": n_estimators,
        "random_state": random_state,
        "max_depth": max_depth,
    }
    return RandomForestModel(**hyperparameters)
Create a Random Forest model with specified parameters.
Args: n_estimators: Number of trees in the forest random_state: Random state for reproducibility max_depth: Maximum depth of the tree
Returns: RandomForestModel instance
def preprocess_features(features):
    """
    Convert the features dictionary into X (feature matrix) and y (labels),
    ensuring all feature vectors have a consistent length.

    Vector-valued features are flattened and zero-padded to the longest vector
    of that feature. Features whose number of entries does not match the number
    of annotations are skipped with a printed message, as are sensors whose
    features cannot be stacked. Narrower sensor matrices are zero-padded on the
    right so all sensors share the same number of columns.

    Args:
        features: Iterable of per-sensor dicts, each with the keys "name",
            "features" (mapping of feature name to a per-window list) and
            "annotations" (one label per window).

    Returns:
        Tuple ``(X, y)``: ``X`` is a float32 array with the rows of all sensors
        stacked vertically, ``y`` holds the labels remapped to zero-based
        contiguous integers (in sorted order of the original labels).

    Raises:
        ValueError: If no sensor yields a usable feature matrix.
    """
    X = []
    y = []

    for sensor_dict in features:
        sensor_name = sensor_dict["name"]
        sensor_features = sensor_dict["features"]
        sensor_annotations = sensor_dict["annotations"]

        num_windows = len(sensor_annotations)  # Expected number of windows
        feature_arrays = []

        for key, raw_values in sensor_features.items():
            # Object dtype lets ragged per-window vectors be held in one array
            feature_array = np.array(raw_values, dtype=object)

            # An empty feature list has no first element to inspect; it falls
            # through to the window-count check below instead of raising IndexError.
            if len(feature_array) > 0 and isinstance(feature_array[0], (list, np.ndarray)):
                print(f"Fixing inconsistent feature '{key}' in sensor '{sensor_name}'.")

                # Flatten first so the pad width is computed from the same
                # length that is actually padded.
                flattened = [
                    np.ravel(f) if isinstance(f, (list, np.ndarray)) else np.array([f])
                    for f in feature_array
                ]
                max_length = max(len(values) for values in flattened)

                # Zero-pad every window's vector to the longest one
                feature_array = np.array([
                    np.pad(values, (0, max_length - len(values)), 'constant', constant_values=0)
                    for values in flattened
                ])

            # Ensure consistency in number of windows
            if len(feature_array) != num_windows:
                print(f"Skipping feature '{key}' due to mismatched length: {len(feature_array)} instead of {num_windows}.")
                continue

            feature_arrays.append(feature_array)

        if not feature_arrays:
            continue

        # Concatenate features per window
        try:
            feature_matrix = np.column_stack(feature_arrays)
        except ValueError:
            print(f"Error: Features in sensor '{sensor_name}' have inconsistent shapes. Skipping sensor.")
            continue

        X.append(feature_matrix)
        y.append(np.array(sensor_annotations))

    if not X or not y:
        raise ValueError("No valid features or labels found.")

    # Standardize feature matrix widths across sensors
    max_feature_dim = max(x.shape[1] for x in X)
    print(f"Standardizing all feature vectors to {max_feature_dim} dimensions.")

    # Right-pad narrower matrices with zeros; no matrix can exceed the maximum
    X = [
        np.pad(x, ((0, 0), (0, max_feature_dim - x.shape[1])), 'constant', constant_values=0)
        if x.shape[1] < max_feature_dim else x
        for x in X
    ]

    # Stack all feature matrices
    X = np.vstack(X).astype(np.float32)
    y = np.concatenate(y)

    # Remap labels to zero-based contiguous integers
    unique_labels = np.unique(y)
    label_map = {label: idx for idx, label in enumerate(unique_labels)}
    y_remapped = np.array([label_map[label] for label in y])

    return X, y_remapped
Convert the features dictionary into X (feature matrix) and y (labels), ensuring all feature vectors have a consistent length.
def evaluate_model(model, features):
    """
    Evaluate the given model on a held-out split of the provided features and print its accuracy.

    The features are converted with ``preprocess_features`` and split with
    ``test_size=0.2`` and ``random_state=42``; only the test portion is scored.

    Args:
        model: Object exposing ``predict(X)``.
            NOTE(review): presumably already fitted on the matching training split — confirm against callers.
        features: Per-sensor feature dictionaries accepted by ``preprocess_features``.
    """
    X, y = preprocess_features(features)
    # The training portion is not needed here; only the held-out rows are scored.
    _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)

    print(f"Accuracy: {acc:.4f}")
Evaluates the given model on a held-out 20% split of the provided features and prints the accuracy.