Source code for voxelops.utils.bids

"""BIDS post-processing utilities for HeudiConv output."""

import json
import stat
from pathlib import Path
from typing import Any, Dict, List, Optional


[docs] def post_process_heudiconv_output( bids_dir: Path, participant: str, session: Optional[str] = None, dry_run: bool = False, ) -> Dict[str, Any]: """ Post-process HeudiConv output to ensure BIDS compliance. Orchestrates all post-processing steps: 1. Verify fieldmap EPI files exist 2. Add IntendedFor fields to fmap JSONs 3. Hide bval/bvec from fmap directories (rename with dot prefix) Parameters ---------- bids_dir : Path Root BIDS directory. participant : str Participant ID (without 'sub-' prefix). session : Optional[str], optional Session ID (without 'ses-' prefix), if applicable, by default None. dry_run : bool, optional If True, report changes without modifying files, by default False. Returns ------- Dict[str, Any] A dictionary with results: - 'success': bool - 'verification': dict - 'intended_for': dict - 'cleanup': dict - 'errors': list """ results = { "success": True, "errors": [], "verification": {}, "intended_for": {}, "cleanup": {}, } # Build participant directory path participant_dir = bids_dir / f"sub-{participant}" if session: participant_dir = participant_dir / f"ses-{session}" if not participant_dir.exists(): results["success"] = False results["errors"].append(f"Participant directory not found: {participant_dir}") return results # Step 1: Verify fieldmap EPI files exist try: verification_result = verify_fmap_epi_files(participant_dir, session) results["verification"] = verification_result if not verification_result["success"]: results["errors"].extend(verification_result.get("errors", [])) except Exception as e: results["errors"].append(f"Verification failed: {e}") results["success"] = False # Step 2: Add IntendedFor to fieldmap JSONs try: intended_for_result = add_intended_for_to_fmaps( participant_dir, session, dry_run ) results["intended_for"] = intended_for_result if not intended_for_result["success"]: results["errors"].extend(intended_for_result.get("errors", [])) except Exception as e: results["errors"].append(f"IntendedFor processing failed: {e}") results["success"] = False # Step 3: Hide bval/bvec from fmap directories try: cleanup_result = remove_bval_bvec_from_fmaps(participant_dir, session, dry_run) results["cleanup"] = cleanup_result if not cleanup_result["success"]: results["errors"].extend(cleanup_result.get("errors", [])) except Exception as e: results["errors"].append(f"Cleanup failed: {e}") results["success"] = False return results
[docs] def verify_fmap_epi_files( participant_dir: Path, session: Optional[str] = None, ) -> Dict[str, Any]: """ Verify that expected fieldmap EPI files exist. Checks for existence of ``*acq-dwi*_epi.nii.gz`` and ``.json`` in ``fmap/`` directory. Parameters ---------- participant_dir : Path Path to participant directory (or session directory if session exists). session : Optional[str], optional Session ID (for logging purposes), by default None. Returns ------- Dict[str, Any] Dictionary with verification results. """ results = { "success": True, "found_files": [], "missing_files": [], "errors": [], } fmap_dir = participant_dir / "fmap" if not fmap_dir.exists(): results["success"] = False results["errors"].append(f"Fieldmap directory not found: {fmap_dir}") return results # Look for DWI fieldmap files dwi_epi_nii = list(fmap_dir.glob("*acq-dwi*_epi.nii.gz")) dwi_epi_json = list(fmap_dir.glob("*acq-dwi*_epi.json")) if dwi_epi_nii: results["found_files"].extend([str(f.name) for f in dwi_epi_nii]) else: results["missing_files"].append("*acq-dwi*_epi.nii.gz") results["errors"].append("No DWI fieldmap NIfTI files found") results["success"] = False if dwi_epi_json: results["found_files"].extend([str(f.name) for f in dwi_epi_json]) else: results["missing_files"].append("*acq-dwi*_epi.json") results["errors"].append("No DWI fieldmap JSON files found") results["success"] = False return results
[docs] def add_intended_for_to_fmaps( participant_dir: Path, session: Optional[str] = None, dry_run: bool = False, ) -> Dict[str, Any]: """ Add IntendedFor fields to fieldmap JSON files. Maps fieldmaps to target files based on acquisition type: - ``acq-dwi*_epi.json`` -> all ``dwi/*_dwi.nii.gz`` files - ``acq-func*_epi.json`` -> all ``func/*_bold.nii.gz`` files Parameters ---------- participant_dir : Path Path to participant directory (or session directory if session exists). session : Optional[str], optional Session ID (for building relative paths), by default None. dry_run : bool, optional If True, report changes without modifying files, by default False. Returns ------- Dict[str, Any] Dictionary with processing results. """ results = { "success": True, "updated_files": [], "errors": [], "dry_run": dry_run, } fmap_dir = participant_dir / "fmap" if not fmap_dir.exists(): results["success"] = False results["errors"].append(f"Fieldmap directory not found: {fmap_dir}") return results # Find all fieldmap JSON files fmap_jsons = list(fmap_dir.glob("*_epi.json")) if not fmap_jsons: results["errors"].append("No fieldmap JSON files found") results["success"] = False return results for fmap_json in fmap_jsons: try: # Determine acquisition type from filename filename = fmap_json.name if "acq-dwi" in filename: # DWI fieldmap -> find DWI targets target_files = _find_dwi_targets(participant_dir) acq_type = "DWI" elif "acq-func" in filename: # Functional fieldmap -> find all BOLD targets target_files = _find_func_targets(participant_dir) acq_type = "functional" else: results["errors"].append(f"Unknown acquisition type in {filename}") continue if not target_files: results["errors"].append(f"No target files found for {filename}") continue # Build IntendedFor paths (relative to session or participant directory) intended_for_paths = [ _build_intended_for_path(target, participant_dir, session) for target in target_files ] # Update JSON file if not dry_run: success = _update_json_sidecar(fmap_json, intended_for_paths) if success: results["updated_files"].append( { "file": str(fmap_json.name), "type": acq_type, "targets": intended_for_paths, } ) else: results["errors"].append(f"Failed to update {filename}") else: results["updated_files"].append( { "file": str(fmap_json.name), "type": acq_type, "targets": intended_for_paths, "note": "Dry run - not modified", } ) except Exception as e: results["errors"].append(f"Error processing {fmap_json.name}: {e}") results["success"] = False return results
[docs] def remove_bval_bvec_from_fmaps( participant_dir: Path, session: Optional[str] = None, dry_run: bool = False, ) -> Dict[str, Any]: """ Hide .bvec and .bval files from fmap directories by renaming with dot prefix. These files are incorrectly generated by dcm2niix for fieldmaps and are not BIDS-compliant for EPI fieldmaps. Instead of deleting, we rename them with a leading dot to hide them (e.g., ``.filename.bvec``). Parameters ---------- participant_dir : Path Path to participant directory (or session directory if session exists). session : Optional[str], optional Session ID (for logging purposes), by default None. dry_run : bool, optional If True, report files to hide without renaming, by default False. Returns ------- Dict[str, Any] Dictionary with cleanup results. """ results = { "success": True, "hidden_files": [], "errors": [], "dry_run": dry_run, } fmap_dir = participant_dir / "fmap" if not fmap_dir.exists(): results["errors"].append(f"Fieldmap directory not found: {fmap_dir}") results["success"] = False return results # Find all .bvec and .bval files in fmap directory (excluding already hidden ones) bvec_files = [f for f in fmap_dir.glob("*_epi.bvec") if not f.name.startswith(".")] bval_files = [f for f in fmap_dir.glob("*_epi.bval") if not f.name.startswith(".")] files_to_hide = bvec_files + bval_files if not files_to_hide: # Not an error - just means files are already clean/hidden return results for file_path in files_to_hide: try: if not dry_run: # Rename with leading dot to hide hidden_path = file_path.parent / f".{file_path.name}" file_path.rename(hidden_path) results["hidden_files"].append( { "original": str(file_path.name), "hidden_as": str(hidden_path.name), } ) else: results["hidden_files"].append( { "file": str(file_path.name), "will_hide_as": f".{file_path.name}", "note": "Dry run - not renamed", } ) except Exception as e: results["errors"].append(f"Failed to hide {file_path.name}: {e}") results["success"] = False return results
# Private helper functions def _find_dwi_targets(participant_dir: Path) -> List[Path]: """Find all DWI NIfTI files in dwi directory.""" dwi_dir = participant_dir / "dwi" if not dwi_dir.exists(): return [] return list(dwi_dir.glob("*_dwi.nii.gz")) def _find_func_targets(participant_dir: Path) -> List[Path]: """Find all functional BOLD NIfTI files in func directory.""" func_dir = participant_dir / "func" if not func_dir.exists(): return [] return list(func_dir.glob("*_bold.nii.gz")) def _build_intended_for_path( target_file: Path, participant_dir: Path, session: Optional[str] = None, ) -> str: """ Build BIDS-compliant relative path for IntendedFor field. Paths are relative to the session directory (if session exists) or participant directory. Parameters ---------- target_file : Path Absolute path to target file. participant_dir : Path Path to participant/session directory. session : Optional[str], optional Session ID if applicable, by default None. Returns ------- str Relative path string for IntendedFor field. """ # Get path relative to participant_dir try: rel_path = target_file.relative_to(participant_dir) if session: # Add "ses-{session}" before rel_path = f"ses-{session}/{rel_path}" return str(rel_path) except ValueError: # If relative_to fails, build manually # This shouldn't happen if paths are constructed correctly return str(target_file.name) def _update_json_sidecar(json_path: Path, intended_for: List[str]) -> bool: """ Update JSON sidecar file with IntendedFor field. Reads existing JSON, adds/updates IntendedFor field, and writes back. Preserves all existing fields. Handles read-only files by making them writable. Parameters ---------- json_path : Path Path to JSON file. intended_for : List[str] List of relative paths for IntendedFor field. Returns ------- bool True if successful, False otherwise. """ try: # Read existing JSON data = _read_json_sidecar(json_path) if data is None: return False # Add IntendedFor field (BIDS spec requires array) data["IntendedFor"] = intended_for # Make file writable if it's read-only (HeudiConv creates read-only files) current_mode = json_path.stat().st_mode if not (current_mode & stat.S_IWUSR): # Add user write permission json_path.chmod(current_mode | stat.S_IWUSR) # Write back with formatting with open(json_path, "w") as f: json.dump(data, f, indent=2) return True except Exception as e: print(f"Error updating {json_path}: {e}") return False def _read_json_sidecar(json_path: Path) -> Optional[Dict[str, Any]]: """ Read JSON sidecar file with error handling. Parameters ---------- json_path : Path Path to JSON file. Returns ------- Optional[Dict[str, Any]] Dictionary with JSON contents, or None if reading fails. """ try: with open(json_path, "r") as f: return json.load(f) except Exception as e: print(f"Error reading {json_path}: {e}") return None