Coverage for src / crump / cli_sync.py: 75%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-11 14:40 +0000

1"""Sync command for syncing CSV, Parquet, and CDF files to database.""" 

2 

3import tempfile 

4from pathlib import Path 

5 

6import click 

7from rich.console import Console 

8 

9from crump.cdf_extractor import extract_cdf_to_tabular_file 

10from crump.config import CrumpConfig 

11from crump.console_utils import BULLET, CHECKMARK, HORIZONTAL_LINE 

12from crump.database import sync_file_to_db, sync_file_to_db_dry_run 

13 

# Module-level Rich console; the functions below route all user-facing output through it.
console = Console()

15 

16 

def _extract_cdf_and_find_tabular_files(
    cdf_file: Path, temp_dir: Path, max_records: int | None = None
) -> list[Path]:
    """Dump the contents of a CDF file into CSV files inside ``temp_dir``.

    Progress is reported on the module console. Extraction is delegated to
    ``extract_cdf_to_tabular_file`` with auto-merge enabled, append disabled,
    no variable filter, and output names of the form
    ``<cdf stem>_[VARIABLE_NAME].csv``.

    Args:
        cdf_file: Path to the CDF file to read.
        temp_dir: Directory that receives the extracted files.
        max_records: Upper bound on records extracted per variable
            (``None`` extracts everything).

    Returns:
        The ``output_file`` of every extraction result, in result order.

    Raises:
        ValueError: On any failure. Every exception raised during extraction,
            including the "no data" ValueError raised here for an empty
            result, is re-raised as ``ValueError("Failed to extract CDF
            file: ...")`` chained to the original.
    """
    console.print("[dim] Extracting CDF data to temporary files...[/dim]")
    if max_records is not None:
        console.print(f"[dim] Max records per variable: {max_records:,}[/dim]")

    try:
        name_template = f"{cdf_file.stem}_[VARIABLE_NAME].csv"
        extraction_results = extract_cdf_to_tabular_file(
            cdf_file_path=cdf_file,
            output_dir=temp_dir,
            filename_template=name_template,
            automerge=True,
            append=False,
            variable_names=None,
            max_records=max_records,
        )
        if not extraction_results:
            raise ValueError("No data could be extracted from CDF file")

        extracted_paths = [entry.output_file for entry in extraction_results]
        console.print(f"[dim] Extracted {len(extracted_paths)} CSV file(s) from CDF[/dim]")
    except Exception as exc:
        # Deliberately broad: callers only need to handle ValueError.
        raise ValueError(f"Failed to extract CDF file: {exc}") from exc
    else:
        return extracted_paths

60 

61 

def _print_dry_run_summary(summary, filename_values) -> None:
    """Print the schema and data changes a dry-run reported.

    Args:
        summary: Result of ``sync_file_to_db_dry_run``; the attributes read are
            ``table_exists``, ``table_name``, ``new_columns`` (iterated as
            ``(name, type)`` pairs), ``new_indexes``, ``rows_to_sync`` and
            ``rows_to_delete``.
        filename_values: Values extracted from the filename, or a falsy value
            when none were extracted. Stale-row information is only shown
            when this is truthy.
    """
    console.print("\n[bold yellow]Dry-run Summary[/bold yellow]")
    console.print(f"[dim]{HORIZONTAL_LINE * 60}[/dim]")

    # Schema changes
    if not summary.table_exists:
        console.print(f"[yellow] {BULLET} Table '{summary.table_name}' would be CREATED[/yellow]")
    else:
        console.print(f"[green] {BULLET} Table '{summary.table_name}' exists[/green]")

    if summary.new_columns:
        console.print(
            f"[yellow] {BULLET} {len(summary.new_columns)} column(s) would be ADDED:[/yellow]"
        )
        for col_name, col_type in summary.new_columns:
            console.print(f"[dim] - {col_name} ({col_type})[/dim]")
    else:
        console.print(f"[green] {BULLET} No new columns needed[/green]")

    if summary.new_indexes:
        console.print(
            f"[yellow] {BULLET} {len(summary.new_indexes)} index(es) would be CREATED:[/yellow]"
        )
        for idx_name in summary.new_indexes:
            console.print(f"[dim] - {idx_name}[/dim]")
    else:
        console.print(f"[green] {BULLET} No new indexes needed[/green]")

    # Data changes
    console.print("\n[bold]Data Changes:[/bold]")
    console.print(
        f"[green] {BULLET} {summary.rows_to_sync} row(s) would be inserted/updated[/green]"
    )

    if filename_values and summary.rows_to_delete > 0:
        console.print(
            f"[yellow] {BULLET} {summary.rows_to_delete} stale row(s) would be deleted[/yellow]"
        )
    elif filename_values:
        console.print(f"[green] {BULLET} No stale rows to delete[/green]")


def _cleanup_temp_extraction(temp_dir: Path | None, temp_csv_files: list[Path]) -> None:
    """Best-effort removal of the files and directory created for CDF extraction.

    Failures are reported as warnings and never raised, so cleanup cannot mask
    the outcome of the sync itself.

    Args:
        temp_dir: Directory created for the extraction, or ``None`` if no
            directory was created.
        temp_csv_files: Extracted files known to the caller (may be empty).
    """
    import shutil

    if temp_csv_files:
        console.print("[dim]Cleaning up temporary files...[/dim]")
        for temp_file in temp_csv_files:
            try:
                # A file that is already gone is not a cleanup failure.
                temp_file.unlink(missing_ok=True)
            except Exception as e:
                console.print(f"[yellow]Warning: Could not delete {temp_file}: {e}[/yellow]")

    if temp_dir is not None and temp_dir.exists():
        try:
            # rmtree rather than rmdir: if extraction failed partway, the
            # directory can still hold files that never made it into
            # temp_csv_files, and rmdir would refuse to remove it.
            shutil.rmtree(temp_dir)
        except Exception as e:
            console.print(f"[yellow]Warning: Could not delete temp directory: {e}[/yellow]")


@click.command()
@click.argument("file_path", type=click.Path(exists=True, path_type=Path), required=True)
@click.option(
    "--config",
    "-c",
    type=click.Path(exists=True, path_type=Path),
    required=True,
    help="Path to the YAML configuration file",
)
@click.option(
    "--job",
    "-j",
    type=str,
    default=None,
    help="Job name from config file (optional - auto-detected if config contains only one job)",
)
@click.option(
    "--db-url",
    envvar="DATABASE_URL",
    required=True,
    help="PostgreSQL connection string (or set DATABASE_URL env var)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=False,
    help="Simulate the sync without making any database changes",
)
@click.option(
    "--max-records",
    type=int,
    default=None,
    help="Maximum number of records to extract per variable from CDF files (default: extract all records)",
)
@click.option(
    "--history/--no-history",
    default=False,
    help="Record sync history in _crump_history table (default: no history)",
)
def sync(
    file_path: Path,
    config: Path,
    job: str | None,
    db_url: str,
    dry_run: bool,
    max_records: int | None,
    history: bool,
) -> None:
    """Sync a CSV, Parquet, or CDF file to the database using a configuration.

    Supports CSV, Parquet, and CDF file formats. For CDF files, data is automatically
    extracted to temporary CSV files before syncing to the database.

    If the config file contains only one job, the --job parameter is optional
    and will be auto-detected.

    Arguments:
        FILE_PATH: Path to the CSV, Parquet, or CDF file to sync (required)

    Options:
        --config, -c: Path to the YAML configuration file (required)
        --job, -j: Name of the job to run from config (optional - auto-detected if single job)

    Examples:
        # Sync CSV file with explicit job name
        crump sync data.csv --config crump_config.yml --job my_job --db-url postgresql://localhost/mydb

        # Sync Parquet file
        crump sync data.parquet -c crump_config.yml -j my_job --db-url postgresql://localhost/mydb

        # Sync with auto-detected job (when config has only one job)
        crump sync data.csv --config crump_config.yml --db-url postgresql://localhost/mydb

        # Sync a CDF file (extracts to CSV automatically)
        crump sync data.cdf -c crump_config.yml -j my_job --db-url postgresql://localhost/mydb

        # Sync CDF with limited records (useful for testing)
        crump sync data.cdf --config crump_config.yml --job my_job --db-url postgresql://localhost/mydb --max-records 200

        # Using environment variable
        export DATABASE_URL=postgresql://localhost/mydb
        crump sync data.parquet --config crump_config.yml --job my_job

        # Dry-run mode to preview changes
        crump sync data.parquet -c crump_config.yml -j my_job --dry-run

        # Dry-run with limited records from CDF and auto-detected job
        crump sync data.cdf --config crump_config.yml --dry-run --max-records 100

        # Enable history tracking
        crump sync data.parquet --config crump_config.yml --job my_job --history
    """
    temp_dir: Path | None = None
    temp_csv_files: list[Path] = []

    try:
        # Load configuration
        crump_config = CrumpConfig.from_yaml(config)

        # Get the specified job or auto-detect if there's only one
        try:
            result = crump_config.get_job_or_auto_detect(job, filename=file_path.as_posix())
            if not result:
                if job:
                    available_jobs = ", ".join(crump_config.jobs.keys())
                    console.print(f"[red]Error:[/red] Job '{job}' not found in config")
                    console.print(f"[dim]Available jobs: {available_jobs}[/dim]")
                else:
                    console.print("[red]Error:[/red] Config file contains no jobs")
                raise click.Abort()

            crump_job, detected_job_name = result

            # Inform user if we auto-detected the job
            if job is None:
                console.print(f"[dim]Auto-detected job: {detected_job_name}[/dim]")

        except ValueError as e:
            # Job lookup rejected the request (e.g. several jobs and none named)
            available_jobs = ", ".join(crump_config.jobs.keys())
            console.print(f"[red]Error:[/red] {e}")
            console.print(f"[dim]Available jobs: {available_jobs}[/dim]")
            raise click.Abort() from e

        # Check if input file is CDF - if so, extract to temporary CSV
        is_cdf = file_path.suffix.lower() == ".cdf"
        csv_file_to_sync = file_path
        if is_cdf:
            console.print(f"[cyan]Processing CDF file: {file_path.name}[/cyan]")

            # Create temporary directory for CSV extraction
            temp_dir = Path(tempfile.mkdtemp(prefix="crump_cdf_"))

            # Extract CDF to temporary CSV files
            temp_csv_files = _extract_cdf_and_find_tabular_files(file_path, temp_dir, max_records)

            # Only the first extracted CSV is synced; no attempt is made to
            # match the remaining files against the job's configuration.
            if not temp_csv_files:
                console.print("[red]Error:[/red] No suitable CSV data found in CDF file")
                raise click.Abort()
            csv_file_to_sync = temp_csv_files[0]
            console.print(f"[dim] Using extracted CSV: {csv_file_to_sync.name}[/dim]")

        # Extract values from filename if filename_to_column is configured.
        # The name used is that of the file actually synced, which for CDF
        # input is the extracted CSV rather than the original file.
        filename_values = None
        if crump_job.filename_to_column:
            filename_values = crump_job.filename_to_column.extract_values_from_filename(
                csv_file_to_sync
            )
            if filename_values:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")
            elif is_cdf:
                # The extracted CSV has a different name from the CDF, so a
                # failed match is tolerated: skip filename-based metadata.
                console.print(
                    f"[dim] Note: Could not extract values from '{csv_file_to_sync.name}' "
                    f"(extracted from CDF). Skipping filename-based metadata.[/dim]"
                )
            else:
                # For directly supplied files, extraction failure is an error
                console.print(
                    f"[red]Error:[/red] Could not extract values from filename '{csv_file_to_sync.name}'"
                )
                pattern = (
                    crump_job.filename_to_column.template or crump_job.filename_to_column.regex
                )
                console.print(f"[dim] Pattern: {pattern}[/dim]")
                raise click.Abort()

        # Perform sync or dry-run. Messages use detected_job_name because
        # `job` is None whenever the job was auto-detected.
        if dry_run:
            console.print(
                f"[cyan]DRY RUN: Simulating sync of {csv_file_to_sync.name} using job '{detected_job_name}'...[/cyan]"
            )
            # History is never recorded during dry-run
            summary = sync_file_to_db_dry_run(csv_file_to_sync, crump_job, db_url, filename_values)

            _print_dry_run_summary(summary, filename_values)

            console.print(
                f"\n[bold green]{CHECKMARK} Dry-run complete - no changes made to database[/bold green]"
            )
            console.print(f"[dim] Source file: {file_path}[/dim]")
            if csv_file_to_sync != file_path:
                console.print(f"[dim] CSV extracted: {csv_file_to_sync.name}[/dim]")
            if filename_values:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")
        else:
            # Sync the file
            console.print(
                f"[cyan]Syncing {csv_file_to_sync.name} using job '{detected_job_name}'...[/cyan]"
            )
            rows_synced = sync_file_to_db(
                csv_file_to_sync, crump_job, db_url, filename_values, enable_history=history
            )

            console.print(f"[green]{CHECKMARK} Successfully synced {rows_synced} rows[/green]")
            console.print(f"[dim] Table: {crump_job.target_table}[/dim]")
            console.print(f"[dim] Source file: {file_path}[/dim]")
            if csv_file_to_sync != file_path:
                console.print(f"[dim] CSV extracted: {csv_file_to_sync.name}[/dim]")
            if filename_values:
                console.print(f"[dim] Extracted values: {filename_values}[/dim]")
            if history:
                console.print("[dim] History recorded in _crump_history table[/dim]")

    except click.Abort:
        # click.Abort derives from RuntimeError, so without this clause the
        # generic handler below would print a spurious, empty
        # "Unexpected error:" line after the real error message.
        raise
    except (FileNotFoundError, ValueError) as e:
        console.print(f"[red]Error:[/red] {e}")
        raise click.Abort() from e
    except Exception as e:
        console.print(f"[red]Unexpected error:[/red] {e}")
        raise click.Abort() from e
    finally:
        # Clean up temporary files if CDF was extracted
        _cleanup_temp_extraction(temp_dir, temp_csv_files)