Coverage for llm_dataset_engine/integrations/prefect.py: 0%
42 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Prefect integration - Pre-built tasks for Prefect workflows.
4Provides llm_transform_task for easy integration into Prefect flows.
5"""
7from pathlib import Path
8from typing import Optional
10import pandas as pd
# Prefect is an optional dependency: when it is missing, fall back to a
# no-op ``task`` decorator so that this module can still be imported.
try:
    from prefect import task

    PREFECT_AVAILABLE = True
except ImportError:
    PREFECT_AVAILABLE = False

    def task(*args, **kwargs):
        """Placeholder for ``prefect.task`` that leaves functions undecorated.

        Supports both decorator spellings accepted by the real decorator:

        * ``@task`` (bare): the function is passed as the single positional
          argument and is returned unchanged.
        * ``@task(...)`` (factory): all options are ignored and a
          pass-through decorator is returned.
        """
        # Bare usage: ``task`` received the decorated function itself.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def decorator(func):
            return func

        return decorator
25from llm_dataset_engine.api import Pipeline
26from llm_dataset_engine.config import ConfigLoader
@task(name="llm_transform")
def llm_transform_task(
    config_path: str,
    input_data: Optional[pd.DataFrame] = None,
    input_file: Optional[str] = None,
    output_file: Optional[str] = None,
    max_budget: Optional[float] = None,
    provider_override: Optional[str] = None,
    model_override: Optional[str] = None,
) -> pd.DataFrame:
    """
    Prefect task for LLM dataset transformations.

    Loads a pipeline configuration, applies the optional overrides, runs the
    pipeline on the given input and returns the resulting DataFrame.

    Example:
        from prefect import flow
        from llm_dataset_engine.integrations.prefect import llm_transform_task

        @flow
        def data_pipeline():
            raw_data = load_data()
            enriched = llm_transform_task(
                config_path='configs/llm_config.yaml',
                input_data=raw_data,
                max_budget=10.0,
            )
            save_data(enriched)

    Args:
        config_path: Path to the configuration file; it is loaded with
            ``ConfigLoader.from_yaml``.
        input_data: Input DataFrame (from previous task). Takes precedence
            over ``input_file`` when both are given.
        input_file: Path to input file (alternative to input_data); it
            replaces the dataset source path from the configuration.
        output_file: Path to output file (optional). When given, the
            configured output is replaced by a CSV destination at this path
            using the REPLACE merge strategy.
        max_budget: Override maximum budget (converted to ``Decimal``).
        provider_override: Override LLM provider (must be a valid
            ``LLMProvider`` value).
        model_override: Override model name.

    Returns:
        Result DataFrame (``result.data`` of the executed pipeline).

    Raises:
        ImportError: If Prefect is not installed.
        ValueError: If neither input_data nor input_file provided.
    """
    if not PREFECT_AVAILABLE:
        raise ImportError(
            "Prefect is required to use llm_transform_task. "
            "Install with: pip install prefect"
        )

    # Load configuration
    specs = ConfigLoader.from_yaml(config_path)

    # Override settings
    if max_budget is not None:
        from decimal import Decimal

        # Go through str() so the Decimal holds the value as written
        # (e.g. 0.1) instead of the float's binary expansion.
        specs.processing.max_budget = Decimal(str(max_budget))

    if provider_override:
        from llm_dataset_engine.core.specifications import LLMProvider

        specs.llm.provider = LLMProvider(provider_override)

    if model_override:
        specs.llm.model = model_override

    # Validate the input before touching specs any further.
    if input_data is None and not input_file:
        raise ValueError("Either input_data or input_file required")

    # Set output if specified. This is done BEFORE the Pipeline is built so
    # the output spec takes effect even if Pipeline reads or copies its
    # specifications at construction time.
    if output_file:
        from llm_dataset_engine.core.specifications import (
            DataSourceType,
            MergeStrategy,
            OutputSpec,
        )

        specs.output = OutputSpec(
            destination_type=DataSourceType.CSV,
            destination_path=Path(output_file),
            merge_strategy=MergeStrategy.REPLACE,
        )

    # Build the pipeline; an in-memory DataFrame wins over a file path.
    if input_data is not None:
        pipeline = Pipeline(specs, dataframe=input_data)
    else:
        specs.dataset.source_path = Path(input_file)
        pipeline = Pipeline(specs)

    # Execute
    result = pipeline.execute()

    # Report run metrics on stdout. NOTE: Prefect only forwards print()
    # output to its run logs when ``log_prints`` is enabled for the task or
    # the enclosing flow; otherwise these lines stay on plain stdout.
    print(f"✅ Processed {result.metrics.total_rows} rows")
    print(f"💰 Cost: ${result.costs.total_cost}")
    print(f"⏱️ Duration: {result.duration:.2f}s")

    return result.data
# Public API of this module: only the Prefect task is exported by
# ``from llm_dataset_engine.integrations.prefect import *``.
__all__ = [
    "llm_transform_task",
]