Coverage for llm_dataset_engine/integrations/prefect.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Prefect integration - Pre-built tasks for Prefect workflows. 

3 

4Provides llm_transform_task for easy integration into Prefect flows. 

5""" 

6 

7from pathlib import Path 

8from typing import Optional 

9 

10import pandas as pd 

11 

try:
    from prefect import task

    PREFECT_AVAILABLE = True
except ImportError:
    PREFECT_AVAILABLE = False

    def task(*args, **kwargs):
        """Stand-in for ``prefect.task`` used when Prefect is not installed.

        Accepts both decorator spellings, ``@task`` and ``@task(...)``, and
        hands back the decorated function unchanged in either case so this
        module can still be imported without Prefect.
        """
        # Bare usage (``@task``): the function arrives as the sole positional
        # argument, so return it directly instead of a decorator factory.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def decorator(func):
            return func

        return decorator

24 

25from llm_dataset_engine.api import Pipeline 

26from llm_dataset_engine.config import ConfigLoader 

27 

28 

@task(name="llm_transform", log_prints=True)
def llm_transform_task(
    config_path: str,
    input_data: Optional[pd.DataFrame] = None,
    input_file: Optional[str] = None,
    output_file: Optional[str] = None,
    max_budget: Optional[float] = None,
    provider_override: Optional[str] = None,
    model_override: Optional[str] = None,
) -> pd.DataFrame:
    """
    Prefect task for LLM dataset transformations.

    Loads a pipeline specification from ``config_path``, applies the optional
    overrides, runs the pipeline and returns the resulting DataFrame.

    Example:
        from prefect import flow
        from llm_dataset_engine.integrations.prefect import llm_transform_task

        @flow
        def data_pipeline():
            raw_data = load_data()
            enriched = llm_transform_task(
                config_path='configs/llm_config.yaml',
                input_data=raw_data,
                max_budget=10.0,
            )
            save_data(enriched)

    Args:
        config_path: Path to the configuration file (read with
            ``ConfigLoader.from_yaml``).
        input_data: Input DataFrame (from previous task). Takes precedence
            over ``input_file`` when both are given.
        input_file: Path to input file (alternative to input_data).
        output_file: Path to output file (optional). When given, the output
            spec is replaced with a CSV destination using the REPLACE merge
            strategy.
        max_budget: Override maximum budget.
        provider_override: Override LLM provider.
        model_override: Override model name.

    Returns:
        Result DataFrame

    Raises:
        ImportError: If Prefect is not installed.
        ValueError: If neither input_data nor input_file provided.
    """
    if not PREFECT_AVAILABLE:
        raise ImportError(
            "Prefect is required to use llm_transform_task. "
            "Install with: pip install prefect"
        )

    # Fail fast on a missing input before touching the config file.
    if input_data is None and not input_file:
        raise ValueError("Either input_data or input_file required")

    # Load configuration
    specs = ConfigLoader.from_yaml(config_path)

    # Override settings
    if max_budget is not None:
        from decimal import Decimal

        # Go through str() so the Decimal carries the value as written
        # rather than the float's binary expansion.
        specs.processing.max_budget = Decimal(str(max_budget))

    if provider_override:
        from llm_dataset_engine.core.specifications import LLMProvider

        specs.llm.provider = LLMProvider(provider_override)

    if model_override:
        specs.llm.model = model_override

    # Configure the output BEFORE building the pipeline, so the setting is
    # honoured even if Pipeline snapshots its specs at construction time.
    if output_file:
        from llm_dataset_engine.core.specifications import (
            DataSourceType,
            MergeStrategy,
            OutputSpec,
        )

        specs.output = OutputSpec(
            destination_type=DataSourceType.CSV,
            destination_path=Path(output_file),
            merge_strategy=MergeStrategy.REPLACE,
        )

    # Build the pipeline; an in-memory DataFrame wins over a file path.
    if input_data is not None:
        pipeline = Pipeline(specs, dataframe=input_data)
    else:
        specs.dataset.source_path = Path(input_file)
        pipeline = Pipeline(specs)

    # Execute
    result = pipeline.execute()

    # Log metrics (forwarded to the Prefect run logger via log_prints=True)
    print(f"✅ Processed {result.metrics.total_rows} rows")
    print(f"💰 Cost: ${result.costs.total_cost}")
    print(f"⏱️ Duration: {result.duration:.2f}s")

    return result.data

126 

127 

128__all__ = ["llm_transform_task"] 

129