Coverage for llm_dataset_engine/orchestration/pipeline_executor.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

"""
Pipeline executor for orchestrating stage execution.

Implements the complete execution flow with state machine management
as specified in the design document.
"""

7 

8from datetime import datetime 

9from enum import Enum 

10from typing import Any, List 

11from uuid import UUID, uuid4 

12 

13import pandas as pd 

14 

15from llm_dataset_engine.core.models import ExecutionResult 

16from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

17from llm_dataset_engine.orchestration.observers import ExecutionObserver 

18from llm_dataset_engine.orchestration.state_manager import StateManager 

19from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

20from llm_dataset_engine.utils import get_logger 

21 

# Module-level logger. PipelineExecutor instances create their own
# per-execution logger (named "<module>.<execution_id>") in __init__,
# so this one is only for module-scope messages.
logger = get_logger(__name__)

23 

24 

class ExecutionState(str, Enum):
    """Pipeline execution states.

    Mixes in ``str`` so each member compares equal to its plain string
    value (e.g. ``ExecutionState.IDLE == "idle"``).
    """

    # No execution in progress; execute() may be called.
    IDLE = "idle"
    # execute() is building the execution context / checking checkpoints.
    INITIALIZING = "initializing"
    # Stages are running.
    EXECUTING = "executing"
    # pause() was called while executing.
    PAUSED = "paused"
    # execute() finished without raising.
    COMPLETED = "completed"
    # execute() raised; execute() may be called again from this state.
    FAILED = "failed"

34 

35 

class PipelineExecutor:
    """
    Orchestrates pipeline execution with state management.

    Implements Command and Mediator patterns for coordinating
    stages, observers, and state management.

    State Machine:
        IDLE → INITIALIZING → EXECUTING → [PAUSED ↔ EXECUTING] → COMPLETED

        Any exception raised inside execute() moves the executor to FAILED;
        execute() may be called again from IDLE or FAILED. cancel() returns
        the executor to IDLE.
    """

    def __init__(
        self,
        stages: List[PipelineStage],
        state_manager: StateManager,
        observers: List[ExecutionObserver] | None = None,
    ):
        """
        Initialize pipeline executor.

        Args:
            stages: Ordered list of processing stages
            state_manager: State manager for checkpointing
            observers: Optional execution observers
        """
        self.execution_id = uuid4()
        self.stages = stages
        self.state_manager = state_manager
        # Copy the list so add_observer() never mutates the caller's list.
        self.observers: List[ExecutionObserver] = (
            list(observers) if observers else []
        )
        self.state = ExecutionState.IDLE
        self.context: ExecutionContext | None = None
        # Per-execution logger so log lines can be tied to this run.
        self.logger = get_logger(f"{__name__}.{self.execution_id}")

    def add_observer(self, observer: ExecutionObserver) -> "PipelineExecutor":
        """
        Add execution observer.

        Args:
            observer: Observer to add

        Returns:
            Self for chaining
        """
        self.observers.append(observer)
        return self

    def execute(self, pipeline: Any) -> ExecutionResult:
        """
        Execute pipeline end-to-end.

        Args:
            pipeline: Pipeline instance to execute

        Returns:
            ExecutionResult with data and metrics

        Raises:
            RuntimeError: If pipeline in invalid state
            Exception: Any error raised during execution is re-raised after
                observers are notified and a checkpoint save is attempted.
        """
        if self.state not in (ExecutionState.IDLE, ExecutionState.FAILED):
            raise RuntimeError(
                f"Cannot execute from state: {self.state}"
            )

        try:
            # Initialize
            self.state = ExecutionState.INITIALIZING
            self.context = self._initialize_context()

            # Check for existing checkpoint
            if self.state_manager.can_resume(self.context.session_id):
                self.logger.info("Found existing checkpoint, resuming...")
                restored = self.state_manager.load_checkpoint(
                    self.context.session_id
                )
                # resume() treats a missing checkpoint as possible, so guard
                # here too instead of replacing the fresh context with None.
                if restored is not None:
                    self.context = restored
                else:
                    self.logger.warning(
                        "Checkpoint could not be loaded, starting fresh"
                    )

            # Notify observers
            self._notify_pipeline_start(pipeline)

            # Execute stages
            self.state = ExecutionState.EXECUTING
            result_data = self._execute_all_stages(pipeline)

            # Mark completion
            self.state = ExecutionState.COMPLETED
            self.context.end_time = datetime.now()

            # Create result
            result = self._create_execution_result(result_data)

            # Cleanup checkpoints
            self.state_manager.cleanup_checkpoints(self.context.session_id)

            # Notify observers
            self._notify_pipeline_complete(result)

            return result

        except Exception as e:
            self.state = ExecutionState.FAILED
            self._notify_pipeline_error(e)

            # Save checkpoint on failure. A failing save must not replace
            # the original error, so it is logged and the original re-raised.
            if self.context:
                try:
                    self.state_manager.save_checkpoint(self.context)
                except Exception as checkpoint_error:
                    self.logger.error(
                        f"Failed to save checkpoint after error: "
                        f"{checkpoint_error}"
                    )

            raise

    def pause(self) -> None:
        """
        Gracefully pause execution.

        Marks the executor as PAUSED and saves a checkpoint if a context
        exists.

        Raises:
            RuntimeError: If the executor is not currently EXECUTING
        """
        if self.state != ExecutionState.EXECUTING:
            raise RuntimeError(
                f"Cannot pause from state: {self.state}"
            )

        self.logger.info("Pausing execution...")
        self.state = ExecutionState.PAUSED

        # Save checkpoint
        if self.context:
            self.state_manager.save_checkpoint(self.context)

    def resume(self, session_id: UUID) -> ExecutionResult:
        """
        Resume from saved checkpoint.

        Currently only loads the checkpoint into ``self.context`` and logs
        the resume position; continuing execution is not implemented yet.

        Args:
            session_id: Session ID to resume

        Returns:
            ExecutionResult

        Raises:
            ValueError: If no checkpoint found
            NotImplementedError: Always, once the checkpoint has been loaded
        """
        if not self.state_manager.can_resume(session_id):
            raise ValueError(f"No checkpoint found for session {session_id}")

        self.context = self.state_manager.load_checkpoint(session_id)
        if not self.context:
            raise ValueError("Failed to load checkpoint")

        self.logger.info(
            f"Resuming from row {self.context.last_processed_row}"
        )

        # Continue execution
        # Note: Would need to reconstruct pipeline and skip processed rows
        raise NotImplementedError("Resume functionality coming soon")

    def cancel(self) -> None:
        """
        Immediately stop and save checkpoint.

        Saves a checkpoint if a context exists, then resets the state to
        IDLE regardless of the state it was in.
        """
        self.logger.info("Cancelling execution...")

        # Save checkpoint
        if self.context:
            self.state_manager.save_checkpoint(self.context)

        self.state = ExecutionState.IDLE

    def _initialize_context(self) -> ExecutionContext:
        """Initialize execution context keyed by this executor's execution_id."""
        return ExecutionContext(session_id=self.execution_id)

    def _execute_all_stages(self, pipeline: Any) -> pd.DataFrame:
        """
        Execute all pipeline stages sequentially.

        Not implemented yet: always raises, so execute() currently ends in
        the FAILED state.

        Args:
            pipeline: Pipeline instance

        Returns:
            Final DataFrame result

        Raises:
            NotImplementedError: Always
        """
        # This will be implemented with actual stage orchestration
        # For now, delegate to pipeline's execution logic
        # In a future iteration, we'll move all execution here
        raise NotImplementedError(
            "Stage orchestration implemented in Pipeline.execute() currently"
        )

    def _create_execution_result(
        self, data: pd.DataFrame
    ) -> ExecutionResult:
        """
        Create execution result from context and data.

        Args:
            data: Final processed data

        Returns:
            ExecutionResult

        Raises:
            RuntimeError: If no execution context is available
        """
        if not self.context:
            raise RuntimeError("No execution context available")

        return ExecutionResult(
            data=data,
            metrics=self.context.get_stats(),
            costs=self.context.accumulated_cost,
            execution_id=self.context.session_id,
            start_time=self.context.start_time,
            end_time=self.context.end_time,
            success=True,
        )

    def _notify_observers(self, deliver: Any) -> None:
        """
        Call ``deliver(observer, context)`` for every registered observer.

        Does nothing when there is no execution context. An exception from
        one observer is logged and does not stop delivery to the others.

        Args:
            deliver: Callable taking ``(observer, context)``
        """
        if not self.context:
            return

        for observer in self.observers:
            try:
                deliver(observer, self.context)
            except Exception as e:
                self.logger.error(
                    f"Observer {observer.__class__.__name__} failed: {e}"
                )

    def _notify_pipeline_start(self, pipeline: Any) -> None:
        """Notify all observers of pipeline start."""
        self._notify_observers(
            lambda observer, context: observer.on_pipeline_start(
                pipeline, context
            )
        )

    def _notify_pipeline_complete(self, result: ExecutionResult) -> None:
        """Notify all observers of pipeline completion."""
        self._notify_observers(
            lambda observer, context: observer.on_pipeline_complete(
                context, result
            )
        )

    def _notify_pipeline_error(self, error: Exception) -> None:
        """Notify all observers of pipeline error."""
        self._notify_observers(
            lambda observer, context: observer.on_pipeline_error(
                context, error
            )
        )

282