Coverage for llm_dataset_engine/api/pipeline_builder.py: 82%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Pipeline Builder - Fluent API for constructing pipelines. 

3 

4Implements Builder pattern for intuitive pipeline creation. 

5""" 

6 

from decimal import Decimal
from pathlib import Path
from typing import Any, List, Optional

import pandas as pd

from llm_dataset_engine.api.pipeline import Pipeline
from llm_dataset_engine.core.specifications import (
    DatasetSpec,
    DataSourceType,
    LLMProvider,
    LLMSpec,
    MergeStrategy,
    OutputSpec,
    PipelineSpecifications,
    ProcessingSpec,
    PromptSpec,
)
from llm_dataset_engine.orchestration import (
    AsyncExecutor,
    ExecutionStrategy,
    StreamingExecutor,
    SyncExecutor,
)

31 

32 

33class PipelineBuilder: 

34 """ 

35 Fluent builder for constructing pipelines. 

36  

37 Provides an intuitive, chainable API for common use cases. 

38  

39 Example: 

40 pipeline = ( 

41 PipelineBuilder.create() 

42 .from_csv("data.csv", input_columns=["text"], output_columns=["result"]) 

43 .with_prompt("Process: {text}") 

44 .with_llm(provider="openai", model="gpt-4o-mini") 

45 .build() 

46 ) 

47 """ 

48 

49 def __init__(self): 

50 """Initialize builder with None values.""" 

51 self._dataset_spec: Optional[DatasetSpec] = None 

52 self._prompt_spec: Optional[PromptSpec] = None 

53 self._llm_spec: Optional[LLMSpec] = None 

54 self._processing_spec: ProcessingSpec = ProcessingSpec() 

55 self._output_spec: Optional[OutputSpec] = None 

56 self._dataframe: Optional[pd.DataFrame] = None 

57 self._executor: Optional[ExecutionStrategy] = None 

58 

59 @staticmethod 

60 def create() -> "PipelineBuilder": 

61 """ 

62 Start builder chain. 

63 

64 Returns: 

65 New PipelineBuilder instance 

66 """ 

67 return PipelineBuilder() 

68 

69 def from_csv( 

70 self, 

71 path: str, 

72 input_columns: List[str], 

73 output_columns: List[str], 

74 delimiter: str = ",", 

75 encoding: str = "utf-8", 

76 ) -> "PipelineBuilder": 

77 """ 

78 Configure CSV data source. 

79 

80 Args: 

81 path: Path to CSV file 

82 input_columns: Input column names 

83 output_columns: Output column names 

84 delimiter: CSV delimiter 

85 encoding: File encoding 

86 

87 Returns: 

88 Self for chaining 

89 """ 

90 self._dataset_spec = DatasetSpec( 

91 source_type=DataSourceType.CSV, 

92 source_path=Path(path), 

93 input_columns=input_columns, 

94 output_columns=output_columns, 

95 delimiter=delimiter, 

96 encoding=encoding, 

97 ) 

98 return self 

99 

100 def from_excel( 

101 self, 

102 path: str, 

103 input_columns: List[str], 

104 output_columns: List[str], 

105 sheet_name: str | int = 0, 

106 ) -> "PipelineBuilder": 

107 """ 

108 Configure Excel data source. 

109 

110 Args: 

111 path: Path to Excel file 

112 input_columns: Input column names 

113 output_columns: Output column names 

114 sheet_name: Sheet name or index 

115 

116 Returns: 

117 Self for chaining 

118 """ 

119 self._dataset_spec = DatasetSpec( 

120 source_type=DataSourceType.EXCEL, 

121 source_path=Path(path), 

122 input_columns=input_columns, 

123 output_columns=output_columns, 

124 sheet_name=sheet_name, 

125 ) 

126 return self 

127 

128 def from_parquet( 

129 self, 

130 path: str, 

131 input_columns: List[str], 

132 output_columns: List[str], 

133 ) -> "PipelineBuilder": 

134 """ 

135 Configure Parquet data source. 

136 

137 Args: 

138 path: Path to Parquet file 

139 input_columns: Input column names 

140 output_columns: Output column names 

141 

142 Returns: 

143 Self for chaining 

144 """ 

145 self._dataset_spec = DatasetSpec( 

146 source_type=DataSourceType.PARQUET, 

147 source_path=Path(path), 

148 input_columns=input_columns, 

149 output_columns=output_columns, 

150 ) 

151 return self 

152 

153 def from_dataframe( 

154 self, 

155 df: pd.DataFrame, 

156 input_columns: List[str], 

157 output_columns: List[str], 

158 ) -> "PipelineBuilder": 

159 """ 

160 Configure DataFrame source. 

161 

162 Args: 

163 df: Pandas DataFrame 

164 input_columns: Input column names 

165 output_columns: Output column names 

166 

167 Returns: 

168 Self for chaining 

169 """ 

170 self._dataset_spec = DatasetSpec( 

171 source_type=DataSourceType.DATAFRAME, 

172 input_columns=input_columns, 

173 output_columns=output_columns, 

174 ) 

175 self._dataframe = df 

176 return self 

177 

178 def with_prompt( 

179 self, 

180 template: str, 

181 system_message: Optional[str] = None, 

182 ) -> "PipelineBuilder": 

183 """ 

184 Configure prompt template. 

185 

186 Args: 

187 template: Prompt template with {variable} placeholders 

188 system_message: Optional system message 

189 

190 Returns: 

191 Self for chaining 

192 """ 

193 self._prompt_spec = PromptSpec( 

194 template=template, 

195 system_message=system_message, 

196 ) 

197 return self 

198 

199 def with_llm( 

200 self, 

201 provider: str, 

202 model: str, 

203 api_key: Optional[str] = None, 

204 temperature: float = 0.0, 

205 max_tokens: Optional[int] = None, 

206 **kwargs: any, 

207 ) -> "PipelineBuilder": 

208 """ 

209 Configure LLM provider. 

210 

211 Args: 

212 provider: Provider name (openai, azure_openai, anthropic) 

213 model: Model identifier 

214 api_key: API key (or from env) 

215 temperature: Sampling temperature 

216 max_tokens: Max output tokens 

217 **kwargs: Provider-specific parameters 

218 

219 Returns: 

220 Self for chaining 

221 """ 

222 provider_enum = LLMProvider(provider.lower()) 

223 

224 self._llm_spec = LLMSpec( 

225 provider=provider_enum, 

226 model=model, 

227 api_key=api_key, 

228 temperature=temperature, 

229 max_tokens=max_tokens, 

230 **kwargs, 

231 ) 

232 return self 

233 

234 def with_batch_size(self, size: int) -> "PipelineBuilder": 

235 """ 

236 Configure batch size. 

237 

238 Args: 

239 size: Rows per batch 

240 

241 Returns: 

242 Self for chaining 

243 """ 

244 self._processing_spec.batch_size = size 

245 return self 

246 

247 def with_concurrency(self, threads: int) -> "PipelineBuilder": 

248 """ 

249 Configure concurrent requests. 

250 

251 Args: 

252 threads: Number of concurrent threads 

253 

254 Returns: 

255 Self for chaining 

256 """ 

257 self._processing_spec.concurrency = threads 

258 return self 

259 

260 def with_checkpoint_interval(self, rows: int) -> "PipelineBuilder": 

261 """ 

262 Configure checkpoint frequency. 

263 

264 Args: 

265 rows: Rows between checkpoints 

266 

267 Returns: 

268 Self for chaining 

269 """ 

270 self._processing_spec.checkpoint_interval = rows 

271 return self 

272 

273 def with_rate_limit(self, rpm: int) -> "PipelineBuilder": 

274 """ 

275 Configure rate limiting. 

276 

277 Args: 

278 rpm: Requests per minute 

279 

280 Returns: 

281 Self for chaining 

282 """ 

283 self._processing_spec.rate_limit_rpm = rpm 

284 return self 

285 

286 def with_max_budget(self, budget: float) -> "PipelineBuilder": 

287 """ 

288 Configure maximum budget. 

289 

290 Args: 

291 budget: Maximum budget in USD 

292 

293 Returns: 

294 Self for chaining 

295 """ 

296 self._processing_spec.max_budget = Decimal(str(budget)) 

297 return self 

298 

299 def with_output( 

300 self, 

301 path: str, 

302 format: str = "csv", 

303 merge_strategy: str = "replace", 

304 ) -> "PipelineBuilder": 

305 """ 

306 Configure output destination. 

307 

308 Args: 

309 path: Output file path 

310 format: Output format (csv, excel, parquet) 

311 merge_strategy: Merge strategy (replace, append, update) 

312 

313 Returns: 

314 Self for chaining 

315 """ 

316 format_map = { 

317 "csv": DataSourceType.CSV, 

318 "excel": DataSourceType.EXCEL, 

319 "parquet": DataSourceType.PARQUET, 

320 } 

321 

322 merge_map = { 

323 "replace": MergeStrategy.REPLACE, 

324 "append": MergeStrategy.APPEND, 

325 "update": MergeStrategy.UPDATE, 

326 } 

327 

328 self._output_spec = OutputSpec( 

329 destination_type=format_map[format.lower()], 

330 destination_path=Path(path), 

331 merge_strategy=merge_map[merge_strategy.lower()], 

332 ) 

333 return self 

334 

335 def with_executor(self, executor: ExecutionStrategy) -> "PipelineBuilder": 

336 """ 

337 Set custom execution strategy. 

338 

339 Args: 

340 executor: ExecutionStrategy instance 

341 

342 Returns: 

343 Self for chaining 

344 """ 

345 self._executor = executor 

346 return self 

347 

348 def with_async_execution( 

349 self, max_concurrency: int = 10 

350 ) -> "PipelineBuilder": 

351 """ 

352 Use async execution strategy. 

353  

354 Enables async/await for non-blocking execution. 

355 Ideal for FastAPI, aiohttp, and async frameworks. 

356 

357 Args: 

358 max_concurrency: Maximum concurrent async tasks 

359 

360 Returns: 

361 Self for chaining 

362 """ 

363 self._executor = AsyncExecutor(max_concurrency=max_concurrency) 

364 return self 

365 

366 def with_streaming(self, chunk_size: int = 1000) -> "PipelineBuilder": 

367 """ 

368 Use streaming execution strategy. 

369  

370 Processes data in chunks for memory-efficient handling. 

371 Ideal for large datasets (100K+ rows). 

372 

373 Args: 

374 chunk_size: Number of rows per chunk 

375 

376 Returns: 

377 Self for chaining 

378 """ 

379 self._executor = StreamingExecutor(chunk_size=chunk_size) 

380 return self 

381 

382 def build(self) -> Pipeline: 

383 """ 

384 Build final Pipeline. 

385 

386 Returns: 

387 Configured Pipeline 

388 

389 Raises: 

390 ValueError: If required specifications missing 

391 """ 

392 # Validate required specs 

393 if not self._dataset_spec: 

394 raise ValueError("Dataset specification required") 

395 if not self._prompt_spec: 

396 raise ValueError("Prompt specification required") 

397 if not self._llm_spec: 

398 raise ValueError("LLM specification required") 

399 

400 # Create specifications bundle 

401 specifications = PipelineSpecifications( 

402 dataset=self._dataset_spec, 

403 prompt=self._prompt_spec, 

404 llm=self._llm_spec, 

405 processing=self._processing_spec, 

406 output=self._output_spec, 

407 ) 

408 

409 # Create and return pipeline 

410 return Pipeline( 

411 specifications, 

412 dataframe=self._dataframe, 

413 executor=self._executor, 

414 ) 

415