Coverage for src / crump / config.py: 91%

401 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""Configuration file handling for data_sync.""" 

2 

3from __future__ import annotations 

4 

5import enum 

6import fnmatch 

7import importlib 

8import re 

9from pathlib import Path 

10from typing import Any 

11 

12import yaml # type: ignore[import-untyped] 

13 

14 

class FailureMode(enum.Enum):
    """How a sync job reacts when incoming data does not match its config.

    Members:
        STRICT: Reject rows that do not conform to the configured schema.
            * missing nullable field        -> NULL
            * missing non-nullable field    -> row is skipped
            * string longer than varchar(N) -> row is skipped
        PERMISSIVE: Best-effort import of as much data as possible.
            * missing nullable field        -> NULL
            * missing non-nullable field    -> default value (0 for integers, "" for strings)
            * string longer than varchar(N) -> truncated to fit
    """

    # Values are the lowercase spellings used for ``failure_mode`` in job config.
    STRICT = "strict"
    PERMISSIVE = "permissive"

31 

32 

class DuplicateKeySafeLoader(yaml.SafeLoader):
    """``yaml.SafeLoader`` subclass used to read crump config files.

    The class body is intentionally empty. Its only customization is the mapping
    constructor registered on it at module level, which keeps repeated ``null`` (``~``)
    keys, collecting them into a list, instead of letting later keys overwrite earlier ones.
    """

37 

38 

def dict_constructor(loader: yaml.Loader, node: yaml.Node) -> dict[Any, Any]:
    """Build a dict from a YAML mapping node, keeping every duplicate null key.

    By default a later duplicate key overwrites an earlier one. Config files use
    ``null``/``~`` keys to declare several custom-function columns in one mapping,
    so those must all survive:

    * a single null key is stored as-is: ``{None: value}``;
    * two or more null keys are collected, in document order, into a *new* list:
      ``{None: [value1, value2, ...]}``. Because the list is always fresh, a null key
      whose own value is a list is kept as one element rather than being extended.

    Non-null keys keep the usual "last one wins" behavior. The ``None`` entry keeps the
    position of the first null key in the dict's insertion order.

    Args:
        loader: The active loader (this constructor is registered on
            ``DuplicateKeySafeLoader``).
        node: The mapping node being constructed.

    Returns:
        The constructed dictionary.
    """
    # Replacing the default mapping constructor bypasses construct_mapping(), which is
    # where YAML merge keys ("<<: *anchor") are normally expanded; do that step here.
    flatten_mapping = getattr(loader, "flatten_mapping", None)
    if flatten_mapping is not None:
        flatten_mapping(node)

    result: dict[Any, Any] = {}
    null_values: list[Any] = []
    for key, value in loader.construct_pairs(node):
        if key is None:
            if not null_values:
                # Reserve the first null key's slot in insertion order.
                result[None] = value
            null_values.append(value)
        else:
            result[key] = value

    if len(null_values) > 1:
        result[None] = null_values
    return result

62 

63 

# Route every plain YAML mapping loaded through DuplicateKeySafeLoader to
# dict_constructor, so repeated null keys are collected instead of overwritten.
DuplicateKeySafeLoader.add_constructor(
    yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
    dict_constructor,
)

68 

69 

class ColumnMapping:
    """Mapping between CSV and database columns.

    Beyond a plain rename, a mapping may carry a data type, a nullability flag, a
    value lookup table, or a computed value produced by an inline ``expression`` or an
    external ``function`` applied to one or more ``input_columns``.
    """

    # Builtins made visible to inline expressions. This is a convenience whitelist, NOT a
    # security boundary: eval()'d code can still reach arbitrary objects via attribute
    # access (e.g. ``().__class__.__mro__``), so expressions must only come from trusted
    # config files.
    _EXPRESSION_BUILTINS: dict[str, Any] = {
        "abs": abs,
        "min": min,
        "max": max,
        "round": round,
        "int": int,
        "float": float,
        "str": str,
        "bool": bool,
        "len": len,
    }

    def __init__(
        self,
        csv_column: str | None,
        db_column: str,
        data_type: str | None = None,
        nullable: bool | None = None,
        lookup: dict[str, Any] | None = None,
        expression: str | None = None,
        function: str | None = None,
        input_columns: list[str] | None = None,
    ) -> None:
        """Initialize column mapping.

        Args:
            csv_column: Name of the column in the CSV file. Can be None for custom functions
                creating new columns. Can be specified with expression/function to provide
                a named reference for the transformation.
            db_column: Name of the column in the database
            data_type: Optional data type (integer, float, date, datetime, text, varchar(N))
            nullable: Optional flag indicating if NULL values are allowed (True=NULL, False=NOT NULL)
            lookup: Optional dictionary to map CSV values to database values.
                Values not in the lookup are passed through unchanged.
                Example: {"active": 1, "inactive": 0} to convert status strings to integers.
            expression: Optional inline Python expression for custom calculations.
                Example: "float(temperature) * 1.8 + 32" for Celsius to Fahrenheit.
                Evaluated with eval(); only use expressions from trusted config.
            function: Optional reference to external function in format "module.function".
                Example: "my_functions.calculate_percentage"
            input_columns: List of CSV column names to pass as parameters to expression/function.
                Required if expression or function is specified.

        Raises:
            ValueError: If both expression and function are specified, or if expression/function
                is specified without input_columns.
        """
        if expression is not None and function is not None:
            raise ValueError("Cannot specify both 'expression' and 'function'")

        if (expression is not None or function is not None) and not input_columns:
            raise ValueError("Must specify 'input_columns' when using 'expression' or 'function'")

        self.csv_column = csv_column
        self.db_column = db_column
        self.data_type = data_type
        self.nullable = nullable
        self.lookup = lookup
        self.expression = expression
        self.function = function
        self.input_columns = input_columns

    def apply_lookup(self, value: str) -> Any:
        """Apply lookup transformation to a value.

        Args:
            value: The value from the CSV

        Returns:
            The transformed value if found in lookup, otherwise the original value
        """
        if self.lookup is None:
            return value
        return self.lookup.get(value, value)

    def apply_custom_function(self, row_data: dict[str, Any]) -> Any:
        """Apply custom function or expression to compute a value.

        Args:
            row_data: Dictionary of column name to value from the CSV row

        Returns:
            The computed value from the expression or function

        Raises:
            ValueError: If input columns are missing from row_data
            RuntimeError: If no expression/function or no input_columns is configured, or
                if expression evaluation or function execution fails (including a
                malformed function path)
        """
        if self.expression is None and self.function is None:
            raise RuntimeError("No expression or function defined for custom function")

        if not self.input_columns:
            raise RuntimeError("No input_columns defined for custom function")

        input_values = []
        for col_name in self.input_columns:
            if col_name not in row_data:
                raise ValueError(f"Input column '{col_name}' not found in row data")
            input_values.append(row_data[col_name])

        # Test against None (not truthiness), consistent with the checks above, so an
        # empty expression is reported as an expression error, not as function 'None'.
        if self.expression is not None:
            return self._evaluate_expression(input_values)
        return self._call_function(input_values)

    def _compiled_expression(self) -> Any:
        """Return ``self.expression`` compiled for eval(), caching the code object.

        This runs once per row, so recompiling the source string each time is wasted
        work. The cache is keyed on the expression text, so reassigning
        ``self.expression`` is picked up on the next call.
        """
        cached = getattr(self, "_expression_cache", None)
        if cached is None or cached[0] != self.expression:
            # "<string>" is the filename eval() uses for source strings, so SyntaxError
            # messages are identical to calling eval() on the text directly.
            cached = (self.expression, compile(self.expression, "<string>", "eval"))  # type: ignore[arg-type]
            self._expression_cache = cached
        return cached[1]

    def _evaluate_expression(self, input_values: list[Any]) -> Any:
        """Evaluate the inline expression with the input columns bound as variables."""
        namespace: dict[str, Any] = dict(zip(self.input_columns, input_values))  # type: ignore[arg-type]
        # A fresh copy per call: an expression such as ``__builtins__.pop("abs")`` must
        # not be able to affect later rows. Assigned after the columns, so it wins over
        # any input column that happens to be named "__builtins__".
        namespace["__builtins__"] = dict(self._EXPRESSION_BUILTINS)
        try:
            # SECURITY: eval() of config-supplied code; config files must be trusted.
            return eval(self._compiled_expression(), namespace)
        except Exception as e:
            raise RuntimeError(f"Failed to evaluate expression '{self.expression}': {e}") from e

    def _call_function(self, input_values: list[Any]) -> Any:
        """Import ``self.function`` ("package.module.func") and call it with the inputs."""
        try:
            module_name, _, function_name = self.function.rpartition(".")  # type: ignore[union-attr]
            # Reject "func", "mod." and ".func" up front with a clear message.
            if not module_name or not function_name:
                raise ValueError(
                    f"Function must be in format 'module.function', got '{self.function}'"
                )
            func = getattr(importlib.import_module(module_name), function_name)
            return func(*input_values)
        except Exception as e:
            raise RuntimeError(f"Failed to execute function '{self.function}': {e}") from e

209 

210 

class FilenameColumnMapping:
    """One value captured from a filename, and the database column that stores it."""

    def __init__(
        self,
        name: str,
        db_column: str | None = None,
        data_type: str | None = None,
        use_to_delete_old_rows: bool = False,
    ) -> None:
        """Describe a single filename-derived column.

        Args:
            name: Name of the captured value (template placeholder or regex group name).
            db_column: Target database column; any falsy value (None or "") falls back
                to ``name``.
            data_type: Data type (varchar(N), integer, float, date, datetime, text).
            use_to_delete_old_rows: When True, this column is used to identify stale rows.
        """
        self.name = name
        self.db_column = name if not db_column else db_column
        self.data_type = data_type
        self.use_to_delete_old_rows = use_to_delete_old_rows

233 

234 

class FilenameToColumn:
    """Configuration for extracting multiple values from filename."""

    def __init__(
        self,
        columns: dict[str, FilenameColumnMapping],
        template: str | None = None,
        regex: str | None = None,
    ) -> None:
        """Initialize filename to column mapping.

        Args:
            columns: Dictionary of column name to FilenameColumnMapping
            template: Filename template with [column_name] syntax (mutually exclusive with regex)
            regex: Regex pattern with named groups (mutually exclusive with template)

        Raises:
            ValueError: If both template and regex are specified, or neither is specified
        """
        if (template is None) == (regex is None):
            raise ValueError("Must specify exactly one of 'template' or 'regex'")

        self.columns = columns
        self.template = template
        self.regex = regex

        # Pre-compile once; the pattern is reused for every file. Test against None (not
        # truthiness), matching the check above: an empty template must not fall through
        # to re.compile(None).
        if template is not None:
            self._compiled_regex = self._template_to_regex(template)
        else:
            self._compiled_regex = re.compile(regex)  # type: ignore[arg-type]

    def _template_to_regex(self, template: str) -> re.Pattern:
        r"""Convert a template string to a compiled regex with named groups.

        Literal text is escaped and each ``[name]`` placeholder becomes a non-greedy named
        group ``(?P<name>.+?)``, so it stops at the next piece of literal text. A
        placeholder at the very end of the template has no literal text after it, so it
        is made greedy (``.+``); a non-greedy group there would always capture exactly
        one character.

        Args:
            template: Template string with [column_name] placeholders

        Returns:
            Compiled regex pattern with named groups

        Example:
            ``"[mission]_level2_[sensor]_[date].cdf"`` becomes
            ``"(?P<mission>.+?)_level2_(?P<sensor>.+?)_(?P<date>.+?)\.cdf"`` and
            ``"[mission]_[version]"`` becomes ``"(?P<mission>.+?)_(?P<version>.+)"``.
        """
        escaped = re.escape(template)
        # re.escape turned "[name]" into "\[name\]"; swap those for named groups.
        pattern = re.sub(r"\\\[(\w+)\\\]", r"(?P<\1>.+?)", escaped)
        # Only the substitution can produce an unescaped ".+?)": escaped literal text
        # always has a backslash before ".", "+", "?" and ")".
        if pattern.endswith(".+?)"):
            pattern = pattern[:-2] + ")"
        return re.compile(pattern)

    def extract_values_from_filename(self, filename: str | Path) -> dict[str, str] | None:
        """Extract values from filename using template or regex.

        For a ``Path`` only the final component (``Path.name``) is matched; a ``str`` is
        matched as given. Matching uses ``search``, so the pattern may match anywhere
        in the string.

        Args:
            filename: The filename (or path) to extract values from

        Returns:
            Dictionary of column name to extracted value, or None if no match

        Example:
            >>> mapping = FilenameToColumn(
            ...     columns={...},
            ...     template="[mission]_level2_[sensor]_[date]_v[version].cdf"
            ... )
            >>> mapping.extract_values_from_filename("imap_level2_primary_20000102_v002.cdf")
            {'mission': 'imap', 'sensor': 'primary', 'date': '20000102', 'version': '002'}
        """
        if isinstance(filename, Path):
            filename = filename.name

        match = self._compiled_regex.search(filename)
        return match.groupdict() if match else None

    def get_delete_key_columns(self) -> list[str]:
        """Get list of database column names used for stale row deletion.

        Returns:
            List of db_column names where use_to_delete_old_rows is True
        """
        return [col.db_column for col in self.columns.values() if col.use_to_delete_old_rows]

320 

321 

class IndexColumn:
    """Column definition for a database index."""

    def __init__(self, column: str, order: str = "ASC") -> None:
        """Initialize index column.

        Args:
            column: Column name
            order: Sort order - 'ASC' or 'DESC' (default: 'ASC'). Matching is
                case-insensitive and surrounding whitespace is ignored.

        Raises:
            ValueError: If order is not a string spelling 'ASC' or 'DESC'
        """
        # Check the type first: a YAML ``order:`` with no value arrives as None (and a
        # number is possible too), which would otherwise crash with AttributeError.
        normalized = order.strip().upper() if isinstance(order, str) else None
        if normalized not in ("ASC", "DESC"):
            raise ValueError(f"Index order must be 'ASC' or 'DESC', got '{order}'")
        self.column = column
        self.order = normalized

339 

340 

class Index:
    """A named database index spanning one or more columns."""

    def __init__(self, name: str, columns: list[IndexColumn]) -> None:
        """Create an index definition.

        Args:
            name: Index name.
            columns: Indexed columns, in order, each with its sort direction.

        Raises:
            ValueError: If ``columns`` is empty (or otherwise falsy).
        """
        if columns:
            self.name, self.columns = name, columns
            return
        raise ValueError("Index must have at least one column")

358 

359 

class CrumpJob:
    """Settings for a single sync job."""

    def __init__(
        self,
        name: str,
        target_table: str,
        id_mapping: list[ColumnMapping],
        columns: list[ColumnMapping] | None = None,
        filename_match: str | None = None,
        filename_to_column: FilenameToColumn | None = None,
        indexes: list[Index] | None = None,
        sample_percentage: float | None = None,
        failure_mode: FailureMode = FailureMode.PERMISSIVE,
    ) -> None:
        """Create a sync job.

        Args:
            name: Name of the job.
            target_table: Target database table name.
            id_mapping: Mappings for the ID columns; several entries form a compound
                primary key.
            columns: Column mappings to sync (all columns if None).
            filename_match: Pattern that lets this job be selected automatically
                from the input file name.
            filename_to_column: Optional filename-to-column extraction configuration.
            indexes: Optional database indexes to create.
            sample_percentage: Optional percentage of rows to sample (0-100). If None or
                100, all rows are synced. Values like 10 mean 1 in every 10 rows.
                Always includes first and last row.
            failure_mode: How to handle data/config mismatches. PERMISSIVE (default) tries
                to import as much data as possible; STRICT rejects non-conforming rows.

        Raises:
            ValueError: If ``sample_percentage`` is outside the range 0-100.
        """
        # Fail fast, before any attribute is stored.
        if sample_percentage is not None and not 0 <= sample_percentage <= 100:
            raise ValueError(
                f"sample_percentage must be between 0 and 100, got {sample_percentage}"
            )

        self.name = name
        self.target_table = target_table
        self.id_mapping = id_mapping
        # None (or any empty value) normalizes to [] so callers can always iterate.
        self.columns = columns if columns else []
        self.indexes = indexes if indexes else []
        self.filename_match = filename_match
        self.filename_to_column = filename_to_column
        self.sample_percentage = sample_percentage
        self.failure_mode = failure_mode

406 

407 

408class CrumpConfig: 

409 """Configuration for data synchronization.""" 

410 

411 def __init__( 

412 self, jobs: dict[str, CrumpJob], id_column_matchers: list[str] | None = None 

413 ) -> None: 

414 """Initialize sync configuration. 

415 

416 Args: 

417 jobs: Dictionary of job name to CrumpJob 

418 id_column_matchers: Optional list of column name patterns to match as ID columns 

419 (e.g., ['id', 'uuid', 'key']). If None, uses default patterns. 

420 """ 

421 self.jobs = jobs 

422 self.id_column_matchers = id_column_matchers 

423 

424 def get_job(self, name: str) -> CrumpJob | None: 

425 """Get a job by name. 

426 

427 Args: 

428 name: Name of the job 

429 

430 Returns: 

431 CrumpJob if found, None otherwise 

432 """ 

433 return self.jobs.get(name) 

434 

435 def get_job_or_auto_detect( 

436 self, name: str | None = None, filename: str | None = None 

437 ) -> tuple[CrumpJob, str] | None: 

438 """Get a job by name, or auto-detect if there's only one job. 

439 

440 Args: 

441 name: Name of the job (optional - if None, auto-detect single job) 

442 

443 Returns: 

444 Tuple of (CrumpJob, job_name) if found/detected, None otherwise 

445 

446 Raises: 

447 ValueError: If name is None and config has multiple jobs 

448 """ 

449 if name is not None: 

450 # Job name explicitly provided 

451 job = self.jobs.get(name) 

452 if job: 

453 return (job, name) 

454 return None 

455 

456 # Auto-detect: only allowed if there's exactly one job 

457 if len(self.jobs) == 0: 

458 return None 

459 

460 if len(self.jobs) == 1: 

461 # Only one job - use it automatically 

462 job_name = next(iter(self.jobs.keys())) 

463 return (self.jobs[job_name], job_name) 

464 

465 if filename is not None: 

466 # Try to auto-detect job based on filename_match 

467 for job_name, job in self.jobs.items(): 

468 if job.filename_match: 

469 # match using the full path as a string 

470 if fnmatch.fnmatch(filename, job.filename_match): 

471 return (job, job_name) 

472 # or match using just the filename part 

473 if fnmatch.fnmatch(Path(filename).name, job.filename_match): 

474 return (job, job_name) 

475 # support regex as well but check if it is a valid regex 

476 try: 

477 pattern = re.compile(job.filename_match) 

478 if pattern.search(filename): 

479 return (job, job_name) 

480 except re.error: 

481 continue 

482 

483 # No matching job found 

484 

485 # Multiple jobs - cannot auto-detect 

486 raise ValueError( 

487 f"Config contains {len(self.jobs)} jobs and unable to match one automatically. " 

488 "Please specify --job to select which one to use or configure filename_match in the job config." 

489 ) 

490 

491 @classmethod 

492 def from_yaml(cls, config_path: Path) -> CrumpConfig: 

493 r"""Load configuration from a YAML file. 

494 

495 Args: 

496 config_path: Path to the YAML configuration file 

497 

498 Returns: 

499 CrumpConfig instance 

500 

501 Raises: 

502 FileNotFoundError: If config file doesn't exist 

503 ValueError: If config file is invalid 

504 

505 Example YAML structure: 

506 id_column_matchers: # Optional, root-level config 

507 - id 

508 - uuid 

509 - key 

510 jobs: 

511 my_job: 

512 target_table: users 

513 id_mapping: 

514 user_id: id # Single column primary key 

515 # Or for compound primary key: 

516 # user_id: id 

517 # tenant_id: tenant 

518 columns: 

519 name: full_name # Simple format 

520 email: email_address 

521 age: 

522 db_column: user_age 

523 type: integer 

524 salary: 

525 db_column: monthly_salary 

526 type: float 

527 sample_percentage: 10 # Optional: sync only 10% of rows (1 in 10) 

528 # Always includes first and last row 

529 # Omit or set to 100 to sync all rows 

530 filename_to_column: # Optional: extract values from filename 

531 template: "[mission]level2[sensor]_[date]_v[version].cdf" 

532 # OR use regex with named groups: 

533 # regex: "(?P<mission>[a-z]+)_level2_(?P<sensor>[a-z]+)_(?P<date>\\d{8})_v(?P<version>\\d+)\\.cdf" 

534 columns: 

535 mission: 

536 db_column: mission_name 

537 type: varchar(10) 

538 sensor: 

539 db_column: sensor_type 

540 type: varchar(20) 

541 date: 

542 db_column: observation_date 

543 type: date 

544 use_to_delete_old_rows: true # Use this column to identify stale rows 

545 version: 

546 db_column: file_version 

547 type: varchar(10) 

548 indexes: # Optional 

549 - name: idx_email 

550 columns: 

551 - column: email 

552 order: ASC 

553 - name: idx_observation_date 

554 columns: 

555 - column: observation_date 

556 order: DESC 

557 """ 

558 if not config_path.exists(): 

559 raise FileNotFoundError(f"Config file not found: {config_path}") 

560 

561 with open(config_path, encoding="utf-8") as f: 

562 data = yaml.load(f, Loader=DuplicateKeySafeLoader) 

563 

564 if not data or "jobs" not in data: 

565 raise ValueError("Config file must contain 'jobs' section") 

566 

567 # Parse optional id_column_matchers 

568 id_column_matchers = data.get("id_column_matchers") 

569 if id_column_matchers is not None and not isinstance(id_column_matchers, list): 

570 raise ValueError("id_column_matchers must be a list of strings") 

571 

572 jobs = {} 

573 for job_name, job_data in data["jobs"].items(): 

574 jobs[job_name] = cls._parse_job(job_name, job_data) 

575 

576 return cls(jobs=jobs, id_column_matchers=id_column_matchers) 

577 

578 @staticmethod 

579 def _parse_column_mapping(csv_col: str | None, value: Any, job_name: str) -> ColumnMapping: 

580 """Parse a column mapping from config value. 

581 

582 Supports multiple formats: 

583 1. Simple: csv_column: db_column 

584 2. Extended: csv_column: {db_column: name, type: data_type, nullable: true/false, lookup: {...}} 

585 3. Custom function: null: {db_column: name, expression: "...", input_columns: [...]} 

586 4. Custom function: null: {db_column: name, function: "module.func", input_columns: [...]} 

587 

588 Args: 

589 csv_col: CSV column name (can be None for custom functions) 

590 value: Either a string (db_column) or dict with db_column and optional fields 

591 job_name: Job name (for error messages) 

592 

593 Returns: 

594 ColumnMapping instance 

595 

596 Raises: 

597 ValueError: If mapping format is invalid 

598 """ 

599 if isinstance(value, str): 

600 # Simple format: csv_column: db_column 

601 if csv_col is None: 

602 raise ValueError( 

603 f"Job '{job_name}' cannot use simple format with null csv_column key" 

604 ) 

605 return ColumnMapping(csv_column=csv_col, db_column=value) 

606 elif isinstance(value, dict): 

607 # Extended format with various options 

608 if "db_column" not in value: 

609 raise ValueError( 

610 f"Job '{job_name}' column '{csv_col}' extended mapping must have 'db_column'" 

611 ) 

612 db_column = value["db_column"] 

613 data_type = value.get("type") # Optional 

614 nullable = value.get("nullable") # Optional 

615 lookup = value.get("lookup") # Optional 

616 expression = value.get("expression") # Optional 

617 function = value.get("function") # Optional 

618 input_columns = value.get("input_columns") # Optional 

619 

620 # Validate lookup is a dict if provided 

621 if lookup is not None and not isinstance(lookup, dict): 

622 raise ValueError(f"Job '{job_name}' column '{csv_col}' lookup must be a dictionary") 

623 

624 # Validate expression is a string if provided 

625 if expression is not None and not isinstance(expression, str): 

626 raise ValueError(f"Job '{job_name}' column '{csv_col}' expression must be a string") 

627 

628 # Validate function is a string if provided 

629 if function is not None and not isinstance(function, str): 

630 raise ValueError(f"Job '{job_name}' column '{csv_col}' function must be a string") 

631 

632 # Validate input_columns is a list if provided 

633 if input_columns is not None and not isinstance(input_columns, list): 

634 raise ValueError( 

635 f"Job '{job_name}' column '{csv_col}' input_columns must be a list" 

636 ) 

637 

638 return ColumnMapping( 

639 csv_column=csv_col, 

640 db_column=db_column, 

641 data_type=data_type, 

642 nullable=nullable, 

643 lookup=lookup, 

644 expression=expression, 

645 function=function, 

646 input_columns=input_columns, 

647 ) 

648 else: 

649 raise ValueError( 

650 f"Job '{job_name}' column '{csv_col}' must be string or dict, got {type(value)}" 

651 ) 

652 

653 @staticmethod 

654 def _parse_job(name: str, job_data: dict[str, Any]) -> CrumpJob: 

655 """Parse a job from configuration data. 

656 

657 Args: 

658 name: Name of the job 

659 job_data: Job configuration dictionary 

660 

661 Returns: 

662 CrumpJob instance 

663 

664 Raises: 

665 ValueError: If job configuration is invalid 

666 """ 

667 if "target_table" not in job_data: 

668 raise ValueError(f"Job '{name}' missing 'target_table'") 

669 

670 if "id_mapping" not in job_data: 

671 raise ValueError(f"Job '{name}' missing 'id_mapping'") 

672 

673 # Parse id_mapping as a dict: {csv_column: db_column} or {csv_column: {db_column: x, type: y}} 

674 # Supports multiple columns for compound primary keys 

675 id_data = job_data["id_mapping"] 

676 if not isinstance(id_data, dict): 

677 raise ValueError(f"Job '{name}' id_mapping must be a dictionary") 

678 

679 if len(id_data) < 1: 

680 raise ValueError(f"Job '{name}' id_mapping must have at least one mapping") 

681 

682 id_mapping = [] 

683 for csv_col, value in id_data.items(): 

684 id_mapping.append(CrumpConfig._parse_column_mapping(csv_col, value, name)) 

685 

686 # Parse columns as a dict: {csv_column: db_column} or {csv_column: {db_column: x, type: y}} 

687 columns = [] 

688 if "columns" in job_data and job_data["columns"]: 

689 col_data = job_data["columns"] 

690 if not isinstance(col_data, dict): 

691 raise ValueError(f"Job '{name}' columns must be a dictionary") 

692 

693 for csv_col, value in col_data.items(): 

694 # Handle multiple custom functions with null keys (collected as list) 

695 if csv_col is None and isinstance(value, list): 

696 for item in value: 

697 columns.append(CrumpConfig._parse_column_mapping(csv_col, item, name)) 

698 else: 

699 columns.append(CrumpConfig._parse_column_mapping(csv_col, value, name)) 

700 

701 # Parse optional filename_to_column 

702 filename_to_column = None 

703 if "filename_to_column" in job_data and job_data["filename_to_column"]: 

704 ftc_data = job_data["filename_to_column"] 

705 if not isinstance(ftc_data, dict): 

706 raise ValueError(f"Job '{name}' filename_to_column must be a dictionary") 

707 

708 # Check that exactly one of template or regex is specified 

709 has_template = "template" in ftc_data and ftc_data["template"] 

710 has_regex = "regex" in ftc_data and ftc_data["regex"] 

711 

712 if not has_template and not has_regex: 

713 raise ValueError( 

714 f"Job '{name}' filename_to_column must have either 'template' or 'regex'" 

715 ) 

716 

717 if has_template and has_regex: 

718 raise ValueError( 

719 f"Job '{name}' filename_to_column cannot have both 'template' and 'regex'" 

720 ) 

721 

722 # Parse columns 

723 if "columns" not in ftc_data or not ftc_data["columns"]: 

724 raise ValueError(f"Job '{name}' filename_to_column must have 'columns'") 

725 

726 if not isinstance(ftc_data["columns"], dict): 

727 raise ValueError(f"Job '{name}' filename_to_column columns must be a dictionary") 

728 

729 ftc_columns = {} 

730 for col_name, col_data in ftc_data["columns"].items(): 

731 if isinstance(col_data, dict): 

732 db_column = col_data.get("db_column") 

733 data_type = col_data.get("type") 

734 use_to_delete_old_rows = col_data.get("use_to_delete_old_rows", False) 

735 elif col_data is None: 

736 # Simple format: column_name: null (use defaults) 

737 db_column = None 

738 data_type = None 

739 use_to_delete_old_rows = False 

740 else: 

741 raise ValueError( 

742 f"Job '{name}' filename_to_column column '{col_name}' must be a dictionary or null" 

743 ) 

744 

745 ftc_columns[col_name] = FilenameColumnMapping( 

746 name=col_name, 

747 db_column=db_column, 

748 data_type=data_type, 

749 use_to_delete_old_rows=use_to_delete_old_rows, 

750 ) 

751 

752 filename_to_column = FilenameToColumn( 

753 columns=ftc_columns, 

754 template=ftc_data.get("template"), 

755 regex=ftc_data.get("regex"), 

756 ) 

757 

758 # Parse optional indexes 

759 indexes = [] 

760 if "indexes" in job_data and job_data["indexes"]: 

761 indexes_data = job_data["indexes"] 

762 if not isinstance(indexes_data, list): 

763 raise ValueError(f"Job '{name}' indexes must be a list") 

764 

765 for idx_data in indexes_data: 

766 if not isinstance(idx_data, dict): 

767 raise ValueError(f"Job '{name}' index entry must be a dictionary") 

768 

769 if "name" not in idx_data: 

770 raise ValueError(f"Job '{name}' index missing 'name'") 

771 

772 if "columns" not in idx_data: 

773 raise ValueError(f"Job '{name}' index missing 'columns'") 

774 

775 idx_columns = [] 

776 for col_data in idx_data["columns"]: 

777 if not isinstance(col_data, dict): 

778 raise ValueError(f"Job '{name}' index column must be a dictionary") 

779 

780 if "column" not in col_data: 

781 raise ValueError(f"Job '{name}' index column missing 'column' field") 

782 

783 order = col_data.get("order", "ASC") 

784 idx_columns.append(IndexColumn(column=col_data["column"], order=order)) 

785 

786 indexes.append(Index(name=idx_data["name"], columns=idx_columns)) 

787 

788 # Parse optional sample_percentage 

789 sample_percentage = None 

790 if "sample_percentage" in job_data and job_data["sample_percentage"] is not None: 

791 sample_percentage = job_data["sample_percentage"] 

792 # Validate it's a number 

793 if not isinstance(sample_percentage, (int, float)): 

794 raise ValueError(f"Job '{name}' sample_percentage must be a number") 

795 if not (0 <= sample_percentage <= 100): 

796 raise ValueError( 

797 f"Job '{name}' sample_percentage must be between 0 and 100, got {sample_percentage}" 

798 ) 

799 

800 filename_match = None 

801 if "filename_match" in job_data and job_data["filename_match"]: 

802 filename_match = job_data["filename_match"] 

803 if not isinstance(filename_match, str): 

804 raise ValueError(f"Job '{name}' filename_match must be a string") 

805 

806 # Parse optional failure_mode (default: permissive) 

807 failure_mode = FailureMode.PERMISSIVE 

808 if "failure_mode" in job_data and job_data["failure_mode"] is not None: 

809 fm_value = job_data["failure_mode"] 

810 if not isinstance(fm_value, str): 

811 raise ValueError(f"Job '{name}' failure_mode must be a string") 

812 fm_lower = fm_value.lower().strip() 

813 if fm_lower == "strict": 

814 failure_mode = FailureMode.STRICT 

815 elif fm_lower == "permissive": 

816 failure_mode = FailureMode.PERMISSIVE 

817 else: 

818 raise ValueError( 

819 f"Job '{name}' failure_mode must be 'strict' or 'permissive', got '{fm_value}'" 

820 ) 

821 

822 return CrumpJob( 

823 name=name, 

824 target_table=job_data["target_table"], 

825 id_mapping=id_mapping, 

826 columns=columns if columns else None, 

827 filename_to_column=filename_to_column, 

828 filename_match=filename_match, 

829 indexes=indexes if indexes else None, 

830 sample_percentage=sample_percentage, 

831 failure_mode=failure_mode, 

832 ) 

833 

834 def add_or_update_job(self, job: CrumpJob, force: bool = False) -> bool: 

835 """Add a new job or update an existing one. 

836 

837 Args: 

838 job: CrumpJob to add or update 

839 force: If True, overwrite existing job. If False, raise error if job exists. 

840 

841 Returns: 

842 True if job was added/updated 

843 

844 Raises: 

845 ValueError: If job already exists and force=False 

846 """ 

847 if job.name in self.jobs and not force: 

848 raise ValueError(f"Job '{job.name}' already exists. Use force=True to overwrite.") 

849 

850 self.jobs[job.name] = job 

851 return True 

852 

853 def to_yaml_dict(self) -> dict[str, Any]: 

854 """Convert config to dictionary suitable for YAML serialization. 

855 

856 Returns: 

857 Dictionary representation of config 

858 """ 

859 jobs_dict = {} 

860 

861 for job_name, job in self.jobs.items(): 

862 # Build id_mapping dict (supports compound primary keys) 

863 id_mapping_dict: dict[str, Any] = {} 

864 for id_col in job.id_mapping: 

865 needs_extended = ( 

866 id_col.data_type 

867 or id_col.nullable is not None 

868 or id_col.lookup is not None 

869 or id_col.expression is not None 

870 or id_col.function is not None 

871 or id_col.input_columns is not None 

872 ) 

873 if needs_extended: 

874 mapping_dict: dict[str, Any] = {"db_column": id_col.db_column} 

875 if id_col.data_type: 

876 mapping_dict["type"] = id_col.data_type 

877 if id_col.nullable is not None: 

878 mapping_dict["nullable"] = id_col.nullable 

879 if id_col.lookup is not None: 

880 mapping_dict["lookup"] = id_col.lookup 

881 if id_col.expression is not None: 

882 mapping_dict["expression"] = id_col.expression 

883 if id_col.function is not None: 

884 mapping_dict["function"] = id_col.function 

885 if id_col.input_columns is not None: 

886 mapping_dict["input_columns"] = id_col.input_columns 

887 # Use None as key for custom functions (no csv_column) 

888 key = id_col.csv_column if id_col.csv_column is not None else None 

889 id_mapping_dict[key] = mapping_dict # type: ignore[index] 

890 else: 

891 id_mapping_dict[id_col.csv_column] = id_col.db_column # type: ignore[index] 

892 

893 job_dict: dict[str, Any] = { 

894 "target_table": job.target_table, 

895 "id_mapping": id_mapping_dict, 

896 } 

897 

898 # Add columns if present 

899 if job.columns: 

900 columns_dict: dict[str, Any] = {} 

901 for col in job.columns: 

902 needs_extended = ( 

903 col.data_type 

904 or col.nullable is not None 

905 or col.lookup is not None 

906 or col.expression is not None 

907 or col.function is not None 

908 or col.input_columns is not None 

909 ) 

910 if needs_extended: 

911 mapping_dict = {"db_column": col.db_column} 

912 if col.data_type: 

913 mapping_dict["type"] = col.data_type 

914 if col.nullable is not None: 

915 mapping_dict["nullable"] = col.nullable 

916 if col.lookup is not None: 

917 mapping_dict["lookup"] = col.lookup 

918 if col.expression is not None: 

919 mapping_dict["expression"] = col.expression 

920 if col.function is not None: 

921 mapping_dict["function"] = col.function 

922 if col.input_columns is not None: 

923 mapping_dict["input_columns"] = col.input_columns 

924 # Use None as key for custom functions (no csv_column) 

925 key = col.csv_column if col.csv_column is not None else None 

926 columns_dict[key] = mapping_dict # type: ignore[index] 

927 else: 

928 columns_dict[col.csv_column] = col.db_column # type: ignore[index] 

929 job_dict["columns"] = columns_dict 

930 

931 # Add filename_to_column if present 

932 if job.filename_to_column: 

933 ftc_dict: dict[str, Any] = {} 

934 if job.filename_to_column.template: 

935 ftc_dict["template"] = job.filename_to_column.template 

936 else: 

937 ftc_dict["regex"] = job.filename_to_column.regex 

938 

939 ftc_columns_dict: dict[str, Any] = {} 

940 for col_name, col_mapping in job.filename_to_column.columns.items(): 

941 col_dict: dict[str, Any] = {} 

942 if col_mapping.db_column != col_mapping.name: 

943 col_dict["db_column"] = col_mapping.db_column 

944 if col_mapping.data_type: 

945 col_dict["type"] = col_mapping.data_type 

946 if col_mapping.use_to_delete_old_rows: 

947 col_dict["use_to_delete_old_rows"] = True 

948 

949 # If col_dict is empty, use None to keep it minimal 

950 ftc_columns_dict[col_name] = col_dict if col_dict else None 

951 

952 ftc_dict["columns"] = ftc_columns_dict 

953 job_dict["filename_to_column"] = ftc_dict 

954 

955 # Add indexes if present 

956 if job.indexes: 

957 indexes_list = [] 

958 for index in job.indexes: 

959 index_dict = { 

960 "name": index.name, 

961 "columns": [ 

962 {"column": col.column, "order": col.order} for col in index.columns 

963 ], 

964 } 

965 indexes_list.append(index_dict) 

966 job_dict["indexes"] = indexes_list 

967 

968 # Add sample_percentage if present and not default 

969 if job.sample_percentage is not None and job.sample_percentage != 100: 

970 job_dict["sample_percentage"] = job.sample_percentage 

971 

972 # Add failure_mode if not the default (permissive) 

973 if job.failure_mode != FailureMode.PERMISSIVE: 

974 job_dict["failure_mode"] = job.failure_mode.value 

975 

976 jobs_dict[job_name] = job_dict 

977 

978 result: dict[str, Any] = {"jobs": jobs_dict} 

979 

980 # Add id_column_matchers if present 

981 if self.id_column_matchers is not None: 

982 result["id_column_matchers"] = self.id_column_matchers 

983 

984 return result 

985 

986 def save_to_yaml(self, config_path: Path) -> None: 

987 """Save configuration to a YAML file. 

988 

989 Args: 

990 config_path: Path to save the YAML file 

991 """ 

992 config_dict = self.to_yaml_dict() 

993 

994 with open(config_path, "w", encoding="utf-8") as f: 

995 yaml.dump(config_dict, f, default_flow_style=False, sort_keys=False) 

996 

997 

998def apply_row_transformations( 

999 row: dict[str, Any], 

1000 sync_columns: list[ColumnMapping], 

1001 filename_to_column: FilenameToColumn | None = None, 

1002 filename_values: dict[str, str] | None = None, 

1003) -> dict[str, Any]: 

1004 """Apply column transformations to a CSV row. 

1005 

1006 This is a shared helper function used by both sync and extract operations 

1007 to apply the same transformations consistently. 

1008 

1009 Args: 

1010 row: Dictionary representing a CSV row (column_name -> value) 

1011 sync_columns: List of ColumnMapping objects defining transformations 

1012 filename_to_column: Optional FilenameToColumn configuration 

1013 filename_values: Optional dict of values extracted from filename 

1014 

1015 Returns: 

1016 Dictionary with transformed values (db_column_name -> value) 

1017 """ 

1018 row_data = {} 

1019 

1020 # Process each column mapping 

1021 for col_mapping in sync_columns: 

1022 # Check if this column uses a custom function/expression 

1023 if col_mapping.expression or col_mapping.function: 

1024 # Apply custom function/expression 

1025 row_data[col_mapping.db_column] = col_mapping.apply_custom_function(row) 

1026 elif col_mapping.csv_column and col_mapping.csv_column in row: 

1027 csv_value = row[col_mapping.csv_column] 

1028 # Apply lookup transformation if configured 

1029 row_data[col_mapping.db_column] = col_mapping.apply_lookup(csv_value) 

1030 elif col_mapping.csv_column and col_mapping.csv_column not in row: 

1031 # CSV column is missing from this row - mark as None for validation 

1032 row_data[col_mapping.db_column] = None 

1033 

1034 # Add filename values if configured 

1035 if filename_to_column and filename_values: 

1036 for col_name, filename_col_mapping in filename_to_column.columns.items(): 

1037 if col_name in filename_values: 

1038 row_data[filename_col_mapping.db_column] = filename_values[col_name] 

1039 

1040 return row_data