Coverage for llm_dataset_engine/orchestration/pipeline_executor.py: 0%
104 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Pipeline executor for orchestrating stage execution.
4Implements the complete execution flow with state machine management
5as specified in the design document.
6"""
8from datetime import datetime
9from enum import Enum
10from typing import Any, List
11from uuid import UUID, uuid4
13import pandas as pd
15from llm_dataset_engine.core.models import ExecutionResult
16from llm_dataset_engine.orchestration.execution_context import ExecutionContext
17from llm_dataset_engine.orchestration.observers import ExecutionObserver
18from llm_dataset_engine.orchestration.state_manager import StateManager
19from llm_dataset_engine.stages.pipeline_stage import PipelineStage
20from llm_dataset_engine.utils import get_logger
# Module-level logger. NOTE(review): nothing in this module's visible code
# logs through it -- PipelineExecutor uses a per-instance child logger
# (self.logger) instead. Presumably kept so other modules can import it.
logger = get_logger(__name__)
class ExecutionState(str, Enum):
    """
    Lifecycle states a pipeline run moves through.

    Because the enum mixes in ``str``, every member compares equal to its
    lowercase value, e.g. ``ExecutionState.IDLE == "idle"``.
    """

    # Fresh executor, or one that has been cancelled.
    IDLE = "idle"
    # Execution context is being created / a checkpoint is being looked up.
    INITIALIZING = "initializing"
    # Stages are running; the only state pause() accepts.
    EXECUTING = "executing"
    # Set by pause().
    PAUSED = "paused"
    # All stages finished and the result was built.
    COMPLETED = "completed"
    # execute() raised; a new execute() call is allowed from here.
    FAILED = "failed"
class PipelineExecutor:
    """
    Orchestrates pipeline execution with state management.

    Implements Command and Mediator patterns for coordinating
    stages, observers, and state management.

    State Machine:
        IDLE → INITIALIZING → EXECUTING → [PAUSED ↔ EXECUTING] → COMPLETED
                                  ↓
                                FAILED

    ``execute`` may only start from IDLE or FAILED (the latter so a failed
    run can be retried); ``cancel`` returns the executor to IDLE.
    """

    # States from which execute() may be (re)started. FAILED is allowed so a
    # retry can pick up the checkpoint saved when the previous run failed.
    _STARTABLE_STATES = frozenset({ExecutionState.IDLE, ExecutionState.FAILED})

    def __init__(
        self,
        stages: List[PipelineStage],
        state_manager: StateManager,
        observers: List[ExecutionObserver] | None = None,
    ):
        """
        Initialize pipeline executor.

        Args:
            stages: Ordered list of processing stages
            state_manager: State manager for checkpointing
            observers: Optional execution observers. The list is copied, so
                later add_observer() calls never mutate the caller's list.
        """
        # Also serves as the checkpoint session id (see _initialize_context).
        self.execution_id = uuid4()
        self.stages = stages
        self.state_manager = state_manager
        self.observers: List[ExecutionObserver] = (
            list(observers) if observers else []
        )
        self.state = ExecutionState.IDLE
        self.context: ExecutionContext | None = None
        # Child logger whose name carries this executor's execution_id.
        self.logger = get_logger(f"{__name__}.{self.execution_id}")

    def add_observer(self, observer: ExecutionObserver) -> "PipelineExecutor":
        """
        Add execution observer.

        Args:
            observer: Observer to add

        Returns:
            Self for chaining
        """
        self.observers.append(observer)
        return self

    def execute(self, pipeline: Any) -> ExecutionResult:
        """
        Execute pipeline end-to-end.

        Creates a fresh execution context, or restores one from a checkpoint
        saved earlier under this executor's session id. It then notifies
        observers, runs the stages, builds the result, removes checkpoints,
        and notifies observers again. On any failure the state becomes
        FAILED, observers are notified, and a checkpoint is saved on a
        best-effort basis before the original exception is re-raised.

        NOTE(review): _execute_all_stages currently always raises
        NotImplementedError, so this method cannot succeed yet.

        Args:
            pipeline: Pipeline instance to execute

        Returns:
            ExecutionResult with data and metrics

        Raises:
            RuntimeError: If pipeline in invalid state
            Exception: Whatever the stages (or result building) raised.
        """
        if self.state not in self._STARTABLE_STATES:
            raise RuntimeError(
                f"Cannot execute from state: {self.state}"
            )

        try:
            self.state = ExecutionState.INITIALIZING
            self.context = self._initialize_context()

            # A checkpoint exists for this session id if an earlier run of
            # this executor saved one (e.g. when it FAILED).
            if self.state_manager.can_resume(self.context.session_id):
                self.logger.info("Found existing checkpoint, resuming...")
                restored = self.state_manager.load_checkpoint(
                    self.context.session_id
                )
                # load_checkpoint may come back empty (resume() guards the
                # same case). Keep the fresh context instead of continuing
                # with None and crashing later on attribute access.
                if restored is not None:
                    self.context = restored
                else:
                    self.logger.warning(
                        "Checkpoint could not be loaded, starting fresh"
                    )

            self._notify_pipeline_start(pipeline)

            self.state = ExecutionState.EXECUTING
            result_data = self._execute_all_stages(pipeline)

            self.state = ExecutionState.COMPLETED
            self.context.end_time = datetime.now()

            result = self._create_execution_result(result_data)

            # The run finished, so its checkpoints are no longer needed.
            self.state_manager.cleanup_checkpoints(self.context.session_id)

            self._notify_pipeline_complete(result)

            return result

        except Exception as e:
            self.state = ExecutionState.FAILED
            self._notify_pipeline_error(e)

            # Save progress so a retry can resume. This is best-effort: a
            # failure here is logged rather than raised, so it cannot replace
            # the original pipeline error re-raised below.
            if self.context:
                try:
                    self.state_manager.save_checkpoint(self.context)
                except Exception as save_error:
                    self.logger.error(
                        f"Failed to save checkpoint after error: {save_error}"
                    )

            # Bare raise re-raises `e`, the exception this handler is
            # handling.
            raise

    def pause(self) -> None:
        """
        Mark execution as paused and save a checkpoint.

        NOTE(review): this only changes the recorded state. Nothing visible
        here makes an in-progress execute() call observe the pause.

        Raises:
            RuntimeError: If the executor is not currently EXECUTING.
        """
        if self.state != ExecutionState.EXECUTING:
            raise RuntimeError(
                f"Cannot pause from state: {self.state}"
            )

        self.logger.info("Pausing execution...")
        self.state = ExecutionState.PAUSED

        if self.context:
            self.state_manager.save_checkpoint(self.context)

    def resume(self, session_id: UUID) -> ExecutionResult:
        """
        Resume from saved checkpoint.

        Loads the checkpoint into ``self.context`` and logs the row it would
        continue from. Actual continuation is not implemented yet. This
        method does not check the current state.

        Args:
            session_id: Session ID to resume

        Returns:
            ExecutionResult (not yet; see Raises)

        Raises:
            ValueError: If no checkpoint found, or it fails to load
            NotImplementedError: Always, once the checkpoint is loaded
        """
        if not self.state_manager.can_resume(session_id):
            raise ValueError(f"No checkpoint found for session {session_id}")

        self.context = self.state_manager.load_checkpoint(session_id)
        if not self.context:
            raise ValueError("Failed to load checkpoint")

        self.logger.info(
            f"Resuming from row {self.context.last_processed_row}"
        )

        # Continuing would require reconstructing the pipeline and skipping
        # already-processed rows.
        raise NotImplementedError("Resume functionality coming soon")

    def cancel(self) -> None:
        """
        Save a checkpoint (when there is a context) and reset to IDLE.

        NOTE(review): this only updates recorded state. Nothing visible here
        interrupts an execute() call that is already running.

        A COMPLETED run is not re-checkpointed. execute() has already
        cleaned up its checkpoints, and saving again could make a later
        execute() on this executor (same session id) "resume" the finished
        session.
        """
        self.logger.info("Cancelling execution...")

        if self.context and self.state != ExecutionState.COMPLETED:
            self.state_manager.save_checkpoint(self.context)

        self.state = ExecutionState.IDLE

    def _initialize_context(self) -> ExecutionContext:
        """Create a fresh execution context keyed by this executor's id."""
        return ExecutionContext(session_id=self.execution_id)

    def _execute_all_stages(self, pipeline: Any) -> pd.DataFrame:
        """
        Execute all pipeline stages sequentially.

        Not implemented yet: stage orchestration currently lives in
        ``Pipeline.execute()``, so this always raises.

        Args:
            pipeline: Pipeline instance

        Returns:
            Final DataFrame result

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "Stage orchestration implemented in Pipeline.execute() currently"
        )

    def _create_execution_result(
        self, data: pd.DataFrame
    ) -> ExecutionResult:
        """
        Create execution result from context and data.

        Args:
            data: Final processed data

        Returns:
            ExecutionResult marked ``success=True``

        Raises:
            RuntimeError: If no execution context is set.
        """
        if not self.context:
            raise RuntimeError("No execution context available")

        return ExecutionResult(
            data=data,
            metrics=self.context.get_stats(),
            costs=self.context.accumulated_cost,
            execution_id=self.context.session_id,
            start_time=self.context.start_time,
            end_time=self.context.end_time,
            success=True,
        )

    def _notify_observers(self, hook: str, *args: Any) -> None:
        """
        Call ``observer.<hook>(*args)`` on every observer.

        Each observer is isolated: any exception it raises (including a
        missing hook) is logged and the remaining observers still run.
        """
        for observer in self.observers:
            try:
                getattr(observer, hook)(*args)
            except Exception as e:
                self.logger.error(
                    f"Observer {observer.__class__.__name__} failed: {e}"
                )

    def _notify_pipeline_start(self, pipeline: Any) -> None:
        """Notify all observers of pipeline start (only if a context exists)."""
        if self.context:
            self._notify_observers("on_pipeline_start", pipeline, self.context)

    def _notify_pipeline_complete(self, result: ExecutionResult) -> None:
        """Notify all observers of pipeline completion (only if a context exists)."""
        if self.context:
            self._notify_observers("on_pipeline_complete", self.context, result)

    def _notify_pipeline_error(self, error: Exception) -> None:
        """Notify all observers of pipeline error (only if a context exists)."""
        if self.context:
            self._notify_observers("on_pipeline_error", self.context, error)