Coverage for llm_dataset_engine/api/dataset_processor.py: 21%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2DatasetProcessor - Simplified convenience wrapper. 

3 

4For users who just want to process data with minimal configuration. 

5""" 

6 

from typing import Any, Dict, Optional

import pandas as pd

from llm_dataset_engine.api.pipeline_builder import PipelineBuilder

12 

13 

14class DatasetProcessor: 

15 """ 

16 Simplified API for single-prompt, single-column use cases. 

17  

18 This is a convenience wrapper around PipelineBuilder for users 

19 who don't need fine-grained control. 

20  

21 Example: 

22 processor = DatasetProcessor( 

23 data="data.csv", 

24 input_column="description", 

25 output_column="cleaned", 

26 prompt="Clean this text: {description}", 

27 llm_config={"provider": "openai", "model": "gpt-4o-mini"} 

28 ) 

29 result = processor.run() 

30 """ 

31 

32 def __init__( 

33 self, 

34 data: str | pd.DataFrame, 

35 input_column: str, 

36 output_column: str, 

37 prompt: str, 

38 llm_config: Dict[str, any], 

39 ): 

40 """ 

41 Initialize dataset processor. 

42 

43 Args: 

44 data: CSV file path or DataFrame 

45 input_column: Input column name 

46 output_column: Output column name 

47 prompt: Prompt template 

48 llm_config: LLM configuration dict 

49 """ 

50 self.data = data 

51 self.input_column = input_column 

52 self.output_column = output_column 

53 self.prompt = prompt 

54 self.llm_config = llm_config 

55 

56 # Build pipeline internally 

57 builder = PipelineBuilder.create() 

58 

59 # Configure data source 

60 if isinstance(data, str): 

61 builder.from_csv( 

62 data, 

63 input_columns=[input_column], 

64 output_columns=[output_column], 

65 ) 

66 elif isinstance(data, pd.DataFrame): 

67 builder.from_dataframe( 

68 data, 

69 input_columns=[input_column], 

70 output_columns=[output_column], 

71 ) 

72 else: 

73 raise ValueError("data must be file path or DataFrame") 

74 

75 # Configure prompt 

76 builder.with_prompt(prompt) 

77 

78 # Configure LLM 

79 provider = llm_config.get("provider", "openai") 

80 model = llm_config.get("model", "gpt-4o-mini") 

81 api_key = llm_config.get("api_key") 

82 temperature = llm_config.get("temperature", 0.0) 

83 max_tokens = llm_config.get("max_tokens") 

84 

85 builder.with_llm( 

86 provider=provider, 

87 model=model, 

88 api_key=api_key, 

89 temperature=temperature, 

90 max_tokens=max_tokens, 

91 ) 

92 

93 # Build pipeline 

94 self.pipeline = builder.build() 

95 

96 def run(self) -> pd.DataFrame: 

97 """ 

98 Execute processing and return results. 

99 

100 Returns: 

101 DataFrame with results 

102 """ 

103 result = self.pipeline.execute() 

104 return result.data 

105 

106 def run_sample(self, n: int = 10) -> pd.DataFrame: 

107 """ 

108 Test on first N rows. 

109 

110 Args: 

111 n: Number of rows to process 

112 

113 Returns: 

114 DataFrame with sample results 

115 """ 

116 # Create sample pipeline 

117 if isinstance(self.data, str): 

118 df = pd.read_csv(self.data).head(n) 

119 else: 

120 df = self.data.head(n) 

121 

122 builder = ( 

123 PipelineBuilder.create() 

124 .from_dataframe( 

125 df, 

126 input_columns=[self.input_column], 

127 output_columns=[self.output_column], 

128 ) 

129 .with_prompt(self.prompt) 

130 .with_llm( 

131 provider=self.llm_config.get("provider", "openai"), 

132 model=self.llm_config.get("model", "gpt-4o-mini"), 

133 api_key=self.llm_config.get("api_key"), 

134 temperature=self.llm_config.get("temperature", 0.0), 

135 ) 

136 ) 

137 

138 sample_pipeline = builder.build() 

139 result = sample_pipeline.execute() 

140 return result.data 

141 

142 def estimate_cost(self) -> float: 

143 """ 

144 Estimate total processing cost. 

145 

146 Returns: 

147 Estimated cost in USD 

148 """ 

149 estimate = self.pipeline.estimate_cost() 

150 return float(estimate.total_cost) 

151