Coverage for src / crump / cdf_extractor.py: 87%
260 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""CDF to CSV/Parquet extraction functionality."""
3from __future__ import annotations
5import csv
6import tempfile
7from dataclasses import dataclass
8from pathlib import Path
9from typing import Any
11import numpy as np
13from crump.cdf_reader import CDFVariable, get_column_names_for_variable, read_cdf_variables
14from crump.config import CrumpJob, apply_row_transformations
15from crump.tabular_file import create_writer
@dataclass()
class ExtractionResult:
    """Summary of one output file produced by a CDF extraction.

    The field order below is also the positional constructor order, so it
    must stay stable for callers that build instances without keywords.
    """

    # Path of the CSV/Parquet file that was written (or appended to).
    output_file: Path
    # Number of data rows written by the call that produced this result
    # (the header row is not counted).
    num_rows: int
    # Number of columns in the output header.
    num_columns: int
    # Output header names, in column order.
    column_names: list[str]
    # Size in bytes of the output file after writing (``stat().st_size``).
    file_size: int
    # Names of the CDF variables whose data went into this file.
    variable_names: list[str]
30def _make_unique_column_names(column_names: list[str]) -> list[str]:
31 """Ensure all column names are unique by adding suffixes if needed.
33 Args:
34 column_names: List of column names that may contain duplicates
36 Returns:
37 List of unique column names
38 """
39 seen: dict[str, int] = {}
40 unique_names = []
42 for name in column_names:
43 if name not in seen:
44 seen[name] = 0
45 unique_names.append(name)
46 else:
47 seen[name] += 1
48 unique_names.append(f"{name}_{seen[name]}")
50 return unique_names
53def _group_variables_by_record_count(
54 variables: list[CDFVariable],
55) -> dict[int, list[CDFVariable]]:
56 """Group variables by their record count.
58 Args:
59 variables: List of CDFVariable objects
61 Returns:
62 Dictionary mapping record count to list of variables
63 """
64 groups: dict[int, list[CDFVariable]] = {}
66 for var in variables:
67 if var.num_records not in groups:
68 groups[var.num_records] = []
69 groups[var.num_records].append(var)
71 return groups
def _expand_variable_to_columns(
    variable: CDFVariable, cdf_file_path: Path, max_records: int | None = None
) -> tuple[list[str], list[list[Any]], int]:
    """Turn one CDF variable into output column names and per-column value lists.

    Array variables (``variable.is_array``) produce one column per index
    ``0 .. array_size - 1`` taken from ``variable.data[:records, index]``.
    Non-array variables produce a single column. If their data is not a numpy
    array, that column holds the raw value as its only element.

    Args:
        variable: The variable to expand
        cdf_file_path: Path to the CDF file (used to resolve column names)
        max_records: Upper bound on records to take (None = all)

    Returns:
        ``(column_names, data_columns, actual_records)``, where ``data_columns``
        is a list of columns (each a Python list) and ``actual_records`` is
        ``num_records`` capped by ``max_records``.
    """

    def _to_python_list(values: np.ndarray) -> list[Any]:
        # datetime64 values are rendered with str() (ISO-style text); all other
        # dtypes go through ndarray.tolist() to get native Python scalars.
        if np.issubdtype(values.dtype, np.datetime64):
            return [str(value) for value in values]
        return values.tolist()

    column_names = get_column_names_for_variable(variable, cdf_file_path)

    record_limit = (
        variable.num_records
        if max_records is None
        else min(variable.num_records, max_records)
    )

    if variable.is_array:
        # One output column per element of the per-record array.
        data_columns = [
            _to_python_list(variable.data[:record_limit, index])
            for index in range(variable.array_size)
        ]
    elif isinstance(variable.data, np.ndarray):
        data_columns = [_to_python_list(variable.data[:record_limit])]
    else:
        # Non-array, non-ndarray data is kept as a single-element column.
        data_columns = [[variable.data]]

    return column_names, data_columns, record_limit
122def _generate_output_filename(
123 template: str, source_file: Path, variable_name: str | None = None
124) -> str:
125 """Generate output filename from template.
127 Args:
128 template: Filename template with [SOURCE_FILE] and [VARIABLE_NAME] placeholders
129 source_file: Source CDF file path
130 variable_name: Variable name (optional)
132 Returns:
133 Generated filename
134 """
135 filename = template
136 filename = filename.replace("[SOURCE_FILE]", source_file.stem)
138 if variable_name:
139 filename = filename.replace("[VARIABLE_NAME]", variable_name)
140 else:
141 # If no variable name, remove the placeholder
142 filename = filename.replace("-[VARIABLE_NAME]", "").replace("_[VARIABLE_NAME]", "")
143 filename = filename.replace("[VARIABLE_NAME]-", "").replace("[VARIABLE_NAME]_", "")
144 filename = filename.replace("[VARIABLE_NAME]", "")
146 return filename
149def _get_unique_filename(base_filename: str, used_filenames: set[str]) -> str:
150 """Get a unique filename by adding a numerical suffix if needed.
152 Args:
153 base_filename: The base filename to use
154 used_filenames: Set of filenames already used in this extraction
156 Returns:
157 A unique filename
158 """
159 # If filename hasn't been used yet in this extraction, use it as-is
160 if base_filename not in used_filenames:
161 return base_filename
163 # Add numerical suffix to make it unique
164 base_name = Path(base_filename).stem
165 extension = Path(base_filename).suffix
166 counter = 1
168 while True:
169 new_filename = f"{base_name}_{counter}{extension}"
170 if new_filename not in used_filenames:
171 return new_filename
172 counter += 1
175def _validate_existing_file_header(file_path: Path, expected_columns: list[str]) -> bool:
176 """Validate that an existing CSV or Parquet file has the expected header.
178 File format is auto-detected from the file extension.
180 Args:
181 file_path: Path to the file
182 expected_columns: Expected column names
184 Returns:
185 True if headers match, False otherwise
186 """
187 try:
188 from crump.tabular_file import create_reader
190 with create_reader(file_path) as reader:
191 existing_header = reader.fieldnames
192 return existing_header == expected_columns
193 except Exception:
194 return False
def _select_requested_variables(
    all_variables: list[CDFVariable], variable_names: list[str] | None
) -> list[CDFVariable]:
    """Return the variables named in ``variable_names`` (all of them when it is falsy).

    Raises:
        ValueError: If any requested name is not present in ``all_variables``
    """
    if not variable_names:
        return all_variables

    requested = set(variable_names)
    selected = [var for var in all_variables if var.name in requested]
    missing = requested - {var.name for var in selected}
    if missing:
        raise ValueError(f"Variables not found in CDF file: {', '.join(sorted(missing))}")
    return selected


def _write_extracted_columns(
    output_path: Path,
    column_names: list[str],
    data_columns: list[list[Any]],
    num_rows: int,
    variable_names: list[str],
    append: bool,
) -> ExtractionResult:
    """Check the target file, write header and rows, and summarize the written file.

    Raises:
        FileExistsError: If ``output_path`` exists and ``append`` is False
        ValueError: If appending and the existing header differs from ``column_names``
    """
    if output_path.exists() and not append:
        raise FileExistsError(
            f"Output file already exists: {output_path}. "
            "Use --append to add data to existing file."
        )

    if (
        append
        and output_path.exists()
        and not _validate_existing_file_header(output_path, column_names)
    ):
        raise ValueError(
            f"Cannot append to {output_path}: "
            f"existing file has different columns. "
            f"Expected columns: {', '.join(column_names)}"
        )

    # Format (CSV/Parquet) is auto-detected by create_writer from the extension.
    write_append = append and output_path.exists()
    with create_writer(output_path, append=write_append) as writer:
        # The header is written only for a fresh file.
        if not write_append:
            writer.writerow(column_names)
        # Columns are stored column-major, so transpose them row by row.
        for row_idx in range(num_rows):
            writer.writerow([col[row_idx] for col in data_columns])

    return ExtractionResult(
        output_file=output_path,
        num_rows=num_rows,
        num_columns=len(column_names),
        column_names=column_names,
        file_size=output_path.stat().st_size,
        variable_names=variable_names,
    )


def extract_cdf_to_tabular_file(
    cdf_file_path: Path,
    output_dir: Path,
    filename_template: str = "[SOURCE_FILE]-[VARIABLE_NAME].csv",
    automerge: bool = True,
    append: bool = False,
    variable_names: list[str] | None = None,
    max_records: int | None = None,
    use_parquet: bool = False,  # noqa: ARG001 # Deprecated parameter kept for compatibility
) -> list[ExtractionResult]:
    """Extract data from a CDF file to CSV or Parquet files.

    The output format is determined automatically from the filename extension
    in the filename_template (.csv for CSV, .parquet or .pq for Parquet).

    With ``automerge`` every group of variables sharing a record count becomes
    one file, largest record count first. Without it each variable gets its
    own file. In both modes, variables with fewer than 2 records are skipped,
    and the file is named after the first variable in it.

    Args:
        cdf_file_path: Path to the CDF file
        output_dir: Directory to save output files (created if missing)
        filename_template: Template for output filenames (extension determines format)
        automerge: Whether to merge variables with same record count
        append: Whether to append to existing files
        variable_names: List of specific variables to extract (None = all)
        max_records: Maximum number of records to extract per variable (None = all)
        use_parquet: Deprecated. Use filename extension instead.

    Returns:
        List of ExtractionResult objects, one per file written

    Raises:
        ValueError: If specified variables are not found
        FileExistsError: If output file exists and append is False
        ValueError: If appending but headers don't match
    """
    all_variables = read_cdf_variables(cdf_file_path)
    variables = _select_requested_variables(all_variables, variable_names)
    if not variables:
        return []

    output_dir.mkdir(parents=True, exist_ok=True)

    # Each batch is the set of variables written to one output file.
    if automerge:
        groups = _group_variables_by_record_count(variables)
        batches = [
            group_vars
            for record_count, group_vars in sorted(groups.items(), key=lambda item: -item[0])
            # Very few records usually means metadata rather than a time series.
            if record_count >= 2
        ]
    else:
        batches = [[var] for var in variables if var.num_records >= 2]

    results: list[ExtractionResult] = []
    used_filenames: set[str] = set()  # Filenames already claimed in this extraction

    for batch in batches:
        column_names: list[str] = []
        data_columns: list[list[Any]] = []
        record_counts: list[int] = []
        for var in batch:
            names, columns, count = _expand_variable_to_columns(var, cdf_file_path, max_records)
            column_names.extend(names)
            data_columns.extend(columns)
            record_counts.append(count)

        # The minimum keeps row indexing within every column of the batch.
        num_rows = min(record_counts)
        column_names = _make_unique_column_names(column_names)
        batch_var_names = [var.name for var in batch]

        base_filename = _generate_output_filename(
            filename_template, cdf_file_path, batch_var_names[0]
        )
        output_filename = _get_unique_filename(base_filename, used_filenames)
        used_filenames.add(output_filename)

        results.append(
            _write_extracted_columns(
                output_dir / output_filename,
                column_names,
                data_columns,
                num_rows,
                batch_var_names,
                append,
            )
        )

    return results
def extract_cdf_with_config(
    cdf_file_path: Path,
    output_dir: Path,
    job: CrumpJob,
    max_records: int | None = None,
    automerge: bool = True,
    variable_names: list[str] | None = None,
    append: bool = False,
    filename_template: str = "[SOURCE_FILE]_[VARIABLE_NAME].csv",
    use_parquet: bool = False,
) -> list[ExtractionResult]:
    """Extract data from a CDF file to CSV/Parquet using job configuration for column selection and mapping.

    This function extracts CDF data and applies the same column mappings and transformations
    that would be used when syncing to a database, but outputs to CSV/Parquet instead.

    A CDF file may contain multiple groups of variables with different record counts, resulting
    in multiple output files. This function attempts to transform each one and returns results
    for those that successfully match the column mappings. Groups that do not match the
    mappings are skipped silently. An append header mismatch is still raised.

    The raw dump is written as CSV to a temporary directory, which is removed
    (best effort) before returning.

    Args:
        cdf_file_path: Path to the CDF file
        output_dir: Directory to write output CSV file(s)
        job: CrumpJob configuration with column mappings and transformations
        max_records: Maximum number of records to extract (None = all)
        automerge: Whether to merge variables with same record count during raw extraction
        variable_names: Specific variable names to extract (None = all)
        append: Whether to append to existing CSV files instead of overwriting
        filename_template: Template for output filenames (use [SOURCE_FILE] and [VARIABLE_NAME])
        use_parquet: Deprecated; passed through unchanged. The output format follows
            the extension in filename_template.

    Returns:
        List of ExtractionResult objects for successfully transformed CSVs (may be empty)

    Raises:
        ValueError: If CDF extraction fails completely or append header mismatch
        FileNotFoundError: If CDF file doesn't exist
        FileExistsError: If output file exists and append is False
    """
    import shutil

    # Step 1: Extract CDF to temporary CSV files (raw dump)
    temp_dir = Path(tempfile.mkdtemp(prefix="data_sync_extract_"))
    results: list[ExtractionResult] = []

    try:
        raw_results = extract_cdf_to_tabular_file(
            cdf_file_path=cdf_file_path,
            output_dir=temp_dir,
            filename_template=f"{cdf_file_path.stem}_[VARIABLE_NAME].csv",
            automerge=automerge,
            append=False,
            variable_names=variable_names,
            max_records=max_records,
        )

        if not raw_results:
            raise ValueError("No data could be extracted from CDF file")

        # Step 2: Extract values from filename if configured
        filename_values = None
        if job.filename_to_column:
            filename_values = job.filename_to_column.extract_values_from_filename(cdf_file_path)

        # Step 3: Try to transform each extracted CSV
        for raw_result in raw_results:
            try:
                result = _transform_tabular_file_with_config(
                    raw_file_path=raw_result.output_file,
                    output_dir=output_dir,
                    cdf_file_path=cdf_file_path,
                    job=job,
                    filename_values=filename_values,
                    raw_result=raw_result,
                    append=append,
                    filename_template=filename_template,
                    use_parquet=use_parquet,
                )
            except ValueError as exc:
                # An append header mismatch (message prefix "Cannot append to ")
                # is a real error the caller must see, as documented above.
                # Swallowing it would silently drop data.
                if str(exc).startswith("Cannot append to "):
                    raise
                # Otherwise this raw file doesn't match the column mappings,
                # which is expected when a CDF has several variable groups.
                continue

            if result is not None:
                results.append(result)

        return results

    finally:
        # Best-effort cleanup of the whole temp dir, whatever files it holds.
        shutil.rmtree(temp_dir, ignore_errors=True)
def _transform_tabular_file_with_config(
    raw_file_path: Path,
    output_dir: Path,
    cdf_file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None,
    raw_result: ExtractionResult,
    append: bool = False,
    filename_template: str = "[SOURCE_FILE]_[VARIABLE_NAME].csv",
    use_parquet: bool = False,  # noqa: ARG001 # Deprecated parameter kept for compatibility
) -> ExtractionResult | None:
    """Transform one raw extracted CSV file using the job's column configuration.

    The raw file is read with ``csv.DictReader``, so it must be CSV. The
    output format is auto-detected by ``create_writer`` from the extension
    produced by ``filename_template``.

    Args:
        raw_file_path: Path to raw CSV file
        output_dir: Output directory for transformed output file
        cdf_file_path: Original CDF file path (its stem fills [SOURCE_FILE])
        job: Job configuration
        filename_values: Extracted filename values
        raw_result: Result from raw extraction (its variable names fill [VARIABLE_NAME])
        append: Whether to append to existing file
        filename_template: Template for output filename (extension determines format)
        use_parquet: Deprecated. Use filename extension instead.

    Returns:
        ExtractionResult if at least one row was written (or the file was
        appended to). None if the raw file has no header, no output columns
        apply, or a freshly created output ended up with no rows.

    Raises:
        ValueError: If required columns are missing from file or append header mismatch
        FileExistsError: If output file exists and append is False
    """
    # newline="" is what the csv module expects for correct handling of
    # quoted fields containing line breaks.
    with open(raw_file_path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        if not reader.fieldnames:
            return None

        csv_columns = set(reader.fieldnames)

        # Output header names and the mappings that produce them, in output order.
        output_columns: list[str] = []
        sync_columns = []

        # ID columns are mandatory. An ID that is neither computed nor present
        # in this raw file means the file does not match this job.
        for id_col in job.id_mapping:
            if (
                id_col.expression
                or id_col.function
                or (id_col.csv_column and id_col.csv_column in csv_columns)
            ):
                sync_columns.append(id_col)
                output_columns.append(id_col.db_column)
            else:
                raise ValueError(
                    f"ID column '{id_col.csv_column}' not found in CSV. "
                    f"Available columns: {', '.join(sorted(csv_columns))}"
                )

        if job.columns:
            for col in job.columns:
                # Computed columns are always included. Mapped columns are included
                # only when present in this raw file; missing ones are skipped.
                if col.expression or col.function or (
                    col.csv_column and col.csv_column in csv_columns
                ):
                    sync_columns.append(col)
                    output_columns.append(col.db_column)
        else:
            # No columns configured: pass through every raw column (sorted)
            # that is not already used as an ID source column.
            # Local import to avoid a circular import.
            from crump.config import ColumnMapping

            id_source_columns = {
                id_col.csv_column for id_col in job.id_mapping if id_col.csv_column
            }
            for csv_col in sorted(csv_columns):
                if csv_col not in id_source_columns:
                    sync_columns.append(ColumnMapping(csv_column=csv_col, db_column=csv_col))
                    output_columns.append(csv_col)

        # Add filename-extracted columns if configured
        if filename_values and job.filename_to_column:
            for col_name, filename_col_mapping in job.filename_to_column.columns.items():
                if col_name in filename_values:
                    output_columns.append(filename_col_mapping.db_column)

        # If no columns to output, skip this CSV
        if not output_columns:
            return None

        # [SOURCE_FILE] -> CDF stem. [VARIABLE_NAME] -> the single variable
        # name, or the first two names joined, plus "_plus<N>" for the rest.
        output_filename = filename_template.replace("[SOURCE_FILE]", cdf_file_path.stem)
        if "[VARIABLE_NAME]" in output_filename:
            if len(raw_result.variable_names) == 1:
                var_name = raw_result.variable_names[0]
            else:
                var_name = "_".join(raw_result.variable_names[:2])
                if len(raw_result.variable_names) > 2:
                    var_name += f"_plus{len(raw_result.variable_names) - 2}"
            output_filename = output_filename.replace("[VARIABLE_NAME]", var_name)

        output_path = output_dir / output_filename
        output_dir.mkdir(parents=True, exist_ok=True)

        if append and output_path.exists():
            if not _validate_existing_file_header(output_path, output_columns):
                raise ValueError(
                    f"Cannot append to {output_path.name}: "
                    f"existing file has different columns. "
                    f"Expected: {output_columns}"
                )
        elif not append and output_path.exists():
            raise FileExistsError(
                f"Output file already exists: {output_path}. "
                f"Use --append to add to existing file or remove it first."
            )

        rows_written = 0

        # Rewind: the first DictReader consumed the header line.
        f.seek(0)
        reader = csv.DictReader(f)

        # Format (CSV/Parquet) is auto-detected by create_writer from the extension.
        write_append = append and output_path.exists()
        with create_writer(output_path, append=write_append) as writer:
            # The header is written only for a fresh file.
            if not write_append:
                writer.writerow(output_columns)

            for row in reader:
                output_row = apply_row_transformations(
                    row, sync_columns, job.filename_to_column, filename_values
                )

                # Skip a row only if every value is None or "". Falsy but real
                # values such as 0, 0.0 or False must not cause a row to be dropped.
                if all(value is None or value == "" for value in output_row.values()):
                    continue

                writer.writerow([output_row.get(col, "") for col in output_columns])
                rows_written += 1

        # Remove a freshly created file that received no data rows.
        if rows_written == 0 and not write_append:
            output_path.unlink()
            return None

        file_size = output_path.stat().st_size

        return ExtractionResult(
            output_file=output_path,
            num_rows=rows_written,
            num_columns=len(output_columns),
            column_names=output_columns,
            file_size=file_size,
            variable_names=raw_result.variable_names,
        )