Coverage for llm_dataset_engine/api/pipeline.py: 22%

127 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Main Pipeline class - the Facade for the entire system. 

3 

4This is the primary entry point that users interact with. 

5""" 

6 

7from datetime import datetime 

8from decimal import Decimal 

9from typing import Any, Iterator, List 

10from uuid import UUID, uuid4 

11 

12import pandas as pd 

13 

14from llm_dataset_engine.adapters import ( 

15 LocalFileCheckpointStorage, 

16 create_llm_client, 

17) 

18from llm_dataset_engine.core.models import ( 

19 CostEstimate, 

20 ExecutionResult, 

21 ValidationResult, 

22) 

23from llm_dataset_engine.core.specifications import ( 

24 DatasetSpec, 

25 LLMSpec, 

26 OutputSpec, 

27 PipelineSpecifications, 

28 ProcessingSpec, 

29 PromptSpec, 

30) 

31from llm_dataset_engine.orchestration import ( 

32 AsyncExecutor, 

33 CostTrackingObserver, 

34 ExecutionContext, 

35 ExecutionObserver, 

36 ExecutionStrategy, 

37 LoggingObserver, 

38 ProgressBarObserver, 

39 StateManager, 

40 StreamingExecutor, 

41 SyncExecutor, 

42) 

43from llm_dataset_engine.stages import ( 

44 DataLoaderStage, 

45 LLMInvocationStage, 

46 PromptFormatterStage, 

47 RawTextParser, 

48 ResponseParserStage, 

49 ResultWriterStage, 

50) 

51from llm_dataset_engine.utils import RateLimiter, RetryHandler, get_logger 

52 

53logger = get_logger(__name__) 

54 

55 

class Pipeline:
    """
    Main pipeline class - Facade for dataset processing.

    Provides a high-level interface for building and executing
    LLM-powered data transformations: validation, cost estimation,
    synchronous execution with checkpointing, and entry points for
    async / streaming executors.

    Example:
        pipeline = Pipeline(specifications)
        result = pipeline.execute()
    """

    def __init__(
        self,
        specifications: PipelineSpecifications,
        dataframe: pd.DataFrame | None = None,
        executor: ExecutionStrategy | None = None,
    ):
        """
        Initialize pipeline with specifications.

        Args:
            specifications: Complete pipeline configuration
            dataframe: Optional pre-loaded DataFrame
            executor: Optional execution strategy (default: SyncExecutor)
        """
        self.id = uuid4()
        self.specifications = specifications
        self.dataframe = dataframe
        self.executor = executor or SyncExecutor()
        self.observers: List[ExecutionObserver] = []
        # Per-instance logger so log lines can be tied to a pipeline id.
        self.logger = get_logger(f"{__name__}.{self.id}")

    def add_observer(self, observer: ExecutionObserver) -> "Pipeline":
        """
        Add execution observer.

        Args:
            observer: Observer to add

        Returns:
            Self for chaining
        """
        self.observers.append(observer)
        return self

    def validate(self) -> ValidationResult:
        """
        Validate pipeline configuration.

        Checks that input columns, output columns, a prompt template and
        an LLM model are all present in the specifications.

        Returns:
            ValidationResult with any errors/warnings
        """
        result = ValidationResult(is_valid=True)

        # Validate dataset spec
        if not self.specifications.dataset.input_columns:
            result.add_error("No input columns specified")

        if not self.specifications.dataset.output_columns:
            result.add_error("No output columns specified")

        # Validate prompt spec
        if not self.specifications.prompt.template:
            result.add_error("No prompt template specified")

        # Validate LLM spec
        if not self.specifications.llm.model:
            result.add_error("No LLM model specified")

        return result

    def estimate_cost(self) -> CostEstimate:
        """
        Estimate total processing cost.

        Loads the dataset, formats prompts for at most the first 10 rows,
        asks the LLM stage for a cost estimate of that sample and scales
        it linearly to the full row count.

        Returns:
            Cost estimate; all-zero when the dataset has no rows
        """
        loader = DataLoaderStage(self.dataframe)
        df = loader.process(self.specifications.dataset, ExecutionContext())
        total_rows = len(df)

        # An empty dataset would make the scale factor 0 / 0
        # (decimal.InvalidOperation); nothing to process costs nothing.
        if total_rows == 0:
            return CostEstimate(
                total_cost=Decimal("0"),
                total_tokens=0,
                input_tokens=0,
                output_tokens=0,
                rows=0,
                confidence="sample-based",
            )

        # Load first few rows for estimation
        sample_size = min(10, total_rows)
        sample_df = df.head(sample_size)

        # Create formatter and get prompts
        formatter = PromptFormatterStage(
            self.specifications.processing.batch_size
        )
        batches = formatter.process(
            (sample_df, self.specifications.prompt), ExecutionContext()
        )

        # Create LLM client and estimate
        llm_client = create_llm_client(self.specifications.llm)
        llm_stage = LLMInvocationStage(llm_client)
        sample_estimate = llm_stage.estimate_cost(batches)

        # Scale to full dataset
        scale_factor = Decimal(total_rows) / Decimal(sample_size)
        token_scale = float(scale_factor)

        return CostEstimate(
            total_cost=sample_estimate.total_cost * scale_factor,
            total_tokens=int(sample_estimate.total_tokens * token_scale),
            input_tokens=int(sample_estimate.input_tokens * token_scale),
            output_tokens=int(sample_estimate.output_tokens * token_scale),
            rows=total_rows,
            confidence="sample-based",
        )

    def execute(
        self, resume_from: UUID | None = None
    ) -> ExecutionResult:
        """
        Execute pipeline end-to-end.

        Validates the configuration, creates (or restores) the execution
        context, runs all stages, and notifies observers of start,
        completion or error. On failure a checkpoint save is attempted
        before the original exception is re-raised.

        Args:
            resume_from: Optional session ID to resume from checkpoint

        Returns:
            ExecutionResult with data and metrics

        Raises:
            ValueError: If validation fails or no checkpoint exists for
                ``resume_from``
        """
        # Validate first
        validation = self.validate()
        if not validation.is_valid:
            raise ValueError(f"Pipeline validation failed: {validation.errors}")

        # Create or restore execution context
        state_manager = StateManager(
            storage=LocalFileCheckpointStorage(
                self.specifications.processing.checkpoint_dir
            ),
            checkpoint_interval=self.specifications.processing.checkpoint_interval,
        )

        if resume_from:
            # Resume from checkpoint
            context = state_manager.load_checkpoint(resume_from)
            if not context:
                raise ValueError(
                    f"No checkpoint found for session {resume_from}"
                )
            self.logger.info(
                f"Resuming from checkpoint at row {context.last_processed_row}"
            )
        else:
            # Create new context
            context = ExecutionContext(pipeline_id=self.id)

        # Add default observers if none specified
        if not self.observers:
            self.observers = [
                ProgressBarObserver(),
                LoggingObserver(),
                CostTrackingObserver(),
            ]

        # Notify observers of start
        for observer in self.observers:
            observer.on_pipeline_start(self, context)

        try:
            # Execute stages
            result_df = self._execute_stages(context, state_manager)

            # Mark completion
            context.end_time = datetime.now()

            # Create execution result.
            # NOTE(review): input/output token counts are reported as 0
            # here; only the accumulated total is taken from the context.
            result = ExecutionResult(
                data=result_df,
                metrics=context.get_stats(),
                costs=CostEstimate(
                    total_cost=context.accumulated_cost,
                    total_tokens=context.accumulated_tokens,
                    input_tokens=0,
                    output_tokens=0,
                    rows=context.total_rows,
                    confidence="actual",
                ),
                execution_id=context.session_id,
                start_time=context.start_time,
                end_time=context.end_time,
                success=True,
            )

            # Cleanup checkpoints on success
            state_manager.cleanup_checkpoints(context.session_id)

            # Notify observers of completion
            for observer in self.observers:
                observer.on_pipeline_complete(context, result)

            return result

        except Exception as e:
            # Save checkpoint on error. A failure while saving must not
            # replace the original exception or skip observer callbacks.
            try:
                state_manager.save_checkpoint(context)
            except Exception as checkpoint_error:
                self.logger.error(
                    f"Pipeline failed and checkpoint could not be saved: "
                    f"{checkpoint_error}"
                )
            else:
                self.logger.error(
                    f"Pipeline failed. Checkpoint saved. "
                    f"Resume with: pipeline.execute(resume_from=UUID('{context.session_id}'))"
                )

            # Notify observers of error
            for observer in self.observers:
                observer.on_pipeline_error(context, e)
            raise

    def _execute_stages(
        self, context: ExecutionContext, state_manager: StateManager
    ) -> pd.DataFrame:
        """
        Execute all pipeline stages with checkpointing.

        Each stage is run exactly once through ``_execute_stage`` so that
        observers are notified and expensive work (notably the LLM calls)
        is not repeated. Intermediate outputs are stored in
        ``context.intermediate_data``.

        Args:
            context: Execution context shared by all stages
            state_manager: Used to persist a checkpoint after the LLM stage

        Returns:
            The writer stage output when an output spec is configured,
            otherwise the loaded DataFrame with the parsed result columns
            assigned onto it.
        """
        specs = self.specifications

        # Create budget controller if max_budget specified
        budget_controller = None
        if specs.processing.max_budget:
            from llm_dataset_engine.utils import BudgetController

            budget_controller = BudgetController(
                max_budget=specs.processing.max_budget,
                warn_at_75=True,
                warn_at_90=True,
                fail_on_exceed=True,
            )

        # Stage 1: Load data
        loader = DataLoaderStage(self.dataframe)
        df = self._execute_stage(loader, specs.dataset, context)
        context.intermediate_data["loaded_data"] = df

        # Stage 2: Format prompts
        formatter = PromptFormatterStage(specs.processing.batch_size)
        batches = self._execute_stage(formatter, (df, specs.prompt), context)
        context.intermediate_data["prompt_batches"] = batches

        # Stage 3: Invoke LLM
        llm_client = create_llm_client(specs.llm)
        rate_limiter = (
            RateLimiter(specs.processing.rate_limit_rpm)
            if specs.processing.rate_limit_rpm
            else None
        )
        retry_handler = RetryHandler(
            max_attempts=specs.processing.max_retries,
            initial_delay=specs.processing.retry_delay,
        )

        llm_stage = LLMInvocationStage(
            llm_client,
            concurrency=specs.processing.concurrency,
            rate_limiter=rate_limiter,
            retry_handler=retry_handler,
            error_policy=specs.processing.error_policy,
            max_retries=specs.processing.max_retries,
        )
        response_batches = self._execute_stage(llm_stage, batches, context)
        context.intermediate_data["response_batches"] = response_batches

        # Check budget after LLM invocation.
        # NOTE(review): this check runs only once the whole LLM stage has
        # finished, so it reports an overrun rather than preventing it.
        if budget_controller:
            budget_controller.check_budget(context.accumulated_cost)

        # Save checkpoint after expensive LLM stage
        if state_manager.should_checkpoint(context.last_processed_row):
            state_manager.save_checkpoint(context)

        # Stage 4: Parse responses
        parser_stage = ResponseParserStage(
            parser=RawTextParser(),
            output_columns=specs.dataset.output_columns,
        )
        results_df = self._execute_stage(
            parser_stage,
            (response_batches, specs.dataset.output_columns),
            context,
        )

        # Stage 5: Write results (if output spec provided)
        if specs.output:
            writer = ResultWriterStage()
            return self._execute_stage(
                writer, (df, results_df, specs.output), context
            )

        # Merge results with original (assigns columns onto df in place)
        for col in results_df.columns:
            df[col] = results_df[col]
        return df

    async def execute_async(
        self, resume_from: UUID | None = None
    ) -> ExecutionResult:
        """
        Execute pipeline asynchronously.

        Uses AsyncExecutor for non-blocking execution. Ideal for integration
        with FastAPI, aiohttp, and other async frameworks.

        Args:
            resume_from: Optional session ID to resume from checkpoint

        Returns:
            ExecutionResult with data and metrics

        Raises:
            ValueError: If executor doesn't support async
        """
        if not self.executor.supports_async():
            raise ValueError(
                "Current executor doesn't support async. "
                "Use AsyncExecutor: Pipeline(specs, executor=AsyncExecutor())"
            )

        # Use executor's async execute method.
        # NOTE(review): the executor receives an empty stage list and a
        # fresh context; `resume_from` is not forwarded - confirm intended.
        return await self.executor.execute([], ExecutionContext())

    def execute_stream(
        self, chunk_size: int = 1000
    ) -> Iterator[pd.DataFrame]:
        """
        Execute pipeline in streaming mode.

        Processes data in chunks for memory-efficient handling of large datasets.
        Ideal for datasets that don't fit in memory.

        Args:
            chunk_size: Number of rows per chunk

        Yields:
            DataFrames with processed chunks

        Raises:
            ValueError: If executor doesn't support streaming
        """
        if not self.executor.supports_streaming():
            raise ValueError(
                "Current executor doesn't support streaming. "
                f"Use StreamingExecutor: Pipeline(specs, executor=StreamingExecutor({chunk_size}))"
            )

        # Use executor's streaming execute method.
        # NOTE(review): `chunk_size` is only used in the error message
        # above; the executor receives an empty stage list and a fresh
        # context - confirm intended.
        return self.executor.execute([], ExecutionContext())

    def _execute_stage(
        self, stage: Any, input_data: Any, context: ExecutionContext
    ) -> Any:
        """
        Execute a single stage with observer notifications.

        Args:
            stage: Stage object exposing ``execute(input_data, context)``
            input_data: Input handed to the stage
            context: Execution context shared across stages

        Returns:
            Whatever ``stage.execute`` returns. Callers rely on this being
            the stage's output (assumed equal to what ``stage.process``
            would return - confirm against the stage base class).

        Raises:
            Exception: Re-raises any stage failure after notifying observers
        """
        # Notify observers of stage start
        for observer in self.observers:
            observer.on_stage_start(stage, context)

        try:
            # Execute stage
            result = stage.execute(input_data, context)

            # Notify observers of completion
            for observer in self.observers:
                observer.on_stage_complete(stage, context, result)

            return result

        except Exception as e:
            # Notify observers of error
            for observer in self.observers:
                observer.on_stage_error(stage, context, e)
            raise

435