Coverage for llm_dataset_engine/cli/main.py: 32%

289 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Main CLI entry point for LLM Dataset Engine. 

3 

4Provides command-line interface for processing datasets, estimating costs, 

5and managing pipeline execution. 

6""" 

7 

8import sys 

9from pathlib import Path 

10from typing import Optional 

11from uuid import UUID 

12 

13import click 

14from rich.console import Console 

15from rich.table import Table 

16 

17from llm_dataset_engine import __version__ 

18from llm_dataset_engine.api import Pipeline 

19from llm_dataset_engine.config import ConfigLoader 

20from llm_dataset_engine.core.specifications import ( 

21 DataSourceType, 

22 LLMProvider, 

23 PipelineSpecifications, 

24) 

25 

# Shared Rich console; the commands in this module print their output through it.
console = Console()

27 

28 

# Root command group. Sub-commands attach themselves with ``@cli.command()``;
# the docstring below doubles as the text shown by ``llm-dataset --help``.
@click.group()
@click.version_option(version=__version__, prog_name="llm-dataset")
def cli() -> None:
    """
    LLM Dataset Engine - Process tabular datasets using LLMs.

    A production-grade SDK for processing CSV, Excel, and Parquet files
    with Large Language Models, featuring reliability, cost control, and
    observability.

    Examples:

        # Process a dataset
        llm-dataset process --config config.yaml --input data.csv --output result.csv

        # Estimate cost before processing
        llm-dataset estimate --config config.yaml --input data.csv

        # Resume from checkpoint
        llm-dataset resume --session-id abc-123 --checkpoint-dir .checkpoints

        # Validate configuration
        llm-dataset validate --config config.yaml
    """

54 

55 

def _detect_output_type(output: Path) -> DataSourceType:
    """Map an output file extension to a DataSourceType, defaulting to CSV."""
    suffix = output.suffix.lower()
    if suffix in (".xlsx", ".xls"):
        return DataSourceType.EXCEL
    if suffix == ".parquet":
        return DataSourceType.PARQUET
    # ".csv" and every unrecognised extension fall back to CSV.
    return DataSourceType.CSV


@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input data file (CSV, Excel, Parquet)",
)
@click.option(
    "--output",
    "-o",
    required=True,
    type=click.Path(path_type=Path),
    help="Path to output file",
)
@click.option(
    "--provider",
    type=click.Choice(["openai", "azure_openai", "anthropic", "groq"]),
    help="Override LLM provider from config",
)
@click.option(
    "--model",
    help="Override model name from config",
)
@click.option(
    "--max-budget",
    type=float,
    help="Override maximum budget (USD) from config",
)
@click.option(
    "--batch-size",
    type=int,
    help="Override batch size from config",
)
@click.option(
    "--concurrency",
    type=int,
    help="Override concurrency from config",
)
@click.option(
    "--checkpoint-dir",
    type=click.Path(path_type=Path),
    help="Override checkpoint directory from config",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Validate and estimate only, don't execute",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Enable verbose logging",
)
def process(
    config: Path,
    input: Path,
    output: Path,
    provider: Optional[str],
    model: Optional[str],
    max_budget: Optional[float],
    batch_size: Optional[int],
    concurrency: Optional[int],
    checkpoint_dir: Optional[Path],
    dry_run: bool,
    verbose: bool,
):
    """
    Process a dataset using LLM transformations.

    Reads data from INPUT file, applies LLM transformations according to CONFIG,
    and writes results to OUTPUT file.

    Examples:

        # Basic usage
        llm-dataset process -c config.yaml -i data.csv -o result.csv

        # Override provider and model
        llm-dataset process -c config.yaml -i data.csv -o result.csv \\
            --provider groq --model openai/gpt-oss-120b

        # Set budget limit
        llm-dataset process -c config.yaml -i data.csv -o result.csv \\
            --max-budget 10.0

        # Dry run (estimate only)
        llm-dataset process -c config.yaml -i data.csv -o result.csv --dry-run
    """
    # Flow: load config -> apply CLI overrides -> validate -> estimate ->
    # (unless --dry-run) execute and report. Any failure exits with status 1.
    try:
        # Load configuration
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))

        # The input path given on the command line always replaces the
        # configured dataset source.
        specs.dataset.source_path = input

        # Set output configuration
        if specs.output:
            specs.output.destination_path = output
        else:
            # Config has no output section: build one, inferring the format
            # from the output file's extension.
            from llm_dataset_engine.core.specifications import OutputSpec, MergeStrategy

            specs.output = OutputSpec(
                destination_type=_detect_output_type(output),
                destination_path=output,
                merge_strategy=MergeStrategy.REPLACE,
            )

        if provider:
            specs.llm.provider = LLMProvider(provider)

        if model:
            specs.llm.model = model

        if max_budget is not None:
            from decimal import Decimal

            # Go through str() so the Decimal holds the value as typed rather
            # than the float's binary approximation.
            specs.processing.max_budget = Decimal(str(max_budget))

        if batch_size is not None:
            specs.processing.batch_size = batch_size

        if concurrency is not None:
            specs.processing.concurrency = concurrency

        if checkpoint_dir is not None:
            specs.processing.checkpoint_dir = checkpoint_dir

        # Create pipeline
        console.print("[cyan]Creating pipeline...[/cyan]")
        pipeline = Pipeline(specs)

        # Validate
        console.print("[cyan]Validating pipeline...[/cyan]")
        validation = pipeline.validate()

        if not validation.is_valid:
            console.print("[red]❌ Validation failed:[/red]")
            for error in validation.errors:
                console.print(f" [red]• {error}[/red]")
            # SystemExit is not an Exception subclass, so it is not caught by
            # the handler at the bottom of this function.
            sys.exit(1)

        console.print("[green]✅ Validation passed[/green]")

        # Estimate cost
        console.print("\n[cyan]Estimating cost...[/cyan]")
        estimate = pipeline.estimate_cost()

        table = Table(title="Cost Estimate")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")

        table.add_row("Total Cost", f"${estimate.total_cost}")
        table.add_row("Total Tokens", f"{estimate.total_tokens:,}")
        table.add_row("Input Tokens", f"{estimate.input_tokens:,}")
        table.add_row("Output Tokens", f"{estimate.output_tokens:,}")
        table.add_row("Rows", f"{estimate.rows:,}")

        console.print(table)

        if dry_run:
            console.print("\n[yellow]Dry run mode - skipping execution[/yellow]")
            return

        # Execute
        console.print("\n[cyan]Processing dataset...[/cyan]")
        result = pipeline.execute()

        # Display results
        console.print("\n[green]✅ Processing complete![/green]")

        results_table = Table(title="Execution Results")
        results_table.add_column("Metric", style="cyan")
        results_table.add_column("Value", style="green")

        total_rows = result.metrics.total_rows
        # Guard the per-row average: a run over zero rows must not turn a
        # successful execution into a ZeroDivisionError while reporting.
        if total_rows > 0:
            cost_per_row = f"${result.costs.total_cost / total_rows:.6f}"
        else:
            cost_per_row = "N/A"

        results_table.add_row("Total Rows", str(total_rows))
        results_table.add_row("Processed", str(result.metrics.processed_rows))
        results_table.add_row("Failed", str(result.metrics.failed_rows))
        results_table.add_row("Skipped", str(result.metrics.skipped_rows))
        results_table.add_row("Duration", f"{result.duration:.2f}s")
        results_table.add_row("Total Cost", f"${result.costs.total_cost}")
        results_table.add_row("Cost per Row", cost_per_row)

        console.print(results_table)

        console.print(f"\n[green]Output written to: {output}[/green]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        if verbose:
            # Full traceback only on request.
            console.print_exception()
        sys.exit(1)

268 

269 

@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input data file",
)
@click.option(
    "--provider",
    type=click.Choice(["openai", "azure_openai", "anthropic", "groq"]),
    help="Override LLM provider from config",
)
@click.option(
    "--model",
    help="Override model name from config",
)
def estimate(
    config: Path,
    input: Path,
    provider: Optional[str],
    model: Optional[str],
):
    """
    Estimate processing cost without executing.

    Useful for budget planning and cost validation before running
    expensive operations.

    Examples:

        # Estimate cost
        llm-dataset estimate -c config.yaml -i data.csv

        # Estimate with different model
        llm-dataset estimate -c config.yaml -i data.csv --model gpt-4o
    """
    # Flow: load config -> apply overrides -> validate -> estimate -> report.
    try:
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))

        # Command-line values take precedence over the configuration file.
        specs.dataset.source_path = input
        if provider:
            specs.llm.provider = LLMProvider(provider)
        if model:
            specs.llm.model = model

        pipeline = Pipeline(specs)

        # Refuse to estimate an invalid pipeline; list every reported error.
        check = pipeline.validate()
        if not check.is_valid:
            console.print("[red]❌ Validation failed:[/red]")
            for problem in check.errors:
                console.print(f" [red]• {problem}[/red]")
            sys.exit(1)

        console.print("[cyan]Estimating cost...[/cyan]")
        cost_estimate = pipeline.estimate_cost()

        # Two fixed-width columns: metric name and its formatted value.
        summary = Table(title="Cost Estimate", show_header=True)
        for heading, colour in (("Metric", "cyan"), ("Value", "green")):
            summary.add_column(heading, style=colour, width=20)

        rows = (
            ("Total Cost", f"${cost_estimate.total_cost}"),
            ("Total Tokens", f"{cost_estimate.total_tokens:,}"),
            ("Input Tokens", f"{cost_estimate.input_tokens:,}"),
            ("Output Tokens", f"{cost_estimate.output_tokens:,}"),
            ("Rows to Process", f"{cost_estimate.rows:,}"),
            # Passed through untouched - presumably already a string; TODO confirm.
            ("Confidence", cost_estimate.confidence),
        )
        for label, value in rows:
            summary.add_row(label, value)

        console.print("\n")
        console.print(summary)

        # The per-row average only makes sense when there is at least one row.
        if cost_estimate.rows > 0:
            per_row = cost_estimate.total_cost / cost_estimate.rows
            console.print(f"\n[cyan]Cost per row: ${per_row:.6f}[/cyan]")

        # Flag estimates above the hard-coded $10 threshold.
        if cost_estimate.total_cost > 10.0:
            console.print(f"\n[yellow]⚠️ Warning: Estimated cost (${cost_estimate.total_cost}) exceeds $10[/yellow]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)

370 

371 

@cli.command()
@click.option(
    "--session-id",
    "-s",
    required=True,
    help="Session ID to resume (UUID)",
)
@click.option(
    "--checkpoint-dir",
    type=click.Path(exists=True, path_type=Path),
    default=".checkpoints",
    help="Checkpoint directory (default: .checkpoints)",
)
@click.option(
    "--output",
    "-o",
    type=click.Path(path_type=Path),
    help="Override output path",
)
def resume(
    session_id: str,
    checkpoint_dir: Path,
    output: Optional[Path],
):
    """
    Resume pipeline execution from checkpoint.

    Useful for recovering from failures or continuing interrupted processing.

    Examples:

        # Resume from checkpoint
        llm-dataset resume --session-id abc-123-def

        # Resume with custom checkpoint directory
        llm-dataset resume -s abc-123 --checkpoint-dir /path/to/checkpoints
    """
    # NOTE: this command currently only locates and displays the latest
    # checkpoint for the session; it does not re-run the pipeline, and the
    # ``output`` option is accepted but not used.
    try:
        from llm_dataset_engine.adapters import LocalFileCheckpointStorage
        from llm_dataset_engine.orchestration import StateManager

        # Load checkpoint
        console.print(f"[cyan]Looking for checkpoint in {checkpoint_dir}...[/cyan]")

        # NOTE(review): the directory is passed as ``str`` here, whereas
        # ``list_checkpoints`` passes the Path itself - confirm which form the
        # storage constructor expects.
        storage = LocalFileCheckpointStorage(str(checkpoint_dir))
        state_manager = StateManager(storage)

        # Only the UUID parse is treated as "bad session id". A ValueError
        # raised anywhere else falls through to the generic handler below
        # instead of being misreported as a malformed identifier.
        try:
            session_uuid = UUID(session_id)
        except ValueError:
            console.print(f"[red]❌ Invalid session ID format: {session_id}[/red]")
            console.print("[yellow]Session ID should be a UUID (e.g., abc-123-def-456)[/yellow]")
            sys.exit(1)

        if not state_manager.can_resume(session_uuid):
            console.print(f"[red]❌ No checkpoint found for session {session_id}[/red]")
            console.print(f"[yellow]Check checkpoint directory: {checkpoint_dir}[/yellow]")
            sys.exit(1)

        # Load checkpoint
        checkpoint_info = state_manager.get_latest_checkpoint(session_uuid)
        console.print(f"[green]✅ Found checkpoint at row {checkpoint_info.row_index}[/green]")

        # Resume execution
        console.print("[cyan]Resuming execution...[/cyan]")

        # Note: Full resume implementation would load the original pipeline
        # and continue from checkpoint. For now, we show the checkpoint info.
        console.print("\n[yellow]⚠️ Full resume functionality requires the original pipeline configuration[/yellow]")
        console.print("[yellow]Please use Pipeline.execute(resume_from=session_id) in Python code[/yellow]")

        # Display checkpoint info
        table = Table(title="Checkpoint Information")
        table.add_column("Property", style="cyan")
        table.add_column("Value", style="green")

        table.add_row("Session ID", str(checkpoint_info.session_id))
        # str() keeps this cell renderable even if the path is a Path object;
        # Rich tables reject values that are neither str nor renderable.
        table.add_row("Checkpoint Path", str(checkpoint_info.checkpoint_path))
        table.add_row("Last Row", str(checkpoint_info.row_index))
        table.add_row("Last Stage", str(checkpoint_info.stage_index))
        table.add_row("Timestamp", str(checkpoint_info.timestamp))
        table.add_row("Size", f"{checkpoint_info.size_bytes:,} bytes")

        console.print(table)

    except Exception as e:
        # sys.exit() above raises SystemExit, which is not an Exception and
        # therefore propagates past this handler untouched.
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)

459 

460 

@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Show detailed validation results",
)
def validate(config: Path, verbose: bool):
    """
    Validate pipeline configuration.

    Checks configuration file for errors and warnings without executing
    the pipeline.

    Examples:

        # Validate configuration
        llm-dataset validate -c config.yaml

        # Verbose validation
        llm-dataset validate -c config.yaml --verbose
    """
    # Exit status is 1 when loading fails or the pipeline reports itself
    # invalid; otherwise the command returns normally.
    try:
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))
        console.print("[green]✅ Configuration loaded successfully[/green]")

        if verbose:
            # Summarise the main settings before running validation.
            summary_rows = [
                ("Dataset", f"{specs.dataset.source_type.value}"),
                ("Input Columns", ", ".join(specs.dataset.input_columns)),
                ("Output Columns", ", ".join(specs.dataset.output_columns)),
                ("LLM Provider", specs.llm.provider.value),
                ("Model", specs.llm.model),
                ("Batch Size", str(specs.processing.batch_size)),
                ("Concurrency", str(specs.processing.concurrency)),
            ]
            # The budget row appears only when a truthy budget is configured.
            if specs.processing.max_budget:
                summary_rows.append(("Max Budget", f"${specs.processing.max_budget}"))

            summary = Table(title="Configuration Summary")
            summary.add_column("Component", style="cyan")
            summary.add_column("Details", style="green")
            for component, details in summary_rows:
                summary.add_row(component, details)

            console.print("\n")
            console.print(summary)

        console.print("\n[cyan]Validating pipeline...[/cyan]")
        report = Pipeline(specs).validate()
        is_valid = report.is_valid

        if is_valid:
            console.print("[green]✅ Pipeline configuration is valid[/green]")
        else:
            console.print("[red]❌ Pipeline configuration is invalid[/red]")
            console.print("\n[red]Errors:[/red]")
            for problem in report.errors:
                console.print(f" [red]• {problem}[/red]")

        # Warnings are shown for valid and invalid configurations alike.
        if report.warnings:
            console.print("\n[yellow]Warnings:[/yellow]")
            for note in report.warnings:
                console.print(f" [yellow]• {note}[/yellow]")

        if not is_valid:
            sys.exit(1)

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)

545 

546 

@cli.command()
@click.option(
    "--checkpoint-dir",
    type=click.Path(exists=True, path_type=Path),
    default=".checkpoints",
    help="Checkpoint directory to list (default: .checkpoints)",
)
def list_checkpoints(checkpoint_dir: Path):
    """
    List available checkpoints.

    Shows all saved checkpoints in the specified directory.

    Examples:

        # List checkpoints
        llm-dataset list-checkpoints

        # List from custom directory
        llm-dataset list-checkpoints --checkpoint-dir /path/to/checkpoints
    """
    try:
        from llm_dataset_engine.adapters import LocalFileCheckpointStorage

        console.print(f"[cyan]Scanning {checkpoint_dir} for checkpoints...[/cyan]")

        # NOTE(review): the Path is handed over as-is here, while ``resume``
        # converts it to ``str`` first - confirm which form the storage
        # constructor expects.
        found = LocalFileCheckpointStorage(checkpoint_dir).list_checkpoints()

        if not found:
            console.print("[yellow]No checkpoints found[/yellow]")
            return

        # One row per checkpoint.
        listing = Table(title=f"Checkpoints in {checkpoint_dir}")
        column_styles = (
            ("Session ID", "cyan"),
            ("Row", "green"),
            ("Stage", "green"),
            ("Timestamp", "yellow"),
            ("Size", "magenta"),
        )
        for heading, colour in column_styles:
            listing.add_column(heading, style=colour)

        for entry in found:
            # Session ids are cut to their first 8 characters for display.
            short_id = f"{str(entry.session_id)[:8]}..."
            listing.add_row(
                short_id,
                str(entry.row_index),
                str(entry.stage_index),
                entry.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                f"{entry.size_bytes:,} bytes",
            )

        console.print("\n")
        console.print(listing)
        console.print(f"\n[cyan]Total checkpoints: {len(found)}[/cyan]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)

604 

605 

@cli.command()
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input file to inspect",
)
@click.option(
    "--head",
    type=int,
    default=5,
    help="Number of rows to show (default: 5)",
)
def inspect(input: Path, head: int):
    """
    Inspect input data file.

    Shows file info and preview of first N rows.

    Examples:

        # Inspect CSV file
        llm-dataset inspect -i data.csv

        # Show first 10 rows
        llm-dataset inspect -i data.csv --head 10
    """
    try:
        import pandas as pd

        console.print(f"[cyan]Inspecting {input}...[/cyan]")

        # Pick the pandas reader from the (case-insensitive) file extension.
        suffix = input.suffix.lower()
        readers = {
            ".csv": pd.read_csv,
            ".xlsx": pd.read_excel,
            ".xls": pd.read_excel,
            ".parquet": pd.read_parquet,
        }
        reader = readers.get(suffix)
        if reader is None:
            console.print(f"[red]❌ Unsupported file type: {suffix}[/red]")
            sys.exit(1)

        df = reader(input)

        # File info
        info_table = Table(title="File Information")
        info_table.add_column("Property", style="cyan")
        info_table.add_column("Value", style="green")

        info_table.add_row("File Path", str(input))
        info_table.add_row("File Type", suffix[1:].upper())
        info_table.add_row("Total Rows", f"{len(df):,}")
        info_table.add_row("Total Columns", str(len(df.columns)))
        # deep=True asks pandas to include object-column contents; bytes -> MB.
        info_table.add_row("Memory Usage", f"{df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")

        console.print("\n")
        console.print(info_table)

        # Columns
        console.print("\n[cyan]Columns:[/cyan]")
        for col in df.columns:
            dtype = df[col].dtype
            null_count = df[col].isnull().sum()
            # Column names come from the data file: print them literally so
            # text such as "[bold]" or ":smile:" is not treated as Rich
            # markup or an emoji code.
            console.print(
                f" • {col} ({dtype}) - {null_count} nulls",
                markup=False,
                emoji=False,
            )

        # Preview
        console.print(f"\n[cyan]First {head} rows:[/cyan]")
        # Cell values are arbitrary user data: disable markup and emoji
        # processing so bracketed text is shown verbatim and a stray "[/x]"
        # cannot raise a markup error.
        console.print(df.head(head).to_string(), markup=False, emoji=False)

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)

680 

681 

# Invoke the Click command group when this module is executed directly as a script.
if __name__ == "__main__":
    cli()

684