Coverage for src / crump / cdf_extractor.py: 87%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""CDF to CSV/Parquet extraction functionality.""" 

2 

3from __future__ import annotations 

4 

5import csv 

6import tempfile 

7from dataclasses import dataclass 

8from pathlib import Path 

9from typing import Any 

10 

11import numpy as np 

12 

13from crump.cdf_reader import CDFVariable, get_column_names_for_variable, read_cdf_variables 

14from crump.config import CrumpJob, apply_row_transformations 

15from crump.tabular_file import create_writer 

16 

17 

@dataclass
class ExtractionResult:
    """Summary of a single output file produced by a CDF extraction run."""

    # Where the CSV/Parquet output was written.
    output_file: Path
    # Data rows written by this extraction (the header row is not counted).
    num_rows: int
    # Count of columns in the output header.
    num_columns: int
    # Output header names, in file order.
    column_names: list[str]
    # Size in bytes of output_file, measured after writing finished.
    file_size: int
    # CDF variables whose data contributed to this file.
    variable_names: list[str]

28 

29 

30def _make_unique_column_names(column_names: list[str]) -> list[str]: 

31 """Ensure all column names are unique by adding suffixes if needed. 

32 

33 Args: 

34 column_names: List of column names that may contain duplicates 

35 

36 Returns: 

37 List of unique column names 

38 """ 

39 seen: dict[str, int] = {} 

40 unique_names = [] 

41 

42 for name in column_names: 

43 if name not in seen: 

44 seen[name] = 0 

45 unique_names.append(name) 

46 else: 

47 seen[name] += 1 

48 unique_names.append(f"{name}_{seen[name]}") 

49 

50 return unique_names 

51 

52 

53def _group_variables_by_record_count( 

54 variables: list[CDFVariable], 

55) -> dict[int, list[CDFVariable]]: 

56 """Group variables by their record count. 

57 

58 Args: 

59 variables: List of CDFVariable objects 

60 

61 Returns: 

62 Dictionary mapping record count to list of variables 

63 """ 

64 groups: dict[int, list[CDFVariable]] = {} 

65 

66 for var in variables: 

67 if var.num_records not in groups: 

68 groups[var.num_records] = [] 

69 groups[var.num_records].append(var) 

70 

71 return groups 

72 

73 

def _array_to_list(values: np.ndarray) -> list[Any]:
    """Convert a numpy array to a Python list, rendering datetime64 values as strings via ``str``."""
    if np.issubdtype(values.dtype, np.datetime64):
        return [str(value) for value in values]
    return values.tolist()


def _expand_variable_to_columns(
    variable: CDFVariable, cdf_file_path: Path, max_records: int | None = None
) -> tuple[list[str], list[list[Any]], int]:
    """Expand a CDF variable into column names and data columns.

    Non-array variables produce a single column; array variables produce one
    column per element (``variable.array_size`` columns, indexed on axis 1).

    Args:
        variable: The variable to expand
        cdf_file_path: Path to the CDF file
        max_records: Maximum number of records to extract (None = all). Negative
            values are treated as 0.

    Returns:
        Tuple of (column_names, data_columns, actual_records) where data_columns is a
        list of columns and actual_records is the number of records actually extracted
    """
    column_names = get_column_names_for_variable(variable, cdf_file_path)

    actual_records = variable.num_records
    if max_records is not None:
        # Clamp at zero: a negative limit would otherwise slice from the end
        # (e.g. data[:-1]) and report a negative record count.
        actual_records = max(0, min(actual_records, max_records))

    if variable.is_array:
        # 2D variable - one output column per element along axis 1.
        data_columns = [
            _array_to_list(variable.data[:actual_records, i])
            for i in range(variable.array_size)
        ]
    elif isinstance(variable.data, np.ndarray):
        # 1D variable - single column.
        data_columns = [_array_to_list(variable.data[:actual_records])]
    else:
        # NOTE(review): non-ndarray data (presumably a scalar) becomes a
        # one-element column even when actual_records is larger; callers that
        # index rows up to actual_records would fail. Confirm this case is
        # unreachable for variables with 2+ records.
        data_columns = [[variable.data]]

    return column_names, data_columns, actual_records

120 

121 

122def _generate_output_filename( 

123 template: str, source_file: Path, variable_name: str | None = None 

124) -> str: 

125 """Generate output filename from template. 

126 

127 Args: 

128 template: Filename template with [SOURCE_FILE] and [VARIABLE_NAME] placeholders 

129 source_file: Source CDF file path 

130 variable_name: Variable name (optional) 

131 

132 Returns: 

133 Generated filename 

134 """ 

135 filename = template 

136 filename = filename.replace("[SOURCE_FILE]", source_file.stem) 

137 

138 if variable_name: 

139 filename = filename.replace("[VARIABLE_NAME]", variable_name) 

140 else: 

141 # If no variable name, remove the placeholder 

142 filename = filename.replace("-[VARIABLE_NAME]", "").replace("_[VARIABLE_NAME]", "") 

143 filename = filename.replace("[VARIABLE_NAME]-", "").replace("[VARIABLE_NAME]_", "") 

144 filename = filename.replace("[VARIABLE_NAME]", "") 

145 

146 return filename 

147 

148 

149def _get_unique_filename(base_filename: str, used_filenames: set[str]) -> str: 

150 """Get a unique filename by adding a numerical suffix if needed. 

151 

152 Args: 

153 base_filename: The base filename to use 

154 used_filenames: Set of filenames already used in this extraction 

155 

156 Returns: 

157 A unique filename 

158 """ 

159 # If filename hasn't been used yet in this extraction, use it as-is 

160 if base_filename not in used_filenames: 

161 return base_filename 

162 

163 # Add numerical suffix to make it unique 

164 base_name = Path(base_filename).stem 

165 extension = Path(base_filename).suffix 

166 counter = 1 

167 

168 while True: 

169 new_filename = f"{base_name}_{counter}{extension}" 

170 if new_filename not in used_filenames: 

171 return new_filename 

172 counter += 1 

173 

174 

175def _validate_existing_file_header(file_path: Path, expected_columns: list[str]) -> bool: 

176 """Validate that an existing CSV or Parquet file has the expected header. 

177 

178 File format is auto-detected from the file extension. 

179 

180 Args: 

181 file_path: Path to the file 

182 expected_columns: Expected column names 

183 

184 Returns: 

185 True if headers match, False otherwise 

186 """ 

187 try: 

188 from crump.tabular_file import create_reader 

189 

190 with create_reader(file_path) as reader: 

191 existing_header = reader.fieldnames 

192 return existing_header == expected_columns 

193 except Exception: 

194 return False 

195 

196 

def _select_variables(
    all_variables: list[CDFVariable], variable_names: list[str] | None
) -> list[CDFVariable]:
    """Return the requested variables in file order, or all variables if none are requested.

    Raises:
        ValueError: If any requested name is not present in ``all_variables``.
    """
    if not variable_names:
        return all_variables

    requested = set(variable_names)
    selected = [var for var in all_variables if var.name in requested]
    missing = requested - {var.name for var in selected}
    if missing:
        raise ValueError(f"Variables not found in CDF file: {', '.join(sorted(missing))}")
    return selected


def _write_extraction_output(
    output_dir: Path,
    base_filename: str,
    used_filenames: set[str],
    column_names: list[str],
    data_columns: list[list[Any]],
    num_rows: int,
    append: bool,
    variable_names: list[str],
) -> ExtractionResult:
    """Write column-oriented data to a uniquely named file and describe what was written.

    The chosen filename is added to ``used_filenames`` in place. The header row is
    written only when a new file is created; when appending to an existing file
    its header must already match ``column_names``.

    Raises:
        FileExistsError: If the output file exists and ``append`` is False.
        ValueError: If appending and the existing file has different columns.
    """
    output_filename = _get_unique_filename(base_filename, used_filenames)
    used_filenames.add(output_filename)
    output_path = output_dir / output_filename

    file_exists = output_path.exists()
    if file_exists and not append:
        raise FileExistsError(
            f"Output file already exists: {output_path}. "
            "Use --append to add data to existing file."
        )
    # Past this point, an existing file implies append=True.
    if file_exists and not _validate_existing_file_header(output_path, column_names):
        raise ValueError(
            f"Cannot append to {output_path}: "
            f"existing file has different columns. "
            f"Expected columns: {', '.join(column_names)}"
        )

    # Format (CSV/Parquet) is auto-detected from the extension by create_writer.
    with create_writer(output_path, append=file_exists) as writer:
        if not file_exists:
            writer.writerow(column_names)
        # Transpose the column lists into rows.
        for row_idx in range(num_rows):
            writer.writerow([column[row_idx] for column in data_columns])

    return ExtractionResult(
        output_file=output_path,
        num_rows=num_rows,
        num_columns=len(column_names),
        column_names=column_names,
        file_size=output_path.stat().st_size,
        variable_names=variable_names,
    )


def extract_cdf_to_tabular_file(
    cdf_file_path: Path,
    output_dir: Path,
    filename_template: str = "[SOURCE_FILE]-[VARIABLE_NAME].csv",
    automerge: bool = True,
    append: bool = False,
    variable_names: list[str] | None = None,
    max_records: int | None = None,
    use_parquet: bool = False,  # noqa: ARG001 # Deprecated parameter kept for compatibility
) -> list[ExtractionResult]:
    """Extract data from a CDF file to CSV or Parquet files.

    The output format is determined automatically from the filename extension
    in the filename_template (.csv for CSV, .parquet or .pq for Parquet).

    Variables with fewer than two records are skipped. With ``automerge``,
    variables sharing a record count are written side by side into one file,
    processed from the largest record count to the smallest, and the file is
    named after the group's first variable. Otherwise each variable gets its own
    file. Filenames that repeat within one call get a numeric suffix.

    Args:
        cdf_file_path: Path to the CDF file
        output_dir: Directory to save output files (created if missing)
        filename_template: Template for output filenames (extension determines format)
        automerge: Whether to merge variables with same record count
        append: Whether to append to existing files
        variable_names: List of specific variables to extract (None = all)
        max_records: Maximum number of records to extract per variable (None = all)
        use_parquet: Deprecated. Use filename extension instead.

    Returns:
        List of ExtractionResult objects, one per file written (may be empty)

    Raises:
        ValueError: If specified variables are not found, or if appending and an
            existing file's header does not match
        FileExistsError: If output file exists and append is False
    """
    variables = _select_variables(read_cdf_variables(cdf_file_path), variable_names)
    if not variables:
        return []

    output_dir.mkdir(parents=True, exist_ok=True)

    results: list[ExtractionResult] = []
    used_filenames: set[str] = set()  # Filenames already claimed in this extraction

    if automerge:
        groups = _group_variables_by_record_count(variables)

        # Record counts are unique keys, so a plain descending sort is enough.
        for record_count in sorted(groups, reverse=True):
            # Skip variables with very few records (likely metadata)
            if record_count < 2:
                continue

            group_vars = groups[record_count]
            column_names: list[str] = []
            data_columns: list[list[Any]] = []
            record_counts: list[int] = []
            for var in group_vars:
                names, columns, records = _expand_variable_to_columns(
                    var, cdf_file_path, max_records
                )
                column_names.extend(names)
                data_columns.extend(columns)
                record_counts.append(records)

            group_names = [var.name for var in group_vars]
            results.append(
                _write_extraction_output(
                    output_dir=output_dir,
                    base_filename=_generate_output_filename(
                        filename_template, cdf_file_path, group_names[0]
                    ),
                    used_filenames=used_filenames,
                    column_names=_make_unique_column_names(column_names),
                    data_columns=data_columns,
                    # Use the minimum extracted count across the group's variables.
                    num_rows=min(record_counts),
                    append=append,
                    variable_names=group_names,
                )
            )
    else:
        for var in variables:
            # Skip variables with very few records
            if var.num_records < 2:
                continue

            names, columns, records = _expand_variable_to_columns(
                var, cdf_file_path, max_records
            )
            results.append(
                _write_extraction_output(
                    output_dir=output_dir,
                    base_filename=_generate_output_filename(
                        filename_template, cdf_file_path, var.name
                    ),
                    used_filenames=used_filenames,
                    column_names=_make_unique_column_names(names),
                    data_columns=columns,
                    num_rows=records,
                    append=append,
                    variable_names=[var.name],
                )
            )

    return results

409 

410 

class _AppendHeaderMismatchError(ValueError):
    """Raised when an existing output file's header differs from the columns to append.

    Subclasses ``ValueError`` so callers that already catch ``ValueError`` keep
    working, while ``extract_cdf_with_config`` can tell it apart from the
    ``ValueError`` that means "this raw file does not fit the job's mappings".
    """


def extract_cdf_with_config(
    cdf_file_path: Path,
    output_dir: Path,
    job: CrumpJob,
    max_records: int | None = None,
    automerge: bool = True,
    variable_names: list[str] | None = None,
    append: bool = False,
    filename_template: str = "[SOURCE_FILE]_[VARIABLE_NAME].csv",
    use_parquet: bool = False,
) -> list[ExtractionResult]:
    """Extract data from a CDF file to CSV/Parquet using job configuration for column selection and mapping.

    This function extracts CDF data and applies the same column mappings and transformations
    that would be used when syncing to a database, but outputs to CSV/Parquet instead.

    A CDF file may contain multiple groups of variables with different record counts, resulting
    in multiple output files. This function attempts to transform each one and returns results
    for those that successfully match the column mappings.

    Args:
        cdf_file_path: Path to the CDF file
        output_dir: Directory to write output file(s)
        job: CrumpJob configuration with column mappings and transformations
        max_records: Maximum number of records to extract (None = all)
        automerge: Whether to merge variables with same record count during raw extraction
        variable_names: Specific variable names to extract (None = all)
        append: Whether to append to existing output files instead of creating new ones
        filename_template: Template for output filenames (use [SOURCE_FILE] and [VARIABLE_NAME]);
            the extension determines the output format
        use_parquet: Deprecated; forwarded unchanged. The format comes from the
            filename_template extension.

    Returns:
        List of ExtractionResult objects for successfully transformed files (may be empty)

    Raises:
        ValueError: If no data could be extracted from the CDF file, or if appending
            and an existing output file has different columns
        FileNotFoundError: If CDF file doesn't exist
        FileExistsError: If output file exists and append is False
    """
    import shutil

    # Step 1: dump the CDF to raw CSV files in a private temporary directory.
    temp_dir = Path(tempfile.mkdtemp(prefix="data_sync_extract_"))
    try:
        raw_results = extract_cdf_to_tabular_file(
            cdf_file_path=cdf_file_path,
            output_dir=temp_dir,
            filename_template=f"{cdf_file_path.stem}_[VARIABLE_NAME].csv",
            automerge=automerge,
            append=False,
            variable_names=variable_names,
            max_records=max_records,
        )

        if not raw_results:
            raise ValueError("No data could be extracted from CDF file")

        # Step 2: extract values from the CDF filename if configured.
        filename_values = None
        if job.filename_to_column:
            filename_values = job.filename_to_column.extract_values_from_filename(cdf_file_path)

        # Step 3: transform each raw file; keep those that fit the job.
        results: list[ExtractionResult] = []
        for raw_result in raw_results:
            try:
                result = _transform_tabular_file_with_config(
                    raw_file_path=raw_result.output_file,
                    output_dir=output_dir,
                    cdf_file_path=cdf_file_path,
                    job=job,
                    filename_values=filename_values,
                    raw_result=raw_result,
                    append=append,
                    filename_template=filename_template,
                    use_parquet=use_parquet,
                )
            except _AppendHeaderMismatchError:
                # A real error: appending would mix incompatible columns. The
                # blanket handler below used to swallow this and silently drop data.
                raise
            except ValueError:
                # This file doesn't match the column mappings - skip it.
                # This is expected when a CDF has multiple variable groups.
                continue

            if result is not None:
                results.append(result)

        return results

    finally:
        # Best-effort removal of the temporary raw files and their directory.
        shutil.rmtree(temp_dir, ignore_errors=True)


def _transform_tabular_file_with_config(
    raw_file_path: Path,
    output_dir: Path,
    cdf_file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None,
    raw_result: ExtractionResult,
    append: bool = False,
    filename_template: str = "[SOURCE_FILE]_[VARIABLE_NAME].csv",
    use_parquet: bool = False,  # noqa: ARG001 # Deprecated parameter kept for compatibility
) -> ExtractionResult | None:
    """Transform a raw CSV file using job configuration.

    The raw file is read with ``csv.DictReader``. The output format is
    auto-detected from the extension of ``filename_template``.

    Args:
        raw_file_path: Path to the raw CSV file
        output_dir: Output directory for transformed output file
        cdf_file_path: Original CDF file path
        job: Job configuration
        filename_values: Extracted filename values
        raw_result: Result from raw extraction
        append: Whether to append to existing file
        filename_template: Template for output filename (extension determines format)
        use_parquet: Deprecated. Use filename extension instead.

    Returns:
        ExtractionResult if at least one row was written; None if the raw file has no
        header, no output columns could be determined, or every row was empty

    Raises:
        ValueError: If a required ID column is missing from the raw file, or (as
            ``_AppendHeaderMismatchError``) if appending and the existing output
            file has different columns
        FileExistsError: If output file exists and append is False
    """
    # newline="" is what the csv module requires for correct handling of line
    # breaks inside quoted fields.
    with open(raw_file_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        if not reader.fieldnames:
            return None

        csv_columns = set(reader.fieldnames)

        # Determine which columns to include in output
        output_columns: list[str] = []
        sync_columns: list[Any] = []

        # ID columns are mandatory: each needs a custom expression/function or a
        # source column present in the raw file.
        for id_col in job.id_mapping:
            if id_col.expression or id_col.function or (
                id_col.csv_column and id_col.csv_column in csv_columns
            ):
                sync_columns.append(id_col)
                output_columns.append(id_col.db_column)
            else:
                raise ValueError(
                    f"ID column '{id_col.csv_column}' not found in CSV. "
                    f"Available columns: {', '.join(sorted(csv_columns))}"
                )

        if job.columns:
            # Configured data columns are optional: custom functions are always
            # included; columns whose source is missing are skipped.
            for col in job.columns:
                if col.expression or col.function or (
                    col.csv_column and col.csv_column in csv_columns
                ):
                    sync_columns.append(col)
                    output_columns.append(col.db_column)
        else:
            # No columns specified - pass through every raw column not already
            # used as an ID source. Imported here to avoid a circular import.
            from crump.config import ColumnMapping

            id_source_columns = {
                id_col.csv_column for id_col in job.id_mapping if id_col.csv_column
            }
            for csv_col in sorted(csv_columns):
                if csv_col not in id_source_columns:
                    sync_columns.append(ColumnMapping(csv_column=csv_col, db_column=csv_col))
                    output_columns.append(csv_col)

        # Add filename-extracted columns if configured
        if filename_values and job.filename_to_column:
            for col_name, filename_col_mapping in job.filename_to_column.columns.items():
                if col_name in filename_values:
                    output_columns.append(filename_col_mapping.db_column)

        # If no columns to output, skip this file
        if not output_columns:
            return None

        # [SOURCE_FILE] -> CDF filename stem; [VARIABLE_NAME] -> the single variable
        # name, or the first two names joined plus a "_plusN" count of the rest.
        output_filename = filename_template.replace("[SOURCE_FILE]", cdf_file_path.stem)
        if "[VARIABLE_NAME]" in output_filename:
            names = raw_result.variable_names
            if len(names) == 1:
                var_name = names[0]
            else:
                var_name = "_".join(names[:2])
                if len(names) > 2:
                    var_name += f"_plus{len(names) - 2}"
            output_filename = output_filename.replace("[VARIABLE_NAME]", var_name)

        output_path = output_dir / output_filename
        output_dir.mkdir(parents=True, exist_ok=True)

        # Validate the existing file before writing anything.
        if append and output_path.exists():
            if not _validate_existing_file_header(output_path, output_columns):
                raise _AppendHeaderMismatchError(
                    f"Cannot append to {output_path.name}: "
                    f"existing file has different columns. "
                    f"Expected: {output_columns}"
                )
        elif not append and output_path.exists():
            raise FileExistsError(
                f"Output file already exists: {output_path}. "
                f"Use --append to add to existing file or remove it first."
            )

        # Rewind and re-read from the top for the data pass.
        f.seek(0)
        reader = csv.DictReader(f)

        rows_written = 0
        write_append = append and output_path.exists()
        with create_writer(output_path, append=write_append) as writer:
            # Write header only if not appending
            if not write_append:
                writer.writerow(output_columns)

            for row in reader:
                output_row = apply_row_transformations(
                    row, sync_columns, job.filename_to_column, filename_values
                )

                # Skip rows where every value is None or "" - but keep rows whose
                # values are legitimately falsy (e.g. all zeros).
                if all(value is None or value == "" for value in output_row.values()):
                    continue

                writer.writerow([output_row.get(col, "") for col in output_columns])
                rows_written += 1

        # Only return a result if at least one row was written
        if rows_written == 0 and not write_append:
            output_path.unlink()  # Remove the header-only file we just created
            return None

        file_size = output_path.stat().st_size

    return ExtractionResult(
        output_file=output_path,
        num_rows=rows_written,
        num_columns=len(output_columns),
        column_names=output_columns,
        file_size=file_size,
        variable_names=raw_result.variable_names,
    )