Source code for scitex_io._load_modules._H5Explorer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: "2025-07-05 12:46:09 (ywatanabe)"
# File: /ssh:sp:/home/ywatanabe/proj/scitex_repo/src/scitex/io/_H5Explorer.py
# ----------------------------------------
import os
__FILE__ = __file__
__DIR__ = os.path.dirname(__FILE__)
# ----------------------------------------

import random
import time
import warnings

# Time-stamp: "2025-06-13 21:00:00 (ywatanabe)"

"""HDF5 file explorer for interactive data inspection."""

from typing import Any, Dict, List, Optional

import h5py
import numpy as np


class H5Explorer:
    """Interactive HDF5 file explorer.

    Wraps an open ``h5py.File`` and provides convenience methods to print
    the file structure, query metadata and load datasets or whole groups.
    Instances can be used as context managers so the file is always closed.

    Example:
        >>> explorer = H5Explorer('data.h5')
        >>> explorer.explore()  # Display file structure
        >>> data = explorer.load('group1/dataset1')  # Load specific dataset
        >>> explorer.close()
    """

    def __init__(self, filepath: str, mode: str = "r"):
        """Open the HDF5 file.

        Args:
            filepath: Path to HDF5 file
            mode: File opening mode ('r' for read, 'r+' for read/write)
        """
        self.filepath = filepath
        self.mode = mode
        self.file = h5py.File(filepath, mode)

    def __enter__(self):
        """Context manager entry; returns the explorer itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the file."""
        self.close()

    def close(self):
        """Close the HDF5 file.

        Does nothing when no ``file`` attribute exists (e.g. ``__init__``
        failed before assigning it) or when the handle is falsy.
        """
        if hasattr(self, "file") and self.file:
            self.file.close()

    def explore(self, path: str = "/", max_depth: Optional[int] = None) -> None:
        """Print the HDF5 file structure starting at ``path``.

        Thin wrapper around :meth:`show`.

        Args:
            path: Starting path in HDF5 file
            max_depth: Maximum depth to explore (None for unlimited)
        """
        self.show(path, max_depth)

    def show(
        self,
        path: str = "/",
        max_depth: Optional[int] = None,
        indent: str = "",
        _current_depth: int = 0,
    ) -> None:
        """Display HDF5 file structure.

        Groups are printed as ``[name]`` and recursed into with their keys
        in sorted order; datasets are printed with shape, dtype and size.

        Args:
            path: Starting path in HDF5 file
            max_depth: Maximum depth to explore (None for unlimited)
            indent: Indentation string (used internally)
            _current_depth: Current depth (used internally)
        """
        if max_depth is not None and _current_depth > max_depth:
            return

        item = self.file[path] if path != "/" else self.file

        if isinstance(item, h5py.Group):
            # The root group has no name of its own, so no header is printed.
            if path != "/":
                print(f"{indent}[{path.split('/')[-1]}]")
            for key in sorted(item.keys()):
                # Joining "/" with a key yields "//key"; collapse it.
                subpath = f"{path}/{key}".replace("//", "/")
                self.show(subpath, max_depth, indent + " ", _current_depth + 1)
        elif isinstance(item, h5py.Dataset):
            name = path.split("/")[-1]
            shape = item.shape
            dtype = item.dtype
            size = item.size
            print(f"{indent}{name}: shape={shape}, dtype={dtype}, size={size}")

    def keys(self, path: str = "/") -> List[str]:
        """Get keys at specified path.

        Args:
            path: Path in HDF5 file

        Returns:
            List of keys at the specified path; empty when the item at
            ``path`` is not a group.
        """
        item = self.file[path] if path != "/" else self.file
        if isinstance(item, h5py.Group):
            return list(item.keys())
        return []

    def load(self, path: str) -> Any:
        """Load data from specified path.

        Datasets are read in full. A ``bytes`` value is decoded as UTF-8 and
        an opaque ``np.void`` scalar is treated as a pickled object. Groups
        are loaded recursively into a dict whose keys are the member names,
        plus one ``_attr_<name>`` entry per group attribute.

        Args:
            path: Path to dataset or group in HDF5 file

        Returns:
            Data from the specified path
        """
        item = self.file[path]

        if isinstance(item, h5py.Dataset):
            data = item[()]
            # Decode bytes to string if needed
            if isinstance(data, bytes):
                return data.decode("utf-8")
            # Pickled objects are stored as opaque np.void. Structured
            # (compound-dtype) scalars are np.void too but carry field
            # names; they are not pickles and are returned unchanged.
            if isinstance(data, np.void) and data.dtype.names is None:
                import pickle

                # SECURITY: unpickling executes arbitrary code; only load
                # HDF5 files that come from a trusted source.
                return pickle.loads(data.tobytes())
            return data

        if isinstance(item, h5py.Group):
            # Load group as dictionary
            result = {
                key: self.load(f"{path}/{key}".replace("//", "/"))
                for key in item.keys()
            }
            # Also load attributes
            for key in item.attrs.keys():
                result[f"_attr_{key}"] = item.attrs[key]
            return result

        return item

    def get(self, path: str) -> Any:
        """Alias for load() method for compatibility.

        Args:
            path: Path to dataset or group in HDF5 file

        Returns:
            Data from the specified path
        """
        return self.load(path)

    def get_info(self, path: str = "/") -> Dict[str, Any]:
        """Get information about an item.

        Args:
            path: Path to item in HDF5 file

        Returns:
            Dictionary with ``path`` and ``type``; datasets add ``shape``,
            ``dtype``, ``size``, ``compression`` and ``chunks``; groups add
            ``n_items`` and ``keys``; ``attributes`` is included only when
            the item has at least one attribute.
        """
        item = self.file[path] if path != "/" else self.file

        info = {
            "path": path,
            "type": type(item).__name__,
        }

        if isinstance(item, h5py.Dataset):
            info.update(
                {
                    "shape": item.shape,
                    "dtype": str(item.dtype),
                    "size": item.size,
                    "compression": item.compression,
                    "chunks": item.chunks,
                }
            )
        elif isinstance(item, h5py.Group):
            info["n_items"] = len(item.keys())
            info["keys"] = list(item.keys())

        # Add attributes
        if hasattr(item, "attrs") and len(item.attrs) > 0:
            info["attributes"] = dict(item.attrs)

        return info

    def find(self, pattern: str, path: str = "/") -> List[str]:
        """Find items matching pattern.

        The match is a case-insensitive substring test against each item's
        own name (the last path component). Every matching path is reported
        exactly once.

        Args:
            pattern: Pattern to search for in item names
            path: Starting path for search

        Returns:
            List of paths matching the pattern
        """
        needle = pattern.lower()
        matches: List[str] = []

        def _walk(group_path: str) -> None:
            node = self.file[group_path] if group_path != "/" else self.file
            if not isinstance(node, h5py.Group):
                # Non-group children were already checked by the parent's
                # loop; checking them again here would duplicate the match.
                return
            for key in node.keys():
                child_path = f"{group_path}/{key}".replace("//", "/")
                if needle in key.lower():
                    matches.append(child_path)
                _walk(child_path)

        start = self.file[path] if path != "/" else self.file
        if isinstance(start, h5py.Group):
            _walk(path)
        elif needle in path.split("/")[-1].lower():
            # The search was started directly on a non-group item.
            matches.append(path)

        return matches

    def get_shape(self, path: str) -> Optional[tuple]:
        """Get shape of a dataset.

        Args:
            path: Path to dataset

        Returns:
            Shape tuple or None if not a dataset
        """
        item = self.file[path]
        if isinstance(item, h5py.Dataset):
            return item.shape
        return None

    def get_dtype(self, path: str) -> Optional[np.dtype]:
        """Get dtype of a dataset.

        Args:
            path: Path to dataset

        Returns:
            Numpy dtype or None if not a dataset
        """
        item = self.file[path]
        if isinstance(item, h5py.Dataset):
            return item.dtype
        return None
# Convenience functions
def explore_h5(filepath: str) -> None:
    """Explore HDF5 file structure.

    Prints the structure of the file when it exists; otherwise emits a
    warning and returns without opening anything.

    Args:
        filepath: Path to HDF5 file
    """
    if not os.path.exists(filepath):
        warnings.warn(f"Warning: File does not exist: {filepath}")
        return

    # The context manager closes the file even if explore() raises.
    with H5Explorer(filepath) as explorer:
        explorer.explore()
def has_h5_key(h5_path, key, max_retries=3, action_on_corrupted="delete"):
    """Check whether ``key`` exists in an HDF5 file, tolerating locks and corruption.

    The key is split on "/" (empty components are ignored) and walked one
    level at a time from the file root.

    Args:
        h5_path: Path to the HDF5 file; resolved with ``os.path.realpath``.
        key: Slash-separated path of the entry to look for.
        max_retries: Number of attempts made when the file is locked.
            Values below 1 still result in a single attempt.
        action_on_corrupted: When "delete", a corruption error triggers an
            attempt to delete ``key`` from the file before returning False.
            Any other value just returns False.

    Returns:
        True when every component of ``key`` is found. False when the file
        or the key is missing, the file stays locked for all attempts, or a
        corruption error is detected.

    Raises:
        OSError, RuntimeError, ValueError: When opening or reading fails
            with a message that matches neither the lock nor the
            corruption indicators.
    """
    h5_path = os.path.realpath(h5_path)
    if not os.path.exists(h5_path):
        return False

    # Lower-case substrings of error messages used to classify failures.
    lock_indicators = (
        "resource temporarily unavailable",
        "file is already open",
        "unable to lock file",
        "file locking failed",
    )
    corruption_indicators = (
        "unable to synchronously",
        "bad symbol table",
        "free block size is zero",
        "truncated file",
        "unable to read signature",
        "corrupted",
        "invalid file signature",
        "unable to check link existence",
        "bad heap free list",
    )

    parts = [p for p in key.split("/") if p]
    # Always look at the file at least once, even for max_retries <= 0.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            with h5py.File(h5_path, "r") as h5_file:
                current = h5_file
                for part in parts:
                    # Only groups have members; a key that descends through
                    # a dataset cannot exist.
                    if not isinstance(current, h5py.Group) or part not in current:
                        return False
                    current = current[part]
                return True
        except (KeyError, FileNotFoundError):
            return False
        except (OSError, RuntimeError, ValueError) as e:
            error_msg = str(e).lower()

            if any(indicator in error_msg for indicator in lock_indicators):
                if attempt < attempts - 1:
                    # Exponential backoff with up to 50% random jitter.
                    base_wait = 0.1 * (2**attempt)
                    jitter = random.uniform(0, base_wait * 0.5)
                    time.sleep(base_wait + jitter)
                    continue
                return False

            if any(indicator in error_msg for indicator in corruption_indicators):
                if action_on_corrupted == "delete":
                    # Best effort: the key is reported absent either way.
                    _delete_corrupted_entry(h5_path, key)
                return False

            # Unrecognised failure: propagate with the original traceback.
            raise

    return False
def _delete_corrupted_entry(h5_path, key): """Delete corrupted entry from HDF5 file.""" try: with h5py.File(h5_path, "r+") as h5_file: if key in h5_file: del h5_file[key] print(f"Deleted corrupted entry: {key}") return True except: pass return False # EOF