#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Public API for HDF5 to Zarr migration.
Internal helpers in _h5_helpers.py, error compat in _compat.py.
"""
import os
import warnings
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union
import h5py
import zarr
from tqdm import tqdm
from ._compat import (
FileFormatError,
PathNotFoundError,
SciTeXIOError,
check_file_exists,
check_path,
)
from ._h5_helpers import (
copy_h5_attributes,
get_zarr_compressor,
migrate_group,
validate_migration,
)
def migrate_h5_to_zarr(
    h5_path: Union[str, Path],
    zarr_path: Optional[Union[str, Path]] = None,
    compressor: Optional[Union[str, Any]] = "zstd",
    chunks: Optional[Union[bool, Tuple[int, ...]]] = True,
    overwrite: bool = False,
    show_progress: bool = True,
    validate: bool = True,
) -> str:
    """
    Migrate HDF5 file to Zarr format.

    Parameters
    ----------
    h5_path : str or Path
        Path to input HDF5 file
    zarr_path : str or Path, optional
        Path for output Zarr store. If None, uses h5_path with .zarr extension
    compressor : str or compressor object, optional
        Compression to use: 'zstd', 'lz4', 'gzip', 'blosc', or None
    chunks : bool or tuple, optional
        Chunking strategy. True for auto, False for no chunks, or specific shape
    overwrite : bool, optional
        Whether to overwrite existing Zarr store
    show_progress : bool, optional
        Whether to show migration progress
    validate : bool, optional
        Whether to validate the migration by comparing shapes

    Returns
    -------
    str
        Path to created Zarr store

    Raises
    ------
    PathNotFoundError
        If an absolute ``h5_path`` does not exist. Relative paths are
        handed to ``check_file_exists`` instead.
    FileFormatError
        If an ``OSError`` raised while opening or migrating the HDF5 file
        carries a message containing "Unable to open file" or
        "bad symbol table" (treated as a corrupted file).
    SciTeXIOError
        If the Zarr store already exists and ``overwrite`` is False, if the
        HDF5 file cannot be opened, or if any other step of the migration
        fails. The original exception is attached as ``__cause__``.
    """
    h5_path = Path(h5_path)
    if not h5_path.is_absolute():
        check_file_exists(str(h5_path))
    elif not h5_path.exists():
        raise PathNotFoundError(str(h5_path))

    if zarr_path is None:
        zarr_path = h5_path.with_suffix(".zarr")
    else:
        zarr_path = Path(zarr_path)
        if not zarr_path.is_absolute():
            check_path(str(zarr_path))

    if zarr_path.exists() and not overwrite:
        raise SciTeXIOError(
            f"Zarr store already exists: {zarr_path}",
            suggestion="Use overwrite=True to replace existing store",
        )

    compressor_obj = get_zarr_compressor(compressor)

    if show_progress:
        print("Migrating HDF5 to Zarr:")
        print(f" Source: {h5_path}")
        print(f" Target: {zarr_path}")
        print(f" Compressor: {compressor}")

    # Tracks whether the HDF5 file was opened, so that a later OSError (for
    # example while removing the old store or writing the new one) is not
    # misreported as a failure to open the source file.
    h5_opened = False
    try:
        with h5py.File(str(h5_path), "r") as h5_file:
            h5_opened = True

            # The existing store is only removed once the source has been
            # opened successfully.
            if zarr_path.exists() and overwrite:
                import shutil

                shutil.rmtree(zarr_path)

            zarr_store = zarr.open(str(zarr_path), mode="w")
            copy_h5_attributes(h5_file, zarr_store)
            migrate_group(h5_file, zarr_store, compressor_obj, chunks, show_progress)

            if show_progress:
                print("Migration complete!")

            # Validation needs the HDF5 file, so it runs inside the ``with``.
            if validate:
                if show_progress:
                    print("Validating migration...")
                validate_migration(h5_file, zarr_store, show_progress)
    except Exception as e:
        if isinstance(e, OSError):
            message = str(e)
            if "Unable to open file" in message or "bad symbol table" in message:
                warnings.warn(f"HDF5 file appears to be corrupted: {h5_path}")
                raise FileFormatError(
                    str(h5_path),
                    expected_format="HDF5",
                    actual_format="corrupted HDF5",
                ) from e
            if not h5_opened:
                raise SciTeXIOError(
                    f"Failed to open HDF5 file: {h5_path}",
                    context={"error": message},
                ) from e
        raise SciTeXIOError(
            f"Migration failed: {str(e)}",
            context={"h5_path": str(h5_path), "zarr_path": str(zarr_path)},
            suggestion="Check file permissions and disk space",
        ) from e

    return str(zarr_path)
def migrate_h5_to_zarr_batch(
    h5_paths: List[Union[str, Path]],
    output_dir: Optional[Union[str, Path]] = None,
    compressor: Optional[Union[str, Any]] = "zstd",
    chunks: Optional[Union[bool, Tuple[int, ...]]] = True,
    overwrite: bool = False,
    parallel: bool = False,
    n_workers: Optional[int] = None,
) -> List[str]:
    """
    Convert a list of HDF5 files to Zarr stores.

    Each input is mapped to a target store named after the input with a
    ``.zarr`` suffix, placed either next to the input or inside
    ``output_dir``. The work is then handed to the parallel or sequential
    helper; files that fail are reported on stdout by those helpers and
    left out of the returned list.

    Parameters
    ----------
    h5_paths : list of str or Path
        HDF5 files to migrate
    output_dir : str or Path, optional
        Directory that receives the Zarr stores (created if missing).
        If None, each store is written beside its source file
    compressor : str or compressor object, optional
        Compression passed through to the single-file migration
    chunks : bool or tuple, optional
        Chunking strategy passed through to the single-file migration
    overwrite : bool, optional
        Whether existing Zarr stores may be replaced
    parallel : bool, optional
        Use the parallel helper when more than one file is given
    n_workers : int, optional
        Worker count for the parallel helper

    Returns
    -------
    list of str
        Paths of the Zarr stores that were migrated successfully
    """
    h5_paths = [Path(entry) for entry in h5_paths]

    if output_dir is not None and h5_paths:
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        zarr_paths = [
            target_dir / source.with_suffix(".zarr").name for source in h5_paths
        ]
    else:
        zarr_paths = [source.with_suffix(".zarr") for source in h5_paths]

    file_count = len(h5_paths)
    print(f"Migrating {file_count} HDF5 files to Zarr format...")

    # A process pool is only used when there is more than one file.
    use_pool = parallel and file_count > 1
    if use_pool:
        migrated_paths = _migrate_parallel(
            h5_paths, zarr_paths, compressor, chunks, overwrite, n_workers
        )
    else:
        migrated_paths = _migrate_sequential(
            h5_paths, zarr_paths, compressor, chunks, overwrite
        )

    print(f"\nSuccessfully migrated {len(migrated_paths)}/{len(h5_paths)} files")
    return migrated_paths
def _migrate_parallel(h5_paths, zarr_paths, compressor, chunks, overwrite, n_workers):
    """Run migrations in parallel using a process pool.

    Parameters
    ----------
    h5_paths : list
        Source HDF5 paths.
    zarr_paths : list
        Target Zarr paths, paired with ``h5_paths`` by position.
    compressor, chunks, overwrite
        Forwarded unchanged to ``migrate_h5_to_zarr``.
    n_workers : int or None
        Number of worker processes. If None, uses the CPU count (4 when it
        cannot be determined), capped at the number of files.

    Returns
    -------
    list of str
        Paths returned by the successful migrations, in input order.
        Failed files are reported on stdout and omitted.
    """
    import functools
    from concurrent.futures import ProcessPoolExecutor, as_completed

    # Nothing to do. This also avoids computing n_workers == 0 below, which
    # ProcessPoolExecutor rejects with ValueError.
    if not h5_paths:
        return []

    if n_workers is None:
        n_workers = min(os.cpu_count() or 4, len(h5_paths))
    print(f"Using {n_workers} parallel workers...")

    # Per-file progress output is disabled; a single tqdm bar tracks the batch.
    migrate_func = functools.partial(
        migrate_h5_to_zarr,
        compressor=compressor,
        chunks=chunks,
        overwrite=overwrite,
        show_progress=False,
        validate=True,
    )

    # One slot per input keeps the results in input order even though the
    # futures complete in arbitrary order; None marks a failed file.
    results = [None] * len(h5_paths)
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = {
            executor.submit(migrate_func, h5_path, zarr_path): i
            for i, (h5_path, zarr_path) in enumerate(zip(h5_paths, zarr_paths))
        }
        with tqdm(total=len(h5_paths), desc="Migrating") as pbar:
            for future in as_completed(futures):
                idx = futures[future]
                try:
                    results[idx] = future.result()
                except Exception as e:
                    print(f"\nError migrating {h5_paths[idx]}: {e}")
                pbar.update(1)

    return [path for path in results if path is not None]
def _migrate_sequential(h5_paths, zarr_paths, compressor, chunks, overwrite):
    """Migrate each (HDF5, Zarr) pair one after another.

    Every pair is passed to ``migrate_h5_to_zarr`` with per-file progress
    output disabled and validation enabled. A failure is printed and the
    loop continues with the next file.

    Returns
    -------
    list of str
        Paths returned by the migrations that succeeded, in input order.
    """
    shared_options = {
        "compressor": compressor,
        "chunks": chunks,
        "overwrite": overwrite,
        "show_progress": False,
        "validate": True,
    }
    completed = []
    pairs = zip(h5_paths, zarr_paths)
    for source, target in tqdm(pairs, total=len(h5_paths), desc="Migrating"):
        try:
            store_path = migrate_h5_to_zarr(source, target, **shared_options)
        except Exception as exc:
            print(f"\nError migrating {source}: {exc}")
        else:
            completed.append(store_path)
    return completed