Coverage for src / crump / cli_prepare.py: 92%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""Prepare command for analyzing CSV files and generating config.""" 

2 

3import re 

4import tempfile 

5from pathlib import Path 

6 

7import click 

8from rich.console import Console 

9from rich.table import Table 

10 

11from crump.cdf_extractor import extract_cdf_to_tabular_file 

12from crump.config import ( 

13 ColumnMapping, 

14 CrumpConfig, 

15 CrumpJob, 

16 FilenameColumnMapping, 

17 FilenameToColumn, 

18 Index, 

19 IndexColumn, 

20) 

21from crump.console_utils import CHECKMARK 

22from crump.type_detection import analyze_tabular_file_types_and_nullable, suggest_id_column 

23 

24console = Console() 

25 

26 

def generate_job_name_from_filename(filename: str) -> str:
    """Derive a normalised job name from a file name.

    Args:
        filename: File name to convert; the extension is optional.

    Returns:
        The cleaned job name, or ``"job"`` when nothing is left after cleaning.

    The name is built by, in order:
        - dropping the file extension
        - deleting every digit
        - collapsing runs of underscores into one underscore
        - collapsing runs of hyphens into one hyphen
        - trimming underscores and hyphens from both ends
        - lower-casing the result

    Examples:
        >>> generate_job_name_from_filename("sales_data_2024.csv")
        'sales_data'
        >>> generate_job_name_from_filename("user__info__123.csv")
        'user_info'
        >>> generate_job_name_from_filename("test--file--456.csv")
        'test-file'
    """
    # Digits go first so that the separator runs they leave behind are
    # collapsed by the passes below.
    cleaned = re.sub(r"\d+", "", Path(filename).stem)

    # Underscores are collapsed before hyphens.
    for separator in ("_", "-"):
        cleaned = re.sub(f"{separator}+", separator, cleaned)

    cleaned = cleaned.strip("_-").lower()

    # A name made only of digits/separators cleans down to nothing.
    return cleaned or "job"

75 

76 

def suggest_indexes(column_info: dict[str, tuple[str, bool]], id_column: str) -> list[Index]:
    """Propose single-column database indexes from column names and types.

    Args:
        column_info: Maps each column name to a (data_type, nullable) tuple.
        id_column: Name of the ID column; it never receives an index here.

    Returns:
        One Index per qualifying column, in the iteration order of
        ``column_info``.

    Rules:
        - The ID column is skipped (it is already a primary key).
        - Columns typed 'date' or 'datetime' get a descending index.
        - Otherwise, columns whose name ends in '_id' or '_key'
          (case-insensitive) get an ascending index.
    """
    suggestions = []

    for column, (data_type, _nullable) in column_info.items():
        if column == id_column:
            continue

        # The type rule takes precedence over the name-suffix rule.
        if data_type in ("date", "datetime"):
            direction = "DESC"
        elif column.lower().endswith(("_id", "_key")):
            direction = "ASC"
        else:
            # Neither rule applies: no index for this column.
            continue

        suggestions.append(
            Index(name=f"idx_{column}", columns=[IndexColumn(column=column, order=direction)])
        )

    return suggestions

119 

120 

def detect_filename_patterns(filename: str) -> FilenameToColumn | None:
    """Detect a date embedded in a filename and suggest a filename_to_column mapping.

    Args:
        filename: The filename to analyze (with or without extension)

    Returns:
        FilenameToColumn with a single ``date`` column (stored in ``file_date``)
        if a date is found, None otherwise

    Detects patterns like:
        - YYYYMMDD (20241231)
        - YYYY-MM-DD (2024-12-31)
        - YYYY_MM_DD (2024_12_31)

    The layouts are tried in the order listed. A candidate is only accepted
    when it is a real calendar date, so arbitrary digit runs (ids, counters,
    epoch timestamps) are not mistaken for dates.

    Examples:
        >>> detect_filename_patterns("data_20241231.csv").template
        'data_[date].csv'
        >>> detect_filename_patterns("report_2024-12-31_v1.csv").template
        'report_[date]_v1.csv'
        >>> detect_filename_patterns("id_1234567890.csv") is None
        True
    """
    # Function-scope import keeps this change self-contained.
    from datetime import datetime

    # Strip extension
    stem = Path(filename).stem

    # (regex, strptime format) for each supported layout, in priority order
    date_patterns = [
        (r"\d{8}", "%Y%m%d"),
        (r"\d{4}-\d{2}-\d{2}", "%Y-%m-%d"),
        (r"\d{4}_\d{2}_\d{2}", "%Y_%m_%d"),
    ]
    placeholder = "date"

    for pattern, date_format in date_patterns:
        # Look at every candidate, not just the first: an earlier digit run
        # may be an id while a later one is the actual date.
        for match in re.finditer(pattern, stem):
            try:
                datetime.strptime(match.group(), date_format)
            except ValueError:
                # Right shape, but not a calendar date (e.g. month 13).
                continue

            # Build a template by swapping the matched date for a [date]
            # placeholder, then add the extension back.
            template = (
                f"{stem[: match.start()]}[{placeholder}]{stem[match.end() :]}"
                f"{Path(filename).suffix}"
            )

            columns = {
                placeholder: FilenameColumnMapping(
                    name=placeholder,
                    db_column="file_date",
                    data_type="date",
                    use_to_delete_old_rows=True,
                )
            }

            return FilenameToColumn(columns=columns, template=template)

    return None

176 

177 

178def _create_column_mappings( 

179 columns: list[str], 

180 id_column: str, 

181 column_info: dict[str, tuple[str, bool]], 

182) -> list[ColumnMapping]: 

183 """Create column mappings for non-ID columns. 

184 

185 Args: 

186 columns: List of all column names 

187 id_column: Name of the ID column to exclude 

188 column_info: Dictionary mapping column names to (data_type, nullable) tuples 

189 

190 Returns: 

191 List of ColumnMapping objects for non-ID columns 

192 """ 

193 column_mappings = [] 

194 for col in columns: 

195 if col != id_column: 

196 col_type, nullable = column_info[col] 

197 column_mappings.append( 

198 ColumnMapping(csv_column=col, db_column=col, data_type=col_type, nullable=nullable) 

199 ) 

200 return column_mappings 

201 

202 

def _extract_cdf_to_temp_tabular_files(
    cdf_file: Path, temp_dir: Path, max_records: int = 50
) -> list[Path]:
    """Sample a CDF file into temporary CSV files and report progress.

    Args:
        cdf_file: Path to the CDF file
        temp_dir: Directory passed to the extractor as its output directory
        max_records: Maximum number of records to extract (default: 50)

    Returns:
        Paths of the files reported by the extractor; an empty list when the
        extractor returns no results

    Raises:
        ValueError: If anything fails during extraction or reporting; the
            original exception is chained as the cause
    """
    console.print(f"[cyan]Extracting data from CDF file: {cdf_file.name}[/cyan]")
    console.print(f"[dim] Extracting first {max_records} records from each variable...[/dim]")

    try:
        # automerge groups variables by record count into shared files
        extractor_options = {
            "cdf_file_path": cdf_file,
            "output_dir": temp_dir,
            "filename_template": f"{cdf_file.stem}_[VARIABLE_NAME].csv",
            "automerge": True,
            "append": False,
            "variable_names": None,
            "max_records": max_records,
        }
        extractions = extract_cdf_to_tabular_file(**extractor_options)

        if not extractions:
            console.print(
                "[yellow] No data extracted from CDF file (no suitable variables)[/yellow]"
            )
            return []

        # Summarise what was written, one line per file
        console.print(f"[green] Extracted {len(extractions)} CSV file(s) from CDF[/green]")
        for extraction in extractions:
            console.print(
                f"[dim] - {extraction.output_file.name}: "
                f"{extraction.num_rows} rows, {extraction.num_columns} columns[/dim]"
            )

        return [extraction.output_file for extraction in extractions]

    except Exception as exc:
        # Callers only handle ValueError, so every failure is funnelled into it.
        raise ValueError(f"Failed to extract CDF file: {exc}") from exc

252 

253 

def _display_prepare_results(
    job: CrumpJob,
    config: Path,
    id_column: str,
    column_info: dict[str, tuple[str, bool]],
    column_mappings: list[ColumnMapping],
    suggested_indexes: list[Index],
) -> None:
    """Print a summary of a freshly prepared job to the console.

    Args:
        job: The created CrumpJob
        config: Path to config file
        id_column: Name of the ID column
        column_info: Dictionary mapping column names to (data_type, nullable) tuples
        column_mappings: List of column mappings (excluding ID)
        suggested_indexes: List of suggested indexes
    """

    def _null_label(is_nullable: bool) -> str:
        """Render a nullable flag the way the tables show it."""
        return "NULL" if is_nullable else "NOT NULL"

    console.print(f"[green]{CHECKMARK} Job configuration created successfully![/green]")
    console.print(f"[dim] Config file: {config}[/dim]")
    console.print(f"[dim] Job name: {job.name}[/dim]")
    console.print(f"[dim] Target table: {job.target_table}[/dim]")

    # Column mappings table
    mapping_table = Table(title="Column Mappings")
    mapping_headers = (
        ("CSV Column", "cyan"),
        ("DB Column", "green"),
        ("Type", "yellow"),
        ("Nullable", "magenta"),
    )
    for heading, colour in mapping_headers:
        mapping_table.add_column(heading, style=colour)

    # The ID column always comes first and is mapped to "id"
    id_type, id_nullable = column_info[id_column]
    id_label = _null_label(id_nullable)
    mapping_table.add_row(id_column, "id", id_type + " (ID)", id_label)

    # Then every remaining column
    for mapping in column_mappings:
        label = _null_label(mapping.nullable)
        mapping_table.add_row(
            mapping.csv_column,
            mapping.db_column,
            mapping.data_type or "text",
            label,
        )

    console.print(mapping_table)

    # The index table is only shown when there is something to suggest
    if suggested_indexes:
        index_table = Table(title="Suggested Indexes")
        index_headers = (
            ("Index Name", "cyan"),
            ("Column", "green"),
            ("Order", "yellow"),
        )
        for heading, colour in index_headers:
            index_table.add_column(heading, style=colour)

        # One row per indexed column
        for suggestion in suggested_indexes:
            for indexed in suggestion.columns:
                index_table.add_row(suggestion.name, indexed.column, indexed.order)

        console.print(index_table)

    console.print("[dim]Review the configuration and adjust as needed before syncing.[/dim]")

315 

316 

@click.command()
@click.argument("file_paths", nargs=-1, type=click.Path(exists=True, path_type=Path), required=True)
@click.option(
    "--config",
    "-c",
    "config",
    type=click.Path(path_type=Path),
    required=True,
    help="Path to the YAML configuration file",
)
@click.option(
    "--job",
    "-j",
    "job",
    type=str,
    default=None,
    help="Name of the job to create (auto-generated from filename if not provided)",
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="Overwrite job if it already exists in config",
)
def prepare(file_paths: tuple[Path, ...], config: Path, job: str | None, force: bool) -> None:
    """Prepare config entries by analyzing CSV and CDF files.

    Analyzes CSV and CDF files to detect column names and data types, then generates
    configuration entries. Each CSV file creates one job in the config file.
    For CDF files, each variable group (grouped by record count) creates one job.
    If job name is not provided, it will be auto-generated from the filename.

    Arguments:
        FILE_PATHS: One or more CSV or CDF files to analyze (required)

    Options:
        --config, -c: Path to the YAML configuration file (required)
        --job, -j: Name of the job to create (optional - auto-generated from filename if not provided)
        --force, -f: Overwrite job if it already exists in config

    Examples:
        # Create job config with auto-generated name
        crump prepare data.csv --config crump_config.yml

        # Process a CDF file (creates multiple jobs, one per variable group)
        crump prepare data.cdf --config crump_config.yml

        # Create job config with custom name
        crump prepare data.csv --config crump_config.yml --job my_job

        # Process multiple CSV files (auto-generates job names)
        crump prepare file1.csv file2.csv file3.csv --config crump_config.yml

        # Process multiple files with glob pattern (CSV and CDF supported)
        crump prepare data/*.csv data/*.cdf -c crump_config.yml

        # Overwrite existing job config
        crump prepare data.csv -c crump_config.yml -j my_job --force
    """
    # Function-scope import: only needed for temp-directory cleanup.
    import shutil

    temp_dir: Path | None = None
    temp_csv_files: list[Path] = []

    try:
        from crump.file_types import InputFileType

        # Separate CSV and CDF files using InputFileType
        csv_files = []
        cdf_files = []
        unsupported_files = []

        for f in file_paths:
            try:
                file_type = InputFileType.from_path(str(f))
            except ValueError:
                # from_path signals an unrecognised file type with ValueError
                unsupported_files.append(f)
                continue

            if file_type == InputFileType.CSV:
                csv_files.append(f)
            elif file_type == InputFileType.CDF:
                cdf_files.append(f)
            else:
                unsupported_files.append(f)

        # Warn about unsupported files
        for f in unsupported_files:
            console.print(
                f"[yellow]Warning: Unsupported file type '{f.suffix}' for {f.name}. "
                "Only .csv and .cdf files are supported.[/yellow]"
            )

        # Validate: if job name provided, only one file allowed
        if job and len(file_paths) > 1:
            console.print(
                "[red]Error:[/red] Cannot specify job name when processing multiple files. "
                "Job names will be auto-generated from filenames."
            )
            raise click.Abort()

        # A custom name only applies to a plain CSV input. One CDF file can
        # yield several CSVs, which would all collide on the same job name,
        # so the name is dropped here exactly as the warning states.
        custom_job_name = job
        if job and cdf_files:
            console.print(
                "[yellow]Warning:[/yellow] Custom job name will be ignored for CDF files. "
                "Job names will be auto-generated from variable names."
            )
            custom_job_name = None

        # Extract CDF files to temporary CSV files
        if cdf_files:
            console.print(f"\n[bold]Processing {len(cdf_files)} CDF file(s)...[/bold]\n")
            temp_dir = Path(tempfile.mkdtemp(prefix="data_sync_cdf_"))

            for cdf_file in cdf_files:
                try:
                    extracted_csvs = _extract_cdf_to_temp_tabular_files(
                        cdf_file, temp_dir, max_records=50
                    )
                    temp_csv_files.extend(extracted_csvs)
                    console.print(
                        f"[green]{CHECKMARK} CDF extraction complete: {cdf_file.name}[/green]\n"
                    )
                except ValueError as e:
                    console.print(f"[red]Error extracting {cdf_file.name}:[/red] {e}\n")
                    # With a single input there is nothing else to process
                    if len(file_paths) == 1:
                        raise click.Abort() from e
                    continue

        # Combine original CSV files with extracted temp CSV files
        all_csv_files = csv_files + temp_csv_files

        if not all_csv_files:
            console.print("[yellow]No CSV files to process[/yellow]")
            return

        # Display summary of files to process
        if temp_csv_files:
            console.print(
                f"\n[bold]Processing {len(all_csv_files)} CSV file(s) "
                f"({len(csv_files)} original, {len(temp_csv_files)} from CDF)...[/bold]\n"
            )

        # Load or create config once (used for all files)
        crump_config = CrumpConfig.from_yaml(config) if config.exists() else CrumpConfig(jobs={})

        jobs_created = 0
        jobs_updated = 0

        # Process each CSV file
        for file_path in all_csv_files:
            # Determine job name
            job_name = custom_job_name or generate_job_name_from_filename(file_path.name)

            console.print(f"\n[cyan]Analyzing {file_path.name}...[/cyan]")
            console.print(f"[dim] Job name: {job_name}[/dim]")

            # Analyze CSV file to detect types and nullable status
            column_info = analyze_tabular_file_types_and_nullable(file_path)

            if not column_info:
                console.print("[red]Error:[/red] No columns found in CSV file")
                continue

            columns = list(column_info.keys())
            console.print(f"[dim] Found {len(columns)} columns[/dim]")

            # Suggest ID column using matchers from config if available
            id_column = suggest_id_column(columns, crump_config.id_column_matchers)
            console.print(f"[dim] Suggested ID column: {id_column}[/dim]")

            # Create column mappings and suggest indexes
            column_mappings = _create_column_mappings(columns, id_column, column_info)
            suggested_indexes = suggest_indexes(column_info, id_column)
            console.print(f"[dim] Suggested {len(suggested_indexes)} index(es)[/dim]")

            # Detect filename patterns and suggest filename_to_column mapping
            filename_to_column = detect_filename_patterns(file_path.name)
            if filename_to_column:
                console.print("[dim] Detected date pattern in filename[/dim]")
                console.print(f"[dim] Template: {filename_to_column.template}[/dim]")

            # Create the job with ID mapping that includes nullable info
            id_type, id_nullable = column_info[id_column]
            new_job = CrumpJob(
                name=job_name,
                target_table=job_name,  # Use job name as table name
                id_mapping=[
                    ColumnMapping(
                        csv_column=id_column,
                        db_column="id",
                        data_type=id_type,
                        nullable=id_nullable,
                    )
                ],
                columns=column_mappings if column_mappings else None,
                filename_to_column=filename_to_column,
                indexes=suggested_indexes if suggested_indexes else None,
            )

            # Add or update job
            try:
                # Checked before the call so an overwrite counts as an update
                job_exists = job_name in crump_config.jobs
                crump_config.add_or_update_job(new_job, force=force)

                if job_exists:
                    jobs_updated += 1
                else:
                    jobs_created += 1

            except ValueError as e:
                console.print(f"[red]Error:[/red] {e}")
                console.print("[dim]Use --force to overwrite the existing job[/dim]")
                if len(file_paths) == 1:
                    raise click.Abort() from e
                continue

            # Display results for this file
            _display_prepare_results(
                new_job, config, id_column, column_info, column_mappings, suggested_indexes
            )

        # Save config once after processing all files
        if jobs_created > 0 or jobs_updated > 0:
            crump_config.save_to_yaml(config)
            console.print(f"\n[green]{CHECKMARK} Configuration saved to {config}[/green]")
            if jobs_created > 0:
                console.print(f"[dim] Jobs created: {jobs_created}[/dim]")
            if jobs_updated > 0:
                console.print(f"[dim] Jobs updated: {jobs_updated}[/dim]")
            if temp_csv_files:
                console.print(f"[dim] CSV files extracted from CDF: {len(temp_csv_files)}[/dim]")
        else:
            console.print("\n[yellow]No jobs were created or updated[/yellow]")

    except click.Abort:
        # Raised on purpose above, after the error was already reported.
        # click.Abort subclasses RuntimeError, so without this clause the
        # generic handler below would print an empty "Unexpected error:".
        raise
    except (FileNotFoundError, ValueError) as e:
        console.print(f"[red]Error:[/red] {e}")
        raise click.Abort() from e
    except Exception as e:
        console.print(f"[red]Unexpected error:[/red] {e}")
        raise click.Abort() from e
    finally:
        # Clean up temporary files
        if temp_csv_files:
            console.print("\n[dim]Cleaning up temporary files...[/dim]")
            for temp_file in temp_csv_files:
                try:
                    temp_file.unlink()
                except OSError as e:
                    console.print(f"[yellow]Warning: Could not delete {temp_file}: {e}[/yellow]")

        # Clean up temporary directory. rmtree rather than rmdir: a failed
        # extraction may leave files that were never added to temp_csv_files,
        # and rmdir refuses to remove a non-empty directory.
        if temp_dir and temp_dir.exists():
            try:
                shutil.rmtree(temp_dir)
            except OSError as e:
                console.print(f"[yellow]Warning: Could not delete temp directory: {e}[/yellow]")