Coverage for llm_dataset_engine/cli/main.py: 32%
289 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Main CLI entry point for LLM Dataset Engine.
4Provides command-line interface for processing datasets, estimating costs,
5and managing pipeline execution.
6"""
8import sys
9from pathlib import Path
10from typing import Optional
11from uuid import UUID
13import click
14from rich.console import Console
15from rich.table import Table
17from llm_dataset_engine import __version__
18from llm_dataset_engine.api import Pipeline
19from llm_dataset_engine.config import ConfigLoader
20from llm_dataset_engine.core.specifications import (
21 DataSourceType,
22 LLMProvider,
23 PipelineSpecifications,
24)
# Module-level rich Console shared by every command below for styled output.
console = Console()
# NOTE: click renders the docstring below as the top-level ``--help`` text,
# so its wording is user-facing output and is kept as-is.
@click.group()
@click.version_option(version=__version__, prog_name="llm-dataset")
def cli() -> None:
    """
    LLM Dataset Engine - Process tabular datasets using LLMs.

    A production-grade SDK for processing CSV, Excel, and Parquet files
    with Large Language Models, featuring reliability, cost control, and
    observability.

    Examples:

        # Process a dataset
        llm-dataset process --config config.yaml --input data.csv --output result.csv

        # Estimate cost before processing
        llm-dataset estimate --config config.yaml --input data.csv

        # Resume from checkpoint
        llm-dataset resume --session-id abc-123 --checkpoint-dir .checkpoints

        # Validate configuration
        llm-dataset validate --config config.yaml
    """
def _detect_output_type(output: Path) -> DataSourceType:
    """Map the output file extension to a DataSourceType, defaulting to CSV."""
    by_suffix = {
        ".csv": DataSourceType.CSV,
        ".xlsx": DataSourceType.EXCEL,
        ".xls": DataSourceType.EXCEL,
        ".parquet": DataSourceType.PARQUET,
    }
    return by_suffix.get(output.suffix.lower(), DataSourceType.CSV)


@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input data file (CSV, Excel, Parquet)",
)
@click.option(
    "--output",
    "-o",
    required=True,
    type=click.Path(path_type=Path),
    help="Path to output file",
)
@click.option(
    "--provider",
    type=click.Choice(["openai", "azure_openai", "anthropic", "groq"]),
    help="Override LLM provider from config",
)
@click.option(
    "--model",
    help="Override model name from config",
)
@click.option(
    "--max-budget",
    type=float,
    help="Override maximum budget (USD) from config",
)
@click.option(
    "--batch-size",
    type=int,
    help="Override batch size from config",
)
@click.option(
    "--concurrency",
    type=int,
    help="Override concurrency from config",
)
@click.option(
    "--checkpoint-dir",
    type=click.Path(path_type=Path),
    help="Override checkpoint directory from config",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Validate and estimate only, don't execute",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Enable verbose logging",
)
def process(
    config: Path,
    input: Path,
    output: Path,
    provider: Optional[str],
    model: Optional[str],
    max_budget: Optional[float],
    batch_size: Optional[int],
    concurrency: Optional[int],
    checkpoint_dir: Optional[Path],
    dry_run: bool,
    verbose: bool,
):
    """
    Process a dataset using LLM transformations.

    Reads data from INPUT file, applies LLM transformations according to CONFIG,
    and writes results to OUTPUT file.

    Examples:

        # Basic usage
        llm-dataset process -c config.yaml -i data.csv -o result.csv

        # Override provider and model
        llm-dataset process -c config.yaml -i data.csv -o result.csv \\
            --provider groq --model openai/gpt-oss-120b

        # Set budget limit
        llm-dataset process -c config.yaml -i data.csv -o result.csv \\
            --max-budget 10.0

        # Dry run (estimate only)
        llm-dataset process -c config.yaml -i data.csv -o result.csv --dry-run
    """
    # The docstring above is rendered by click as ``--help`` text; each
    # parameter is documented by the ``help=`` string of its option.
    # Exit status: 1 on validation failure or any error, 0 otherwise.
    try:
        # Load configuration
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))

        # CLI arguments take precedence over values from the config file.
        specs.dataset.source_path = input

        if specs.output:
            specs.output.destination_path = output
        else:
            # No output section in the config: build one from the CLI path.
            from llm_dataset_engine.core.specifications import (
                MergeStrategy,
                OutputSpec,
            )

            specs.output = OutputSpec(
                destination_type=_detect_output_type(output),
                destination_path=output,
                merge_strategy=MergeStrategy.REPLACE,
            )

        if provider:
            specs.llm.provider = LLMProvider(provider)

        if model:
            specs.llm.model = model

        if max_budget is not None:
            from decimal import Decimal

            # Go through str() so the Decimal holds the value as typed
            # rather than the float's binary expansion.
            specs.processing.max_budget = Decimal(str(max_budget))

        if batch_size is not None:
            specs.processing.batch_size = batch_size

        if concurrency is not None:
            specs.processing.concurrency = concurrency

        if checkpoint_dir is not None:
            specs.processing.checkpoint_dir = checkpoint_dir

        # Create pipeline
        console.print("[cyan]Creating pipeline...[/cyan]")
        pipeline = Pipeline(specs)

        # Validate
        console.print("[cyan]Validating pipeline...[/cyan]")
        validation = pipeline.validate()

        if not validation.is_valid:
            console.print("[red]❌ Validation failed:[/red]")
            for error in validation.errors:
                console.print(f"  [red]• {error}[/red]")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)

        console.print("[green]✅ Validation passed[/green]")

        # Estimate cost
        console.print("\n[cyan]Estimating cost...[/cyan]")
        estimate = pipeline.estimate_cost()

        table = Table(title="Cost Estimate")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")

        table.add_row("Total Cost", f"${estimate.total_cost}")
        table.add_row("Total Tokens", f"{estimate.total_tokens:,}")
        table.add_row("Input Tokens", f"{estimate.input_tokens:,}")
        table.add_row("Output Tokens", f"{estimate.output_tokens:,}")
        table.add_row("Rows", f"{estimate.rows:,}")

        console.print(table)

        if dry_run:
            console.print("\n[yellow]Dry run mode - skipping execution[/yellow]")
            return

        # Execute
        console.print("\n[cyan]Processing dataset...[/cyan]")
        result = pipeline.execute()

        # Display results
        console.print("\n[green]✅ Processing complete![/green]")

        # An empty dataset must not turn a successful run into a
        # division-by-zero error after the work is already done.
        total_rows = result.metrics.total_rows
        if total_rows:
            cost_per_row = f"${result.costs.total_cost / total_rows:.6f}"
        else:
            cost_per_row = "N/A"

        results_table = Table(title="Execution Results")
        results_table.add_column("Metric", style="cyan")
        results_table.add_column("Value", style="green")

        results_table.add_row("Total Rows", str(total_rows))
        results_table.add_row("Processed", str(result.metrics.processed_rows))
        results_table.add_row("Failed", str(result.metrics.failed_rows))
        results_table.add_row("Skipped", str(result.metrics.skipped_rows))
        results_table.add_row("Duration", f"{result.duration:.2f}s")
        results_table.add_row("Total Cost", f"${result.costs.total_cost}")
        results_table.add_row("Cost per Row", cost_per_row)

        console.print(results_table)

        console.print(f"\n[green]Output written to: {output}[/green]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        if verbose:
            console.print_exception()
        sys.exit(1)
@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input data file",
)
@click.option(
    "--provider",
    type=click.Choice(["openai", "azure_openai", "anthropic", "groq"]),
    help="Override LLM provider from config",
)
@click.option(
    "--model",
    help="Override model name from config",
)
def estimate(
    config: Path,
    input: Path,
    provider: Optional[str],
    model: Optional[str],
):
    """
    Estimate processing cost without executing.

    Useful for budget planning and cost validation before running
    expensive operations.

    Examples:

        # Estimate cost
        llm-dataset estimate -c config.yaml -i data.csv

        # Estimate with different model
        llm-dataset estimate -c config.yaml -i data.csv --model gpt-4o
    """
    # The docstring above doubles as the command's ``--help`` text.
    try:
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))

        # Command-line values win over whatever the config file declared.
        specs.dataset.source_path = input
        if provider:
            specs.llm.provider = LLMProvider(provider)
        if model:
            specs.llm.model = model

        pipeline = Pipeline(specs)

        # Refuse to estimate an invalid pipeline.
        report = pipeline.validate()
        if not report.is_valid:
            console.print("[red]❌ Validation failed:[/red]")
            for problem in report.errors:
                console.print(f"  [red]• {problem}[/red]")
            sys.exit(1)

        console.print("[cyan]Estimating cost...[/cyan]")
        cost_estimate = pipeline.estimate_cost()

        summary = Table(title="Cost Estimate", show_header=True)
        for heading, colour in (("Metric", "cyan"), ("Value", "green")):
            summary.add_column(heading, style=colour, width=20)

        rows = [
            ("Total Cost", f"${cost_estimate.total_cost}"),
            ("Total Tokens", f"{cost_estimate.total_tokens:,}"),
            ("Input Tokens", f"{cost_estimate.input_tokens:,}"),
            ("Output Tokens", f"{cost_estimate.output_tokens:,}"),
            ("Rows to Process", f"{cost_estimate.rows:,}"),
            ("Confidence", cost_estimate.confidence),
        ]
        for label, value in rows:
            summary.add_row(label, value)

        console.print("\n")
        console.print(summary)

        # Per-row cost is only meaningful when there is at least one row.
        if cost_estimate.rows > 0:
            per_row = cost_estimate.total_cost / cost_estimate.rows
            console.print(f"\n[cyan]Cost per row: ${per_row:.6f}[/cyan]")

        # Flag estimates above the fixed $10 threshold.
        if cost_estimate.total_cost > 10.0:
            console.print(f"\n[yellow]⚠️ Warning: Estimated cost (${cost_estimate.total_cost}) exceeds $10[/yellow]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)
@cli.command()
@click.option(
    "--session-id",
    "-s",
    required=True,
    help="Session ID to resume (UUID)",
)
@click.option(
    "--checkpoint-dir",
    type=click.Path(exists=True, path_type=Path),
    default=".checkpoints",
    help="Checkpoint directory (default: .checkpoints)",
)
@click.option(
    "--output",
    "-o",
    type=click.Path(path_type=Path),
    help="Override output path",
)
def resume(
    session_id: str,
    checkpoint_dir: Path,
    output: Optional[Path],
):
    """
    Resume pipeline execution from checkpoint.

    Useful for recovering from failures or continuing interrupted processing.

    Examples:

        # Resume from checkpoint
        llm-dataset resume --session-id abc-123-def

        # Resume with custom checkpoint directory
        llm-dataset resume -s abc-123 --checkpoint-dir /path/to/checkpoints
    """
    # The docstring above is rendered by click as ``--help`` text.
    # This command currently only locates and displays the latest checkpoint;
    # it does not re-run the pipeline, and ``output`` is accepted but unused.
    # Exit status: 1 for a malformed session ID, a missing checkpoint, or any
    # other error.
    try:
        from llm_dataset_engine.adapters import LocalFileCheckpointStorage
        from llm_dataset_engine.orchestration import StateManager

        # Load checkpoint
        console.print(f"[cyan]Looking for checkpoint in {checkpoint_dir}...[/cyan]")

        storage = LocalFileCheckpointStorage(str(checkpoint_dir))
        state_manager = StateManager(storage)

        # Only the UUID parse is treated as "invalid session ID". A ValueError
        # raised later by the storage or state manager is a different failure
        # and falls through to the generic handler with its own message.
        try:
            session_uuid = UUID(session_id)
        except ValueError:
            console.print(f"[red]❌ Invalid session ID format: {session_id}[/red]")
            console.print("[yellow]Session ID should be a UUID (e.g., abc-123-def-456)[/yellow]")
            sys.exit(1)

        if not state_manager.can_resume(session_uuid):
            console.print(f"[red]❌ No checkpoint found for session {session_id}[/red]")
            console.print(f"[yellow]Check checkpoint directory: {checkpoint_dir}[/yellow]")
            # SystemExit is not an Exception subclass, so it is not swallowed
            # by the handler below.
            sys.exit(1)

        # Load checkpoint
        checkpoint_info = state_manager.get_latest_checkpoint(session_uuid)
        console.print(f"[green]✅ Found checkpoint at row {checkpoint_info.row_index}[/green]")

        # Resume execution
        console.print("[cyan]Resuming execution...[/cyan]")

        # Note: Full resume implementation would load the original pipeline
        # and continue from checkpoint. For now, we show the checkpoint info.
        console.print("\n[yellow]⚠️ Full resume functionality requires the original pipeline configuration[/yellow]")
        console.print("[yellow]Please use Pipeline.execute(resume_from=session_id) in Python code[/yellow]")

        # Display checkpoint info
        table = Table(title="Checkpoint Information")
        table.add_column("Property", style="cyan")
        table.add_column("Value", style="green")

        table.add_row("Session ID", str(checkpoint_info.session_id))
        # str() keeps the cell renderable whether the path is stored as a
        # str or a Path object.
        table.add_row("Checkpoint Path", str(checkpoint_info.checkpoint_path))
        table.add_row("Last Row", str(checkpoint_info.row_index))
        table.add_row("Last Stage", str(checkpoint_info.stage_index))
        table.add_row("Timestamp", str(checkpoint_info.timestamp))
        table.add_row("Size", f"{checkpoint_info.size_bytes:,} bytes")

        console.print(table)

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)
@cli.command()
@click.option(
    "--config",
    "-c",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to YAML/JSON configuration file",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Show detailed validation results",
)
def validate(config: Path, verbose: bool):
    """
    Validate pipeline configuration.

    Checks configuration file for errors and warnings without executing
    the pipeline.

    Examples:

        # Validate configuration
        llm-dataset validate -c config.yaml

        # Verbose validation
        llm-dataset validate -c config.yaml --verbose
    """
    # The docstring above doubles as the command's ``--help`` text.
    try:
        console.print(f"[cyan]Loading configuration from {config}...[/cyan]")
        specs = ConfigLoader.from_yaml(str(config))
        console.print("[green]✅ Configuration loaded successfully[/green]")

        if verbose:
            # Optional overview of what was loaded, shown before validation.
            summary = Table(title="Configuration Summary")
            summary.add_column("Component", style="cyan")
            summary.add_column("Details", style="green")

            rows = [
                ("Dataset", f"{specs.dataset.source_type.value}"),
                ("Input Columns", ", ".join(specs.dataset.input_columns)),
                ("Output Columns", ", ".join(specs.dataset.output_columns)),
                ("LLM Provider", specs.llm.provider.value),
                ("Model", specs.llm.model),
                ("Batch Size", str(specs.processing.batch_size)),
                ("Concurrency", str(specs.processing.concurrency)),
            ]
            if specs.processing.max_budget:
                rows.append(("Max Budget", f"${specs.processing.max_budget}"))
            for component, details in rows:
                summary.add_row(component, details)

            console.print("\n")
            console.print(summary)

        console.print("\n[cyan]Validating pipeline...[/cyan]")
        outcome = Pipeline(specs).validate()
        is_valid = outcome.is_valid

        if is_valid:
            console.print("[green]✅ Pipeline configuration is valid[/green]")
        else:
            console.print("[red]❌ Pipeline configuration is invalid[/red]")
            console.print("\n[red]Errors:[/red]")
            for problem in outcome.errors:
                console.print(f"  [red]• {problem}[/red]")

        # Warnings are reported for valid and invalid configurations alike.
        if outcome.warnings:
            console.print("\n[yellow]Warnings:[/yellow]")
            for note in outcome.warnings:
                console.print(f"  [yellow]• {note}[/yellow]")

        if not is_valid:
            sys.exit(1)

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)
@cli.command()
@click.option(
    "--checkpoint-dir",
    type=click.Path(exists=True, path_type=Path),
    default=".checkpoints",
    help="Checkpoint directory to list (default: .checkpoints)",
)
def list_checkpoints(checkpoint_dir: Path):
    """
    List available checkpoints.

    Shows all saved checkpoints in the specified directory.

    Examples:

        # List checkpoints
        llm-dataset list-checkpoints

        # List from custom directory
        llm-dataset list-checkpoints --checkpoint-dir /path/to/checkpoints
    """
    # The docstring above doubles as the command's ``--help`` text.
    try:
        from llm_dataset_engine.adapters import LocalFileCheckpointStorage

        console.print(f"[cyan]Scanning {checkpoint_dir} for checkpoints...[/cyan]")

        found = LocalFileCheckpointStorage(checkpoint_dir).list_checkpoints()
        if not found:
            console.print("[yellow]No checkpoints found[/yellow]")
            return

        listing = Table(title=f"Checkpoints in {checkpoint_dir}")
        columns = (
            ("Session ID", "cyan"),
            ("Row", "green"),
            ("Stage", "green"),
            ("Timestamp", "yellow"),
            ("Size", "magenta"),
        )
        for heading, colour in columns:
            listing.add_column(heading, style=colour)

        for entry in found:
            # Only the first eight characters of the session ID are shown.
            short_id = f"{str(entry.session_id)[:8]}..."
            listing.add_row(
                short_id,
                str(entry.row_index),
                str(entry.stage_index),
                entry.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                f"{entry.size_bytes:,} bytes",
            )

        console.print("\n")
        console.print(listing)
        console.print(f"\n[cyan]Total checkpoints: {len(found)}[/cyan]")

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)
@cli.command()
@click.option(
    "--input",
    "-i",
    required=True,
    type=click.Path(exists=True, path_type=Path),
    help="Path to input file to inspect",
)
@click.option(
    "--head",
    type=int,
    default=5,
    help="Number of rows to show (default: 5)",
)
def inspect(input: Path, head: int):
    """
    Inspect input data file.

    Shows file info and preview of first N rows.

    Examples:

        # Inspect CSV file
        llm-dataset inspect -i data.csv

        # Show first 10 rows
        llm-dataset inspect -i data.csv --head 10
    """
    # The docstring above doubles as the command's ``--help`` text.
    try:
        import pandas as pd

        console.print(f"[cyan]Inspecting {input}...[/cyan]")

        # Pick the pandas reader from the (case-insensitive) file extension.
        suffix = input.suffix.lower()
        readers = {
            ".csv": pd.read_csv,
            ".xlsx": pd.read_excel,
            ".xls": pd.read_excel,
            ".parquet": pd.read_parquet,
        }
        reader = readers.get(suffix)
        if reader is None:
            console.print(f"[red]❌ Unsupported file type: {suffix}[/red]")
            sys.exit(1)
        df = reader(input)

        # File-level facts
        memory_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
        facts = (
            ("File Path", str(input)),
            ("File Type", suffix[1:].upper()),
            ("Total Rows", f"{len(df):,}"),
            ("Total Columns", str(len(df.columns))),
            ("Memory Usage", f"{memory_mb:.2f} MB"),
        )

        info_table = Table(title="File Information")
        info_table.add_column("Property", style="cyan")
        info_table.add_column("Value", style="green")
        for prop, value in facts:
            info_table.add_row(prop, value)

        console.print("\n")
        console.print(info_table)

        # One line per column: name, dtype and number of missing values.
        console.print("\n[cyan]Columns:[/cyan]")
        for col in df.columns:
            series = df[col]
            console.print(f"  • {col} ({series.dtype}) - {series.isnull().sum()} nulls")

        # Preview of the leading rows
        console.print(f"\n[cyan]First {head} rows:[/cyan]")
        preview = df.head(head)
        console.print(preview.to_string())

    except Exception as e:
        console.print(f"[red]❌ Error: {e}[/red]")
        sys.exit(1)
# Run the click command group when this module is executed as a script.
if __name__ == "__main__":
    cli()