Coverage for src / crump / cli_sync.py: 75%
140 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-11 14:40 +0000
1"""Sync command for syncing CSV, Parquet, and CDF files to database."""
3import tempfile
4from pathlib import Path
6import click
7from rich.console import Console
9from crump.cdf_extractor import extract_cdf_to_tabular_file
10from crump.config import CrumpConfig
11from crump.console_utils import BULLET, CHECKMARK, HORIZONTAL_LINE
12from crump.database import sync_file_to_db, sync_file_to_db_dry_run
# Shared Rich console through which this module prints all of its user-facing output.
14console = Console()
def _extract_cdf_and_find_tabular_files(
    cdf_file: Path, temp_dir: Path, max_records: int | None = None
) -> list[Path]:
    """Dump the contents of a CDF file into temporary CSV files.

    Args:
        cdf_file: Location of the CDF file to read
        temp_dir: Directory that receives the extracted CSV files
        max_records: Upper bound on records extracted per variable (None = no limit)

    Returns:
        Paths of the files written by the extractor

    Raises:
        ValueError: If the extractor fails or yields no results
    """
    console.print("[dim] Extracting CDF data to temporary files...[/dim]")

    limit_requested = max_records is not None
    if limit_requested:
        console.print(f"[dim] Max records per variable: {max_records:,}[/dim]")

    try:
        # One output file per variable group, named after the source file.
        extraction_options = {
            "cdf_file_path": cdf_file,
            "output_dir": temp_dir,
            "filename_template": f"{cdf_file.stem}_[VARIABLE_NAME].csv",
            "automerge": True,
            "append": False,
            "variable_names": None,
            "max_records": max_records,
        }
        extracted = extract_cdf_to_tabular_file(**extraction_options)

        if not extracted:
            raise ValueError("No data could be extracted from CDF file")

        written_paths = []
        for item in extracted:
            written_paths.append(item.output_file)

        console.print(f"[dim] Extracted {len(written_paths)} CSV file(s) from CDF[/dim]")
    except Exception as err:
        # Every failure (including the "no data" case above) surfaces as ValueError.
        raise ValueError(f"Failed to extract CDF file: {err}") from err
    else:
        return written_paths
def _cleanup_temp_files(temp_dir: Path | None, temp_csv_files: list[Path]) -> None:
    """Remove the temporary CSV files and directory created for a CDF extraction.

    Cleanup is best-effort: failures are reported as warnings and never raised,
    so they cannot mask the outcome of the sync itself.

    Args:
        temp_dir: Temporary extraction directory, or None if none was created
        temp_csv_files: Extracted files to delete (may be empty)
    """
    import shutil

    if temp_csv_files:
        console.print("[dim]Cleaning up temporary files...[/dim]")
        for temp_file in temp_csv_files:
            try:
                temp_file.unlink()
            except Exception as e:
                console.print(f"[yellow]Warning: Could not delete {temp_file}: {e}[/yellow]")

    if temp_dir and temp_dir.exists():
        try:
            # rmtree rather than rmdir: a failed or partial extraction can leave
            # files behind that are not listed in temp_csv_files, and rmdir
            # refuses to remove a non-empty directory.
            shutil.rmtree(temp_dir)
        except Exception as e:
            console.print(f"[yellow]Warning: Could not delete temp directory: {e}[/yellow]")


# CLI entry point. Every failure path prints a message and raises click.Abort;
# temporary files created for CDF input are removed in all cases.
@click.command()
@click.argument("file_path", type=click.Path(exists=True, path_type=Path), required=True)
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True, path_type=Path),
    required=True,
    help="Path to the YAML configuration file",
)
@click.option(
    "--job",
    "-j",
    type=str,
    default=None,
    help="Job name from config file (optional - auto-detected if config contains only one job)",
)
@click.option(
    "--db-url",
    envvar="DATABASE_URL",
    required=True,
    help="PostgreSQL connection string (or set DATABASE_URL env var)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=False,
    help="Simulate the sync without making any database changes",
)
@click.option(
    "--max-records",
    type=int,
    default=None,
    help="Maximum number of records to extract per variable from CDF files (default: extract all records)",
)
@click.option(
    "--history/--no-history",
    default=False,
    help="Record sync history in _crump_history table (default: no history)",
)
def sync(
    file_path: Path,
    config: Path,
    job: str | None,
    db_url: str,
    dry_run: bool,
    max_records: int | None,
    history: bool,
) -> None:
    """Sync a CSV, Parquet, or CDF file to the database using a configuration.

    Supports CSV, Parquet, and CDF file formats. For CDF files, data is automatically
    extracted to temporary CSV files before syncing to the database.

    If the config file contains only one job, the --job parameter is optional
    and will be auto-detected.

    Arguments:
        FILE_PATH: Path to the CSV, Parquet, or CDF file to sync (required)

    Options:
        --config, -c: Path to the YAML configuration file (required)
        --job, -j: Name of the job to run from config (optional - auto-detected if single job)

    Examples:
        # Sync CSV file with explicit job name
        crump sync data.csv --config crump_config.yml --job my_job --db-url postgresql://localhost/mydb

        # Sync Parquet file
        crump sync data.parquet -c crump_config.yml -j my_job --db-url postgresql://localhost/mydb

        # Sync with auto-detected job (when config has only one job)
        crump sync data.csv --config crump_config.yml --db-url postgresql://localhost/mydb

        # Sync a CDF file (extracts to CSV automatically)
        crump sync data.cdf -c crump_config.yml -j my_job --db-url postgresql://localhost/mydb

        # Sync CDF with limited records (useful for testing)
        crump sync data.cdf --config crump_config.yml --job my_job --db-url postgresql://localhost/mydb --max-records 200

        # Using environment variable
        export DATABASE_URL=postgresql://localhost/mydb
        crump sync data.parquet --config crump_config.yml --job my_job

        # Dry-run mode to preview changes
        crump sync data.parquet -c crump_config.yml -j my_job --dry-run

        # Dry-run with limited records from CDF and auto-detected job
        crump sync data.cdf --config crump_config.yml --dry-run --max-records 100

        # Enable history tracking
        crump sync data.parquet --config crump_config.yml --job my_job --history
    """
    temp_dir: Path | None = None
    temp_csv_files: list[Path] = []

    try:
        # Load configuration
        crump_config = CrumpConfig.from_yaml(config)

        # Get the specified job or auto-detect if there's only one
        try:
            result = crump_config.get_job_or_auto_detect(job, filename=file_path.as_posix())
            if not result:
                if job:
                    available_jobs = ", ".join(crump_config.jobs.keys())
                    console.print(f"[red]Error:[/red] Job '{job}' not found in config")
                    console.print(f"[dim]Available jobs: {available_jobs}[/dim]")
                else:
                    console.print("[red]Error:[/red] Config file contains no jobs")
                raise click.Abort()

            crump_job, detected_job_name = result

            # Inform user if we auto-detected the job
            if job is None:
                console.print(f"[dim]Auto-detected job: {detected_job_name}[/dim]")

        except ValueError as e:
            # Multiple jobs found, need explicit job name
            available_jobs = ", ".join(crump_config.jobs.keys())
            console.print(f"[red]Error:[/red] {e}")
            console.print(f"[dim]Available jobs: {available_jobs}[/dim]")
            raise click.Abort() from e

        # Name shown in status messages: the explicit --job value when given,
        # otherwise the auto-detected one (so we never print "job 'None'").
        job_name = job if job is not None else detected_job_name

        # Check if input file is CDF - if so, extract to temporary CSV
        is_cdf = file_path.suffix.lower() == ".cdf"
        csv_file_to_sync = file_path
        if is_cdf:
            console.print(f"[cyan]Processing CDF file: {file_path.name}[/cyan]")

            # Create temporary directory for CSV extraction
            temp_dir = Path(tempfile.mkdtemp(prefix="crump_cdf_"))

            # Extract CDF to temporary CSV files
            temp_csv_files = _extract_cdf_and_find_tabular_files(file_path, temp_dir, max_records)

            # The first extracted CSV is the one that gets synced; no attempt is
            # made to match the remaining files against the job's configuration.
            if not temp_csv_files:
                console.print("[red]Error:[/red] No suitable CSV data found in CDF file")
                raise click.Abort()

            csv_file_to_sync = temp_csv_files[0]
            console.print(f"[dim] Using extracted CSV: {csv_file_to_sync.name}[/dim]")

        # Extract values from filename if filename_to_column is configured
        # Use the CSV filename for extraction (which might be extracted from CDF)
        filename_values = None
        if crump_job.filename_to_column:
            filename_values = crump_job.filename_to_column.extract_values_from_filename(
                csv_file_to_sync
            )
            if not filename_values:
                # For CDF files, filename extraction might not work because the extracted
                # CSV has a different name. This is OK - just skip filename extraction
                if is_cdf:
                    console.print(
                        f"[dim] Note: Could not extract values from '{csv_file_to_sync.name}' "
                        f"(extracted from CDF). Skipping filename-based metadata.[/dim]"
                    )
                else:
                    # For CSV files, filename extraction failure is an error
                    console.print(
                        f"[red]Error:[/red] Could not extract values from filename '{csv_file_to_sync.name}'"
                    )
                    pattern = (
                        crump_job.filename_to_column.template
                        or crump_job.filename_to_column.regex
                    )
                    console.print(f"[dim] Pattern: {pattern}[/dim]")
                    raise click.Abort()
            else:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")

        # Perform sync or dry-run
        if dry_run:
            console.print(
                f"[cyan]DRY RUN: Simulating sync of {csv_file_to_sync.name} "
                f"using job '{job_name}'...[/cyan]"
            )
            # History is never recorded during dry-run
            summary = sync_file_to_db_dry_run(csv_file_to_sync, crump_job, db_url, filename_values)

            # Display dry-run summary
            console.print("\n[bold yellow]Dry-run Summary[/bold yellow]")
            console.print(f"[dim]{HORIZONTAL_LINE * 60}[/dim]")

            # Schema changes
            if not summary.table_exists:
                console.print(
                    f"[yellow] {BULLET} Table '{summary.table_name}' would be CREATED[/yellow]"
                )
            else:
                console.print(f"[green] {BULLET} Table '{summary.table_name}' exists[/green]")

            if summary.new_columns:
                console.print(
                    f"[yellow] {BULLET} {len(summary.new_columns)} column(s) would be ADDED:[/yellow]"
                )
                for col_name, col_type in summary.new_columns:
                    console.print(f"[dim] - {col_name} ({col_type})[/dim]")
            else:
                console.print(f"[green] {BULLET} No new columns needed[/green]")

            if summary.new_indexes:
                console.print(
                    f"[yellow] {BULLET} {len(summary.new_indexes)} index(es) would be CREATED:[/yellow]"
                )
                for idx_name in summary.new_indexes:
                    console.print(f"[dim] - {idx_name}[/dim]")
            else:
                console.print(f"[green] {BULLET} No new indexes needed[/green]")

            # Data changes
            console.print("\n[bold]Data Changes:[/bold]")
            console.print(
                f"[green] {BULLET} {summary.rows_to_sync} row(s) would be inserted/updated[/green]"
            )

            # Stale-row information is only reported when filename values were extracted
            if filename_values and summary.rows_to_delete > 0:
                console.print(
                    f"[yellow] {BULLET} {summary.rows_to_delete} stale row(s) would be deleted[/yellow]"
                )
            elif filename_values:
                console.print(f"[green] {BULLET} No stale rows to delete[/green]")

            console.print(
                f"\n[bold green]{CHECKMARK} Dry-run complete - no changes made to database[/bold green]"
            )
            console.print(f"[dim] Source file: {file_path}[/dim]")
            if csv_file_to_sync != file_path:
                console.print(f"[dim] CSV extracted: {csv_file_to_sync.name}[/dim]")
            if filename_values:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")
        else:
            # Sync the file
            console.print(
                f"[cyan]Syncing {csv_file_to_sync.name} using job '{job_name}'...[/cyan]"
            )
            rows_synced = sync_file_to_db(
                csv_file_to_sync, crump_job, db_url, filename_values, enable_history=history
            )

            console.print(f"[green]{CHECKMARK} Successfully synced {rows_synced} rows[/green]")
            console.print(f"[dim] Table: {crump_job.target_table}[/dim]")
            console.print(f"[dim] Source file: {file_path}[/dim]")
            if csv_file_to_sync != file_path:
                console.print(f"[dim] CSV extracted: {csv_file_to_sync.name}[/dim]")
            if filename_values:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")
            if history:
                console.print("[dim] History recorded in _crump_history table[/dim]")

    except click.Abort:
        # click.Abort derives from RuntimeError, so without this clause the
        # generic handler below would catch our own deliberate aborts and print
        # a spurious, empty "Unexpected error:" line after the real message.
        raise
    except (FileNotFoundError, ValueError) as e:
        console.print(f"[red]Error:[/red] {e}")
        raise click.Abort() from e
    except Exception as e:
        # Top-level CLI boundary: report anything unforeseen and abort cleanly.
        console.print(f"[red]Unexpected error:[/red] {e}")
        raise click.Abort() from e
    finally:
        # Clean up temporary files if CDF was extracted
        _cleanup_temp_files(temp_dir, temp_csv_files)