Coverage for llm_dataset_engine/api/pipeline.py: 22%
127 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Main Pipeline class - the Facade for the entire system.
4This is the primary entry point that users interact with.
5"""
7from datetime import datetime
8from decimal import Decimal
9from typing import Any, Iterator, List
10from uuid import UUID, uuid4
12import pandas as pd
14from llm_dataset_engine.adapters import (
15 LocalFileCheckpointStorage,
16 create_llm_client,
17)
18from llm_dataset_engine.core.models import (
19 CostEstimate,
20 ExecutionResult,
21 ValidationResult,
22)
23from llm_dataset_engine.core.specifications import (
24 DatasetSpec,
25 LLMSpec,
26 OutputSpec,
27 PipelineSpecifications,
28 ProcessingSpec,
29 PromptSpec,
30)
31from llm_dataset_engine.orchestration import (
32 AsyncExecutor,
33 CostTrackingObserver,
34 ExecutionContext,
35 ExecutionObserver,
36 ExecutionStrategy,
37 LoggingObserver,
38 ProgressBarObserver,
39 StateManager,
40 StreamingExecutor,
41 SyncExecutor,
42)
43from llm_dataset_engine.stages import (
44 DataLoaderStage,
45 LLMInvocationStage,
46 PromptFormatterStage,
47 RawTextParser,
48 ResponseParserStage,
49 ResultWriterStage,
50)
51from llm_dataset_engine.utils import RateLimiter, RetryHandler, get_logger
# Module-level logger. Pipeline instances log through their own per-instance
# logger (self.logger); this module-level one is not referenced by the code
# shown in this module.
logger = get_logger(__name__)
class Pipeline:
    """
    Main pipeline class - Facade for dataset processing.

    Provides high-level interface for building and executing
    LLM-powered data transformations.

    Example:
        pipeline = Pipeline(specifications)
        result = pipeline.execute()
    """

    def __init__(
        self,
        specifications: PipelineSpecifications,
        dataframe: pd.DataFrame | None = None,
        executor: ExecutionStrategy | None = None,
    ):
        """
        Initialize pipeline with specifications.

        Args:
            specifications: Complete pipeline configuration.
            dataframe: Optional pre-loaded DataFrame; it is handed to
                DataLoaderStage whenever the pipeline loads data.
            executor: Optional execution strategy (default: SyncExecutor).
        """
        self.id = uuid4()
        self.specifications = specifications
        self.dataframe = dataframe
        # Explicit None check: a caller-supplied executor must never be
        # silently replaced just because the object happens to be falsy.
        self.executor = executor if executor is not None else SyncExecutor()
        self.observers: List[ExecutionObserver] = []
        # Logger name includes this pipeline's id so its log lines can be told
        # apart from other pipelines'.
        self.logger = get_logger(f"{__name__}.{self.id}")

    def add_observer(self, observer: ExecutionObserver) -> "Pipeline":
        """
        Add an execution observer.

        Observers are notified of pipeline start/complete/error and of each
        stage's start/complete/error. If no observer has been registered by
        the time ``execute`` runs, a default set is installed.

        Args:
            observer: Observer to add.

        Returns:
            Self, for chaining.
        """
        self.observers.append(observer)
        return self

    def validate(self) -> ValidationResult:
        """
        Validate pipeline configuration.

        Only checks that the required fields are non-empty: dataset input and
        output columns, the prompt template, and the LLM model name.

        Returns:
            ValidationResult with any errors found.
        """
        result = ValidationResult(is_valid=True)
        specs = self.specifications

        # Dataset spec
        if not specs.dataset.input_columns:
            result.add_error("No input columns specified")

        if not specs.dataset.output_columns:
            result.add_error("No output columns specified")

        # Prompt spec
        if not specs.prompt.template:
            result.add_error("No prompt template specified")

        # LLM spec
        if not specs.llm.model:
            result.add_error("No LLM model specified")

        return result

    def estimate_cost(self, sample_size: int = 10) -> CostEstimate:
        """
        Estimate total processing cost from a sample of rows.

        Loads the dataset and formats prompts for its first ``sample_size``
        rows. It then asks ``LLMInvocationStage.estimate_cost`` for the cost
        of that sample and scales the result linearly by
        ``total_rows / sampled_rows``.

        Args:
            sample_size: Maximum number of leading rows to sample (default
                10, the value that used to be hard-coded).

        Returns:
            CostEstimate with ``confidence="sample-based"``. An empty dataset
            yields an all-zero estimate instead of failing on a 0/0 division.

        Raises:
            ValueError: If ``sample_size`` is less than 1.
        """
        if sample_size < 1:
            raise ValueError(f"sample_size must be >= 1, got {sample_size}")

        # Load the data with a throwaway context; estimation is not a real run.
        loader = DataLoaderStage(self.dataframe)
        df = loader.process(self.specifications.dataset, ExecutionContext())
        total_rows = len(df)

        # Nothing to sample: the scale factor below would be Decimal 0/0.
        if total_rows == 0:
            return CostEstimate(
                total_cost=Decimal(0),
                total_tokens=0,
                input_tokens=0,
                output_tokens=0,
                rows=0,
                confidence="sample-based",
            )

        rows_sampled = min(sample_size, total_rows)
        sample_df = df.head(rows_sampled)

        # Format prompts for the sample only.
        formatter = PromptFormatterStage(self.specifications.processing.batch_size)
        batches = formatter.process(
            (sample_df, self.specifications.prompt), ExecutionContext()
        )

        llm_client = create_llm_client(self.specifications.llm)
        llm_stage = LLMInvocationStage(llm_client)
        sample_estimate = llm_stage.estimate_cost(batches)

        # Linear extrapolation from the sample to the whole dataset. The cost
        # stays in Decimal; token counts are scaled in float and truncated.
        scale_factor = Decimal(total_rows) / Decimal(rows_sampled)
        token_scale = float(scale_factor)

        return CostEstimate(
            total_cost=sample_estimate.total_cost * scale_factor,
            total_tokens=int(sample_estimate.total_tokens * token_scale),
            input_tokens=int(sample_estimate.input_tokens * token_scale),
            output_tokens=int(sample_estimate.output_tokens * token_scale),
            rows=total_rows,
            confidence="sample-based",
        )

    def execute(
        self, resume_from: UUID | None = None
    ) -> ExecutionResult:
        """
        Execute pipeline end-to-end.

        The method works in this order:

        1. Validates the configuration.
        2. Creates a fresh ExecutionContext, or restores one from a checkpoint.
        3. Installs default observers if none were added.
        4. Runs all stages.

        On success the session's checkpoints are cleaned up. On failure a
        checkpoint is saved (best-effort), observers are notified, and the
        original exception is re-raised.

        Whether already-processed rows are skipped on resume depends on the
        stages, which receive the restored context. ``_execute_stages`` itself
        still runs every stage.

        Args:
            resume_from: Optional session ID to resume from checkpoint.

        Returns:
            ExecutionResult with data and metrics.

        Raises:
            ValueError: If validation fails or no checkpoint exists for
                ``resume_from``.
            Exception: Any error raised while executing the stages is
                re-raised unchanged.
        """
        validation = self.validate()
        if not validation.is_valid:
            raise ValueError(f"Pipeline validation failed: {validation.errors}")

        processing = self.specifications.processing
        state_manager = StateManager(
            storage=LocalFileCheckpointStorage(processing.checkpoint_dir),
            checkpoint_interval=processing.checkpoint_interval,
        )

        if resume_from is not None:
            context = state_manager.load_checkpoint(resume_from)
            if not context:
                raise ValueError(
                    f"No checkpoint found for session {resume_from}"
                )
            self.logger.info(
                f"Resuming from checkpoint at row {context.last_processed_row}"
            )
        else:
            context = ExecutionContext(pipeline_id=self.id)

        # Default observers when the caller registered none. This assignment
        # persists on the instance, so later runs reuse the same observers.
        if not self.observers:
            self.observers = [
                ProgressBarObserver(),
                LoggingObserver(),
                CostTrackingObserver(),
            ]

        for observer in self.observers:
            observer.on_pipeline_start(self, context)

        try:
            result_df = self._execute_stages(context, state_manager)

            context.end_time = datetime.now()

            result = ExecutionResult(
                data=result_df,
                metrics=context.get_stats(),
                costs=CostEstimate(
                    total_cost=context.accumulated_cost,
                    total_tokens=context.accumulated_tokens,
                    # NOTE(review): the input/output token split is not taken
                    # from the context here; confirm whether it tracks one.
                    input_tokens=0,
                    output_tokens=0,
                    rows=context.total_rows,
                    confidence="actual",
                ),
                execution_id=context.session_id,
                start_time=context.start_time,
                end_time=context.end_time,
                success=True,
            )

            # The run succeeded, so its checkpoints are no longer needed.
            state_manager.cleanup_checkpoints(context.session_id)

            for observer in self.observers:
                observer.on_pipeline_complete(context, result)

            return result

        except Exception as e:
            # Checkpointing is best-effort: a failure here must neither mask
            # the original error nor skip the observer notifications.
            try:
                state_manager.save_checkpoint(context)
            except Exception as save_error:
                self.logger.error(
                    f"Pipeline failed and checkpoint could not be saved: "
                    f"{save_error}"
                )
            else:
                self.logger.error(
                    f"Pipeline failed. Checkpoint saved. "
                    f"Resume with: pipeline.execute(resume_from=UUID('{context.session_id}'))"
                )

            for observer in self.observers:
                observer.on_pipeline_error(context, e)
            # Bare raise re-raises `e`, the exception handled by this clause.
            raise

    def _execute_stages(
        self, context: ExecutionContext, state_manager: StateManager
    ) -> pd.DataFrame:
        """
        Run every pipeline stage exactly once, with checkpointing.

        The stages are load -> format prompts -> invoke LLM -> parse
        responses -> write or merge. Each stage is run through
        ``_execute_stage``, and its returned value is reused. Stages are not
        re-run via ``process``, which previously caused a second (billed) LLM
        invocation. Intermediate outputs are stored in
        ``context.intermediate_data``.

        Returns:
            The writer's output when an output spec is set; otherwise the
            loaded DataFrame with the parsed result columns assigned onto it.
        """
        specs = self.specifications

        # Optional budget guard. It is checked only after the whole LLM stage
        # has finished, so it cannot stop spending mid-stage.
        budget_controller = None
        if specs.processing.max_budget:
            from llm_dataset_engine.utils import BudgetController

            budget_controller = BudgetController(
                max_budget=specs.processing.max_budget,
                warn_at_75=True,
                warn_at_90=True,
                fail_on_exceed=True,
            )

        # Stage 1: Load data
        loader = DataLoaderStage(self.dataframe)
        df = context.intermediate_data["loaded_data"] = self._execute_stage(
            loader, specs.dataset, context
        )

        # Stage 2: Format prompts
        formatter = PromptFormatterStage(specs.processing.batch_size)
        batches = context.intermediate_data["prompt_batches"] = self._execute_stage(
            formatter, (df, specs.prompt), context
        )

        # Stage 3: Invoke LLM (the expensive stage; must run only once)
        llm_client = create_llm_client(specs.llm)
        rate_limiter = (
            RateLimiter(specs.processing.rate_limit_rpm)
            if specs.processing.rate_limit_rpm
            else None
        )
        retry_handler = RetryHandler(
            max_attempts=specs.processing.max_retries,
            initial_delay=specs.processing.retry_delay,
        )

        llm_stage = LLMInvocationStage(
            llm_client,
            concurrency=specs.processing.concurrency,
            rate_limiter=rate_limiter,
            retry_handler=retry_handler,
            error_policy=specs.processing.error_policy,
            max_retries=specs.processing.max_retries,
        )
        response_batches = context.intermediate_data["response_batches"] = (
            self._execute_stage(llm_stage, batches, context)
        )

        if budget_controller:
            budget_controller.check_budget(context.accumulated_cost)

        # Checkpoint after the expensive LLM stage when the interval says so.
        if state_manager.should_checkpoint(context.last_processed_row):
            state_manager.save_checkpoint(context)

        # Stage 4: Parse responses
        parser_stage = ResponseParserStage(
            parser=RawTextParser(),
            output_columns=specs.dataset.output_columns,
        )
        results_df = self._execute_stage(
            parser_stage,
            (response_batches, specs.dataset.output_columns),
            context,
        )

        # Stage 5: Write results (if output spec provided)
        if specs.output:
            writer = ResultWriterStage()
            return self._execute_stage(
                writer, (df, results_df, specs.output), context
            )

        # No writer: assign parsed columns onto the loaded frame. Pandas
        # aligns on the index here.
        # NOTE(review): if DataLoaderStage returns self.dataframe unchanged,
        # this mutates the caller's DataFrame; confirm that is intended.
        for col in results_df.columns:
            df[col] = results_df[col]
        return df

    async def execute_async(
        self, resume_from: UUID | None = None
    ) -> ExecutionResult:
        """
        Execute pipeline asynchronously.

        Uses AsyncExecutor for non-blocking execution. Ideal for integration
        with FastAPI, aiohttp, and other async frameworks.

        NOTE(review): the current body hands the executor an empty stage list
        and a fresh ExecutionContext. It does not use the specifications or
        ``resume_from``. Confirm this stub is intended.

        Args:
            resume_from: Optional session ID to resume from checkpoint
                (currently unused).

        Returns:
            ExecutionResult with data and metrics.

        Raises:
            ValueError: If executor doesn't support async.
        """
        if not self.executor.supports_async():
            raise ValueError(
                "Current executor doesn't support async. "
                "Use AsyncExecutor: Pipeline(specs, executor=AsyncExecutor())"
            )

        return await self.executor.execute([], ExecutionContext())

    def execute_stream(
        self, chunk_size: int = 1000
    ) -> Iterator[pd.DataFrame]:
        """
        Execute pipeline in streaming mode.

        Processes data in chunks for memory-efficient handling of large
        datasets. Ideal for datasets that don't fit in memory.

        NOTE(review): ``chunk_size`` is only used in the error message below.
        As in ``execute_async``, the executor receives an empty stage list
        and a fresh context. Confirm this stub is intended.

        Args:
            chunk_size: Number of rows per chunk.

        Yields:
            DataFrames with processed chunks.

        Raises:
            ValueError: If executor doesn't support streaming.
        """
        if not self.executor.supports_streaming():
            raise ValueError(
                "Current executor doesn't support streaming. "
                f"Use StreamingExecutor: Pipeline(specs, executor=StreamingExecutor({chunk_size}))"
            )

        return self.executor.execute([], ExecutionContext())

    def _execute_stage(
        self, stage: Any, input_data: Any, context: ExecutionContext
    ) -> Any:
        """
        Run one stage via ``stage.execute`` with observer notifications.

        Observers get ``on_stage_start`` before the stage runs. Then they get
        either ``on_stage_complete`` (with the result) or ``on_stage_error``
        (with the exception, which is re-raised).

        Returns:
            Whatever ``stage.execute`` returns. This is presumably the stage's
            processed output; callers rely on that, so confirm it against the
            stage base class.
        """
        for observer in self.observers:
            observer.on_stage_start(stage, context)

        try:
            result = stage.execute(input_data, context)
        except Exception as e:
            for observer in self.observers:
                observer.on_stage_error(stage, context, e)
            raise

        for observer in self.observers:
            observer.on_stage_complete(stage, context, result)
        return result