Coverage for src / crump / config.py: 91%
401 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""Configuration file handling for data_sync."""
3from __future__ import annotations
5import enum
6import fnmatch
7import importlib
8import re
9from pathlib import Path
10from typing import Any
12import yaml # type: ignore[import-untyped]
class FailureMode(enum.Enum):
    """Policy for rows whose data does not fit the job's column configuration.

    The enum only names the policy; the row handling itself is performed by
    the code that consumes ``CrumpJob.failure_mode``.

    STRICT ("strict") -- reject anything that does not conform:
        * missing nullable field      -> NULL
        * missing NOT NULL field      -> row is skipped
        * string over varchar limit   -> row is skipped

    PERMISSIVE ("permissive") -- keep as much data as possible:
        * missing nullable field      -> NULL
        * missing NOT NULL field      -> default value (0 for integers, "" for strings)
        * string over varchar limit   -> truncated to fit
    """

    STRICT = "strict"
    PERMISSIVE = "permissive"
class DuplicateKeySafeLoader(yaml.SafeLoader):
    """``yaml.SafeLoader`` subclass that does not drop repeated ``null`` keys.

    Its mapping constructor is replaced with ``dict_constructor``. That
    constructor gathers the values of repeated ``null`` keys into a list
    instead of letting later ones overwrite earlier ones. Everything else
    behaves exactly like ``yaml.SafeLoader``.
    """
def dict_constructor(loader: yaml.Loader, node: yaml.Node) -> dict[Any, Any]:
    """Build a dict from a YAML mapping node, keeping duplicate null keys.

    Normally a repeated key overwrites the earlier value. For the ``None``
    (``null``/``~``) key this constructor instead keeps every value:

    * a single null key maps to its value unchanged;
    * two or more null keys map to a list of all their values, in document
      order, stored at the position of the first null key.

    Any other repeated key still keeps only its last value.

    Args:
        loader: The active YAML loader; only ``construct_pairs`` is used.
        node: The mapping node being constructed.

    Returns:
        The constructed mapping.
    """
    pairs = loader.construct_pairs(node)
    result: dict[Any, Any] = {}
    # Every value seen under the null key, in order. Kept separately so that a
    # list-valued first entry is never mistaken for the duplicates collection.
    null_values: list[Any] = []

    for key, value in pairs:
        if key is None:
            null_values.append(value)
            # Re-assigning an existing key keeps its original dict position.
            result[None] = null_values if len(null_values) > 1 else value
        else:
            result[key] = value

    return result
# Make DuplicateKeySafeLoader build every YAML mapping with dict_constructor,
# so repeated null keys are collected instead of overwritten. The tag is read
# from the loader class, which inherits it from yaml's BaseResolver.
DuplicateKeySafeLoader.add_constructor(
    tag=DuplicateKeySafeLoader.DEFAULT_MAPPING_TAG,
    constructor=dict_constructor,
)
class ColumnMapping:
    """Mapping between CSV and database columns."""

    # Built-ins visible to inline ``expression`` code. Replacing
    # ``__builtins__`` limits which functions an expression can call by name,
    # but it is NOT a security sandbox: expressions must only come from
    # trusted config files.
    _SAFE_BUILTINS: dict[str, Any] = {
        "abs": abs,
        "min": min,
        "max": max,
        "round": round,
        "int": int,
        "float": float,
        "str": str,
        "bool": bool,
        "len": len,
    }

    # (expression source, compiled code object) for the last expression
    # evaluated. apply_custom_function runs once per CSV row, and this avoids
    # re-parsing the same source text every time.
    _compiled_expression: tuple[str, Any] | None = None

    def __init__(
        self,
        csv_column: str | None,
        db_column: str,
        data_type: str | None = None,
        nullable: bool | None = None,
        lookup: dict[str, Any] | None = None,
        expression: str | None = None,
        function: str | None = None,
        input_columns: list[str] | None = None,
    ) -> None:
        """Initialize column mapping.

        Args:
            csv_column: Name of the column in the CSV file. Can be None for custom functions
                creating new columns. Can be specified with expression/function to provide
                a named reference for the transformation.
            db_column: Name of the column in the database
            data_type: Optional data type (integer, float, date, datetime, text, varchar(N))
            nullable: Optional flag indicating if NULL values are allowed (True=NULL, False=NOT NULL)
            lookup: Optional dictionary to map CSV values to database values.
                Values not in the lookup are passed through unchanged.
                Example: {"active": 1, "inactive": 0} to convert status strings to integers.
            expression: Optional inline Python expression for custom calculations.
                Example: "float(temperature) * 1.8 + 32" for Celsius to Fahrenheit
            function: Optional reference to external function in format "module.function".
                Example: "my_functions.calculate_percentage"
            input_columns: List of CSV column names to pass as parameters to expression/function.
                Required if expression or function is specified.

        Raises:
            ValueError: If both expression and function are specified, or if expression/function
                is specified without input_columns.
        """
        if expression is not None and function is not None:
            raise ValueError("Cannot specify both 'expression' and 'function'")

        if (expression is not None or function is not None) and not input_columns:
            raise ValueError("Must specify 'input_columns' when using 'expression' or 'function'")

        self.csv_column = csv_column
        self.db_column = db_column
        self.data_type = data_type
        self.nullable = nullable
        self.lookup = lookup
        self.expression = expression
        self.function = function
        self.input_columns = input_columns

    def apply_lookup(self, value: str) -> Any:
        """Apply lookup transformation to a value.

        Args:
            value: The value from the CSV

        Returns:
            The transformed value if found in lookup, otherwise the original value
        """
        if self.lookup is None:
            return value
        return self.lookup.get(value, value)

    def apply_custom_function(self, row_data: dict[str, Any]) -> Any:
        """Apply custom function or expression to compute a value.

        The values of ``input_columns`` are taken from ``row_data`` in order.
        For an ``expression`` they are bound as names next to a small set of
        built-ins. For a ``function`` they are passed as positional arguments.

        Args:
            row_data: Dictionary of column name to value from the CSV row

        Returns:
            The computed value from the expression or function

        Raises:
            ValueError: If input columns are missing from row_data
            RuntimeError: If no expression/function or no input_columns are
                configured, or if expression evaluation or function execution fails
        """
        if self.expression is None and self.function is None:
            raise RuntimeError("No expression or function defined for custom function")

        if not self.input_columns:
            raise RuntimeError("No input_columns defined for custom function")

        input_values = []
        for col_name in self.input_columns:
            if col_name not in row_data:
                raise ValueError(f"Input column '{col_name}' not found in row data")
            input_values.append(row_data[col_name])

        # ``is not None`` matches the validation in __init__. An empty
        # expression is then reported as a bad expression, instead of falling
        # through to the function branch with function=None.
        if self.expression is not None:
            return self._evaluate_expression(input_values)
        return self._call_function(input_values)

    def _evaluate_expression(self, input_values: list[Any]) -> Any:
        """Evaluate ``self.expression`` with each input column bound by name."""
        expression = self.expression
        namespace: dict[str, Any] = dict(zip(self.input_columns or [], input_values))
        # A fresh copy per call, so one evaluation cannot alter the shared table.
        namespace["__builtins__"] = dict(self._SAFE_BUILTINS)
        try:
            cached = self._compiled_expression
            if cached is None or cached[0] != expression:
                # "<string>" is the filename eval() uses for string source,
                # so syntax-error messages are unchanged.
                cached = (expression, compile(expression, "<string>", "eval"))
                self._compiled_expression = cached
            return eval(cached[1], namespace)  # trusted config only, see _SAFE_BUILTINS
        except Exception as e:
            raise RuntimeError(f"Failed to evaluate expression '{self.expression}': {e}") from e

    def _call_function(self, input_values: list[Any]) -> Any:
        """Import ``self.function`` ("package.module.func") and call it."""
        try:
            parts = self.function.split(".")  # type: ignore[union-attr]
            if len(parts) < 2:
                raise ValueError(
                    f"Function must be in format 'module.function', got '{self.function}'"
                )

            module = importlib.import_module(".".join(parts[:-1]))
            func = getattr(module, parts[-1])
            return func(*input_values)
        except Exception as e:
            raise RuntimeError(f"Failed to execute function '{self.function}': {e}") from e
class FilenameColumnMapping:
    """One value captured from a filename and the DB column it is stored in."""

    def __init__(
        self,
        name: str,
        db_column: str | None = None,
        data_type: str | None = None,
        use_to_delete_old_rows: bool = False,
    ) -> None:
        """Describe a single filename-derived column.

        Args:
            name: Identifier of the captured value (template placeholder or
                regex named group).
            db_column: Target database column. Any falsy value (``None`` or
                ``""``) falls back to ``name``.
            data_type: Optional column type, e.g. ``varchar(N)``, ``integer``,
                ``float``, ``date``, ``datetime`` or ``text``.
            use_to_delete_old_rows: Whether this column is one of those used to
                identify stale rows.
        """
        target_column = db_column
        if not target_column:
            target_column = name

        self.name = name
        self.db_column = target_column
        self.data_type = data_type
        self.use_to_delete_old_rows = use_to_delete_old_rows
class FilenameToColumn:
    """Configuration for extracting multiple values from filename."""

    def __init__(
        self,
        columns: dict[str, FilenameColumnMapping],
        template: str | None = None,
        regex: str | None = None,
    ) -> None:
        """Initialize filename to column mapping.

        Args:
            columns: Dictionary of column name to FilenameColumnMapping
            template: Filename template with [column_name] syntax (mutually exclusive with regex)
            regex: Regex pattern with named groups (mutually exclusive with template)

        Raises:
            ValueError: If both template and regex are specified, or neither is specified
        """
        if (template is None) == (regex is None):
            raise ValueError("Must specify exactly one of 'template' or 'regex'")

        self.columns = columns
        self.template = template
        self.regex = regex

        # Compile once up front. The ``is not None`` test matches the
        # validation above, so an empty template never reaches re.compile(None).
        if template is not None:
            self._compiled_regex = self._template_to_regex(template)
        else:
            self._compiled_regex = re.compile(regex)  # type: ignore[arg-type]

    def _template_to_regex(self, template: str) -> re.Pattern[str]:
        r"""Convert template string to regex pattern.

        Literal text is escaped. Each ``[name]`` placeholder becomes a lazy
        named group ``(?P<name>.+?)``. A lazy group at the very end of a
        pattern would capture only one character. So when the template ends
        with a placeholder, the pattern is anchored to the end of the string
        and that last group takes the rest of the filename.

        Args:
            template: Template string with [column_name] placeholders

        Returns:
            Compiled regex pattern with named groups

        Example:
            >>> # Template: "[mission]_level2_[sensor]_[date].cdf"
            >>> # becomes:  "(?P<mission>.+?)_level2_(?P<sensor>.+?)_(?P<date>.+?)\.cdf"
            >>> # Template: "[name]_[id]"
            >>> # becomes:  "(?P<name>.+?)_(?P<id>.+?)\Z"
        """
        escaped = re.escape(template)
        # re.escape turns "[col]" into "\[col\]"; swap each for a named group.
        placeholder = r"\\\[(\w+)\\\]"
        pattern = re.sub(placeholder, r"(?P<\1>.+?)", escaped)
        if re.search(placeholder + r"\Z", escaped):
            pattern += r"\Z"
        return re.compile(pattern)

    def extract_values_from_filename(self, filename: str | Path) -> dict[str, str] | None:
        """Extract values from filename using template or regex.

        A ``Path`` is reduced to its final component before matching. A plain
        string is matched as given (including any directory part). The pattern
        is applied with ``re.search``.

        Args:
            filename: The filename (or path) to extract values from

        Returns:
            Dictionary of group name to extracted value (groups that did not
            participate in the match map to None), or None if no match

        Example:
            >>> mapping = FilenameToColumn(
            ...     columns={},
            ...     template="[mission]_level2_[sensor]_[date]_v[version].cdf"
            ... )
            >>> mapping.extract_values_from_filename("imap_level2_primary_20000102_v002.cdf")
            {'mission': 'imap', 'sensor': 'primary', 'date': '20000102', 'version': '002'}
        """
        if isinstance(filename, Path):
            filename = filename.name

        match = self._compiled_regex.search(filename)
        return match.groupdict() if match else None

    def get_delete_key_columns(self) -> list[str]:
        """Get list of database column names used for stale row deletion.

        Returns:
            List of db_column names where use_to_delete_old_rows is True
        """
        return [col.db_column for col in self.columns.values() if col.use_to_delete_old_rows]
class IndexColumn:
    """Column definition for a database index."""

    def __init__(self, column: str, order: str = "ASC") -> None:
        """Initialize index column.

        Args:
            column: Column name
            order: Sort order - 'ASC' or 'DESC', case-insensitive (default: 'ASC').
                Stored upper-cased.

        Raises:
            ValueError: If order is not a string equal (ignoring case) to 'ASC' or 'DESC'
        """
        # A non-string order (e.g. a YAML number) would otherwise fail on
        # .upper() with AttributeError instead of the documented ValueError.
        normalized = order.upper() if isinstance(order, str) else None
        if normalized not in ("ASC", "DESC"):
            raise ValueError(f"Index order must be 'ASC' or 'DESC', got '{order}'")
        self.column = column
        self.order = normalized
class Index:
    """Named database index over one or more ordered columns."""

    def __init__(self, name: str, columns: list[IndexColumn]) -> None:
        """Create an index definition.

        Args:
            name: Index name.
            columns: ``IndexColumn`` entries that make up the index key, in order.

        Raises:
            ValueError: If ``columns`` is empty (or otherwise falsy).
        """
        has_columns = bool(columns)
        if not has_columns:
            raise ValueError("Index must have at least one column")
        self.name, self.columns = name, columns
class CrumpJob:
    """Everything needed to run one sync job into a single target table."""

    def __init__(
        self,
        name: str,
        target_table: str,
        id_mapping: list[ColumnMapping],
        columns: list[ColumnMapping] | None = None,
        filename_match: str | None = None,
        filename_to_column: FilenameToColumn | None = None,
        indexes: list[Index] | None = None,
        sample_percentage: float | None = None,
        failure_mode: FailureMode = FailureMode.PERMISSIVE,
    ) -> None:
        """Create a sync job definition.

        Args:
            name: Job name.
            target_table: Database table the job writes to.
            id_mapping: Mappings for the key column(s). Several entries form a
                compound primary key.
            columns: Column mappings to sync (all columns if None). A falsy
                value is stored as ``[]``.
            filename_match: Pattern that lets the job be picked automatically
                from the config file for a given input filename.
            filename_to_column: Optional rules for extracting column values
                from the input filename.
            indexes: Database indexes to create. A falsy value is stored as ``[]``.
            sample_percentage: Optional share of rows to sync, 0-100. None or
                100 syncs every row. For example, 10 means 1 in every 10 rows.
                First and last rows are always included.
            failure_mode: How data/config mismatches are handled. PERMISSIVE
                (default) imports as much as possible. STRICT rejects rows
                that do not conform.

        Raises:
            ValueError: If ``sample_percentage`` is given but outside 0-100.
        """
        self.name = name
        self.target_table = target_table
        self.id_mapping = id_mapping
        self.columns = columns if columns else []
        self.filename_to_column = filename_to_column
        self.indexes = indexes if indexes else []
        self.sample_percentage = sample_percentage
        self.filename_match = filename_match
        self.failure_mode = failure_mode

        if sample_percentage is None:
            return
        if not 0 <= sample_percentage <= 100:
            raise ValueError(
                f"sample_percentage must be between 0 and 100, got {sample_percentage}"
            )
class CrumpConfig:
    """Configuration for data synchronization: named jobs plus global options."""

    def __init__(
        self, jobs: dict[str, CrumpJob], id_column_matchers: list[str] | None = None
    ) -> None:
        """Initialize sync configuration.

        Args:
            jobs: Dictionary of job name to CrumpJob
            id_column_matchers: Optional list of column name patterns to match as ID columns
                (e.g., ['id', 'uuid', 'key']). If None, consumers fall back to their
                default patterns.
        """
        self.jobs = jobs
        self.id_column_matchers = id_column_matchers

    def get_job(self, name: str) -> CrumpJob | None:
        """Get a job by name.

        Args:
            name: Name of the job

        Returns:
            CrumpJob if found, None otherwise
        """
        return self.jobs.get(name)

    def get_job_or_auto_detect(
        self, name: str | None = None, filename: str | None = None
    ) -> tuple[CrumpJob, str] | None:
        """Get a job by name, or pick one automatically.

        Resolution order:

        1. If ``name`` is given, return that job, or None if it does not exist.
        2. If the config has no jobs, return None.
        3. If the config has exactly one job, return it.
        4. If ``filename`` is given, return the first job (in config order)
           whose ``filename_match`` matches. It is tried as a glob against the
           full path, then as a glob against the base name, then as a regex
           searched in the full path. A pattern that is not a valid regex is
           only used as a glob.

        Args:
            name: Name of the job (optional - if None, auto-detect)
            filename: Input file path used for ``filename_match`` detection

        Returns:
            Tuple of (CrumpJob, job_name) if found/detected, None otherwise

        Raises:
            ValueError: If name is None, the config has multiple jobs, and none
                could be matched by filename
        """
        if name is not None:
            job = self.jobs.get(name)
            return (job, name) if job else None

        if not self.jobs:
            return None

        if len(self.jobs) == 1:
            job_name, job = next(iter(self.jobs.items()))
            return (job, job_name)

        if filename is not None:
            base_name = Path(filename).name
            for job_name, job in self.jobs.items():
                pattern = job.filename_match
                if not pattern:
                    continue
                if fnmatch.fnmatch(filename, pattern) or fnmatch.fnmatch(base_name, pattern):
                    return (job, job_name)
                try:
                    if re.search(pattern, filename):
                        return (job, job_name)
                except re.error:
                    # Glob-only pattern such as "*.csv"; not a valid regex.
                    continue

        raise ValueError(
            f"Config contains {len(self.jobs)} jobs and unable to match one automatically. "
            "Please specify --job to select which one to use or configure filename_match in the job config."
        )

    @classmethod
    def from_yaml(cls, config_path: Path) -> CrumpConfig:
        r"""Load configuration from a YAML file.

        Args:
            config_path: Path to the YAML configuration file

        Returns:
            CrumpConfig instance

        Raises:
            FileNotFoundError: If config file doesn't exist
            ValueError: If config file is invalid. Errors raised by the YAML
                parser itself propagate unchanged.

        Example YAML structure:
            id_column_matchers:  # Optional, root-level config
              - id
              - uuid
              - key
            jobs:
              my_job:
                target_table: users
                id_mapping:
                  user_id: id  # Single column primary key
                  # Or for compound primary key:
                  # user_id: id
                  # tenant_id: tenant
                columns:
                  name: full_name  # Simple format
                  email: email_address
                  age:
                    db_column: user_age
                    type: integer
                  salary:
                    db_column: monthly_salary
                    type: float
                sample_percentage: 10  # Optional: sync only 10% of rows (1 in 10)
                                       # Always includes first and last row
                                       # Omit or set to 100 to sync all rows
                filename_to_column:  # Optional: extract values from filename
                  template: "[mission]_level2_[sensor]_[date]_v[version].cdf"
                  # OR use regex with named groups:
                  # regex: "(?P<mission>[a-z]+)_level2_(?P<sensor>[a-z]+)_(?P<date>\\d{8})_v(?P<version>\\d+)\\.cdf"
                  columns:
                    mission:
                      db_column: mission_name
                      type: varchar(10)
                    sensor:
                      db_column: sensor_type
                      type: varchar(20)
                    date:
                      db_column: observation_date
                      type: date
                      use_to_delete_old_rows: true  # Use this column to identify stale rows
                    version:
                      db_column: file_version
                      type: varchar(10)
                indexes:  # Optional
                  - name: idx_email
                    columns:
                      - column: email
                        order: ASC
                  - name: idx_observation_date
                    columns:
                      - column: observation_date
                        order: DESC
        """
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_path}")

        with open(config_path, encoding="utf-8") as f:
            # DuplicateKeySafeLoader subclasses yaml.SafeLoader, so loading
            # does not construct arbitrary Python objects.
            data = yaml.load(f, Loader=DuplicateKeySafeLoader)

        # A non-mapping document (list, scalar) would otherwise fail later with
        # AttributeError on .get()/.items().
        if not isinstance(data, dict) or "jobs" not in data:
            raise ValueError("Config file must contain 'jobs' section")

        id_column_matchers = data.get("id_column_matchers")
        if id_column_matchers is not None and (
            not isinstance(id_column_matchers, list)
            or not all(isinstance(matcher, str) for matcher in id_column_matchers)
        ):
            raise ValueError("id_column_matchers must be a list of strings")

        jobs_data = data["jobs"]
        if not isinstance(jobs_data, dict):
            raise ValueError("Config file 'jobs' section must be a dictionary")

        jobs = {
            job_name: cls._parse_job(job_name, job_data)
            for job_name, job_data in jobs_data.items()
        }
        return cls(jobs=jobs, id_column_matchers=id_column_matchers)

    @staticmethod
    def _parse_column_mapping(csv_col: str | None, value: Any, job_name: str) -> ColumnMapping:
        """Parse a column mapping from config value.

        Supports multiple formats:
        1. Simple: csv_column: db_column
        2. Extended: csv_column: {db_column: name, type: data_type, nullable: true/false, lookup: {...}}
        3. Custom function: null: {db_column: name, expression: "...", input_columns: [...]}
        4. Custom function: null: {db_column: name, function: "module.func", input_columns: [...]}

        Args:
            csv_col: CSV column name (can be None for custom functions)
            value: Either a string (db_column) or dict with db_column and optional fields
            job_name: Job name (for error messages)

        Returns:
            ColumnMapping instance

        Raises:
            ValueError: If mapping format is invalid
        """
        if isinstance(value, str):
            if csv_col is None:
                raise ValueError(
                    f"Job '{job_name}' cannot use simple format with null csv_column key"
                )
            return ColumnMapping(csv_column=csv_col, db_column=value)

        if not isinstance(value, dict):
            raise ValueError(
                f"Job '{job_name}' column '{csv_col}' must be string or dict, got {type(value)}"
            )

        if "db_column" not in value:
            raise ValueError(
                f"Job '{job_name}' column '{csv_col}' extended mapping must have 'db_column'"
            )

        lookup = value.get("lookup")
        expression = value.get("expression")
        function = value.get("function")
        input_columns = value.get("input_columns")

        if lookup is not None and not isinstance(lookup, dict):
            raise ValueError(f"Job '{job_name}' column '{csv_col}' lookup must be a dictionary")

        if expression is not None and not isinstance(expression, str):
            raise ValueError(f"Job '{job_name}' column '{csv_col}' expression must be a string")

        if function is not None and not isinstance(function, str):
            raise ValueError(f"Job '{job_name}' column '{csv_col}' function must be a string")

        if input_columns is not None and not isinstance(input_columns, list):
            raise ValueError(
                f"Job '{job_name}' column '{csv_col}' input_columns must be a list"
            )

        return ColumnMapping(
            csv_column=csv_col,
            db_column=value["db_column"],
            data_type=value.get("type"),
            nullable=value.get("nullable"),
            lookup=lookup,
            expression=expression,
            function=function,
            input_columns=input_columns,
        )

    @staticmethod
    def _parse_mapping_entries(entries: dict[Any, Any], job_name: str) -> list[ColumnMapping]:
        """Parse a ``{csv_column: spec}`` dict into mappings, in order.

        Repeated ``null`` keys reach this point as a list of specs under the
        ``None`` key (see DuplicateKeySafeLoader). Each spec in that list
        becomes its own mapping.
        """
        mappings: list[ColumnMapping] = []
        for csv_col, value in entries.items():
            if csv_col is None and isinstance(value, list):
                mappings.extend(
                    CrumpConfig._parse_column_mapping(None, item, job_name) for item in value
                )
            else:
                mappings.append(CrumpConfig._parse_column_mapping(csv_col, value, job_name))
        return mappings

    @staticmethod
    def _parse_filename_to_column(name: str, ftc_data: Any) -> FilenameToColumn:
        """Parse a job's ``filename_to_column`` section."""
        if not isinstance(ftc_data, dict):
            raise ValueError(f"Job '{name}' filename_to_column must be a dictionary")

        template = ftc_data.get("template")
        regex = ftc_data.get("regex")

        if not template and not regex:
            raise ValueError(
                f"Job '{name}' filename_to_column must have either 'template' or 'regex'"
            )

        if template and regex:
            raise ValueError(
                f"Job '{name}' filename_to_column cannot have both 'template' and 'regex'"
            )

        for option, option_value in (("template", template), ("regex", regex)):
            if option_value and not isinstance(option_value, str):
                raise ValueError(f"Job '{name}' filename_to_column {option} must be a string")

        columns_data = ftc_data.get("columns")
        if not columns_data:
            raise ValueError(f"Job '{name}' filename_to_column must have 'columns'")

        if not isinstance(columns_data, dict):
            raise ValueError(f"Job '{name}' filename_to_column columns must be a dictionary")

        ftc_columns: dict[str, FilenameColumnMapping] = {}
        for col_name, col_data in columns_data.items():
            if isinstance(col_data, dict):
                db_column = col_data.get("db_column")
                data_type = col_data.get("type")
                use_to_delete_old_rows = col_data.get("use_to_delete_old_rows", False)
            elif col_data is None:
                # Simple format: column_name: null (use defaults)
                db_column = None
                data_type = None
                use_to_delete_old_rows = False
            else:
                raise ValueError(
                    f"Job '{name}' filename_to_column column '{col_name}' must be a dictionary or null"
                )

            ftc_columns[col_name] = FilenameColumnMapping(
                name=col_name,
                db_column=db_column,
                data_type=data_type,
                use_to_delete_old_rows=use_to_delete_old_rows,
            )

        # Pass only the populated option. An empty sibling (e.g. template: ""
        # next to a real regex) would otherwise trip FilenameToColumn's
        # exactly-one-of check, which tests for None.
        return FilenameToColumn(
            columns=ftc_columns,
            template=template or None,
            regex=regex or None,
        )

    @staticmethod
    def _parse_indexes(name: str, indexes_data: Any) -> list[Index]:
        """Parse a job's ``indexes`` list."""
        if not isinstance(indexes_data, list):
            raise ValueError(f"Job '{name}' indexes must be a list")

        indexes: list[Index] = []
        for idx_data in indexes_data:
            if not isinstance(idx_data, dict):
                raise ValueError(f"Job '{name}' index entry must be a dictionary")

            if "name" not in idx_data:
                raise ValueError(f"Job '{name}' index missing 'name'")

            if "columns" not in idx_data:
                raise ValueError(f"Job '{name}' index missing 'columns'")

            if not isinstance(idx_data["columns"], list):
                raise ValueError(f"Job '{name}' index columns must be a list")

            idx_columns = []
            for col_data in idx_data["columns"]:
                if not isinstance(col_data, dict):
                    raise ValueError(f"Job '{name}' index column must be a dictionary")

                if "column" not in col_data:
                    raise ValueError(f"Job '{name}' index column missing 'column' field")

                order = col_data.get("order", "ASC")
                idx_columns.append(IndexColumn(column=col_data["column"], order=order))

            indexes.append(Index(name=idx_data["name"], columns=idx_columns))
        return indexes

    @staticmethod
    def _parse_failure_mode(name: str, fm_value: Any) -> FailureMode:
        """Parse a ``failure_mode`` value (case- and whitespace-insensitive)."""
        if not isinstance(fm_value, str):
            raise ValueError(f"Job '{name}' failure_mode must be a string")
        try:
            return FailureMode(fm_value.lower().strip())
        except ValueError:
            raise ValueError(
                f"Job '{name}' failure_mode must be 'strict' or 'permissive', got '{fm_value}'"
            ) from None

    @staticmethod
    def _parse_job(name: str, job_data: dict[str, Any]) -> CrumpJob:
        """Parse a job from configuration data.

        Args:
            name: Name of the job
            job_data: Job configuration dictionary

        Returns:
            CrumpJob instance

        Raises:
            ValueError: If job configuration is invalid
        """
        if not isinstance(job_data, dict):
            raise ValueError(f"Job '{name}' configuration must be a dictionary")

        if "target_table" not in job_data:
            raise ValueError(f"Job '{name}' missing 'target_table'")

        if "id_mapping" not in job_data:
            raise ValueError(f"Job '{name}' missing 'id_mapping'")

        # id_mapping: {csv_column: db_column} or {csv_column: {db_column: x, type: y}}.
        # Several entries form a compound primary key.
        id_data = job_data["id_mapping"]
        if not isinstance(id_data, dict):
            raise ValueError(f"Job '{name}' id_mapping must be a dictionary")

        if not id_data:
            raise ValueError(f"Job '{name}' id_mapping must have at least one mapping")

        id_mapping = CrumpConfig._parse_mapping_entries(id_data, name)

        columns: list[ColumnMapping] = []
        col_data = job_data.get("columns")
        if col_data:
            if not isinstance(col_data, dict):
                raise ValueError(f"Job '{name}' columns must be a dictionary")
            columns = CrumpConfig._parse_mapping_entries(col_data, name)

        filename_to_column = None
        ftc_data = job_data.get("filename_to_column")
        if ftc_data:
            filename_to_column = CrumpConfig._parse_filename_to_column(name, ftc_data)

        indexes: list[Index] = []
        indexes_data = job_data.get("indexes")
        if indexes_data:
            indexes = CrumpConfig._parse_indexes(name, indexes_data)

        sample_percentage = job_data.get("sample_percentage")
        if sample_percentage is not None:
            # bool subclasses int; reject it so "true" is not read as 1%.
            if isinstance(sample_percentage, bool) or not isinstance(
                sample_percentage, (int, float)
            ):
                raise ValueError(f"Job '{name}' sample_percentage must be a number")
            if not 0 <= sample_percentage <= 100:
                raise ValueError(
                    f"Job '{name}' sample_percentage must be between 0 and 100, got {sample_percentage}"
                )

        filename_match = job_data.get("filename_match") or None
        if filename_match is not None and not isinstance(filename_match, str):
            raise ValueError(f"Job '{name}' filename_match must be a string")

        fm_value = job_data.get("failure_mode")
        failure_mode = (
            FailureMode.PERMISSIVE
            if fm_value is None
            else CrumpConfig._parse_failure_mode(name, fm_value)
        )

        return CrumpJob(
            name=name,
            target_table=job_data["target_table"],
            id_mapping=id_mapping,
            columns=columns or None,
            filename_to_column=filename_to_column,
            filename_match=filename_match,
            indexes=indexes or None,
            sample_percentage=sample_percentage,
            failure_mode=failure_mode,
        )

    def add_or_update_job(self, job: CrumpJob, force: bool = False) -> bool:
        """Add a new job or update an existing one.

        Args:
            job: CrumpJob to add or update
            force: If True, overwrite existing job. If False, raise error if job exists.

        Returns:
            True if job was added/updated

        Raises:
            ValueError: If job already exists and force=False
        """
        if job.name in self.jobs and not force:
            raise ValueError(f"Job '{job.name}' already exists. Use force=True to overwrite.")

        self.jobs[job.name] = job
        return True

    @staticmethod
    def _column_mapping_to_yaml(mapping: ColumnMapping) -> str | dict[str, Any]:
        """Serialize one mapping as a bare db column name or an extended dict.

        The extended form is used whenever an optional field is set, and
        always for mappings without a CSV column. The parser rejects the
        simple form under a null key.
        """
        extended: dict[str, Any] = {}
        if mapping.data_type:
            extended["type"] = mapping.data_type
        optional_fields = (
            ("nullable", mapping.nullable),
            ("lookup", mapping.lookup),
            ("expression", mapping.expression),
            ("function", mapping.function),
            ("input_columns", mapping.input_columns),
        )
        for key, value in optional_fields:
            if value is not None:
                extended[key] = value

        if not extended and mapping.csv_column is not None:
            return mapping.db_column
        return {"db_column": mapping.db_column, **extended}

    @staticmethod
    def _add_yaml_entry(target: dict[Any, Any], csv_column: str | None, entry: Any) -> None:
        """Store ``entry`` under ``csv_column``, collecting repeated null keys.

        A YAML mapping can hold one ``null`` key. Several mappings without a
        CSV column (custom functions) are therefore written as a list under
        that key. Config parsing expands such a list back into individual
        mappings. Entries are always strings or dicts, so a list value here
        can only be the collection itself.
        """
        if csv_column is None and None in target:
            existing = target[None]
            if isinstance(existing, list):
                existing.append(entry)
            else:
                target[None] = [existing, entry]
        else:
            target[csv_column] = entry

    def to_yaml_dict(self) -> dict[str, Any]:
        """Convert config to dictionary suitable for YAML serialization.

        Returns:
            Dictionary representation of config
        """
        jobs_dict: dict[str, Any] = {}

        for job_name, job in self.jobs.items():
            id_mapping_dict: dict[Any, Any] = {}
            for id_col in job.id_mapping:
                self._add_yaml_entry(
                    id_mapping_dict, id_col.csv_column, self._column_mapping_to_yaml(id_col)
                )

            job_dict: dict[str, Any] = {
                "target_table": job.target_table,
                "id_mapping": id_mapping_dict,
            }

            if job.columns:
                columns_dict: dict[Any, Any] = {}
                for col in job.columns:
                    self._add_yaml_entry(
                        columns_dict, col.csv_column, self._column_mapping_to_yaml(col)
                    )
                job_dict["columns"] = columns_dict

            if job.filename_to_column:
                ftc = job.filename_to_column
                ftc_dict: dict[str, Any] = {}
                if ftc.template:
                    ftc_dict["template"] = ftc.template
                else:
                    ftc_dict["regex"] = ftc.regex

                ftc_columns_dict: dict[str, Any] = {}
                for col_name, col_mapping in ftc.columns.items():
                    col_dict: dict[str, Any] = {}
                    if col_mapping.db_column != col_mapping.name:
                        col_dict["db_column"] = col_mapping.db_column
                    if col_mapping.data_type:
                        col_dict["type"] = col_mapping.data_type
                    if col_mapping.use_to_delete_old_rows:
                        col_dict["use_to_delete_old_rows"] = True
                    # An empty dict is written as null to keep the YAML minimal.
                    ftc_columns_dict[col_name] = col_dict or None

                ftc_dict["columns"] = ftc_columns_dict
                job_dict["filename_to_column"] = ftc_dict

            if job.indexes:
                job_dict["indexes"] = [
                    {
                        "name": index.name,
                        "columns": [
                            {"column": col.column, "order": col.order} for col in index.columns
                        ],
                    }
                    for index in job.indexes
                ]

            # 100% is equivalent to "no sampling", so it is not written.
            if job.sample_percentage is not None and job.sample_percentage != 100:
                job_dict["sample_percentage"] = job.sample_percentage

            if job.failure_mode != FailureMode.PERMISSIVE:
                job_dict["failure_mode"] = job.failure_mode.value

            jobs_dict[job_name] = job_dict

        result: dict[str, Any] = {"jobs": jobs_dict}

        if self.id_column_matchers is not None:
            result["id_column_matchers"] = self.id_column_matchers

        return result

    def save_to_yaml(self, config_path: Path) -> None:
        """Save configuration to a YAML file.

        Args:
            config_path: Path to save the YAML file
        """
        config_dict = self.to_yaml_dict()

        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(config_dict, f, default_flow_style=False, sort_keys=False)
def apply_row_transformations(
    row: dict[str, Any],
    sync_columns: list[ColumnMapping],
    filename_to_column: FilenameToColumn | None = None,
    filename_values: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Map one CSV row onto database column names.

    Shared by the sync and extract paths so both transform rows the same way.
    Each mapping in ``sync_columns`` is handled in order:

    * expression/function mappings store the computed value;
    * mappings whose CSV column is present store the lookup-translated value;
    * mappings whose CSV column is absent from the row store None, leaving
      the decision to later validation;
    * mappings with neither a CSV column nor an expression/function are skipped.

    Values extracted from the filename are then added under each filename
    column's ``db_column``.

    Args:
        row: Dictionary representing a CSV row (column_name -> value)
        sync_columns: List of ColumnMapping objects defining transformations
        filename_to_column: Optional FilenameToColumn configuration
        filename_values: Optional dict of values extracted from filename

    Returns:
        Dictionary with transformed values (db_column_name -> value)
    """
    transformed: dict[str, Any] = {}

    for mapping in sync_columns:
        if mapping.expression or mapping.function:
            transformed[mapping.db_column] = mapping.apply_custom_function(row)
            continue

        source = mapping.csv_column
        if not source:
            continue

        if source in row:
            transformed[mapping.db_column] = mapping.apply_lookup(row[source])
        else:
            transformed[mapping.db_column] = None

    if filename_to_column and filename_values:
        transformed.update(
            (file_mapping.db_column, filename_values[value_name])
            for value_name, file_mapping in filename_to_column.columns.items()
            if value_name in filename_values
        )

    return transformed