Coverage for llm_dataset_engine/orchestration/observers.py: 52%

85 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Execution observers for monitoring and logging. 

3 

4Implements Observer pattern for decoupled event notification. 

5""" 

6 

7from abc import ABC, abstractmethod 

8from typing import Any 

9 

10from tqdm import tqdm 

11 

12from llm_dataset_engine.core.models import ExecutionResult 

13from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

14from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

15from llm_dataset_engine.utils import get_logger 

16 

17 

class ExecutionObserver(ABC):
    """
    Contract for objects that are notified about pipeline execution events.

    All six hooks are abstract, so a concrete observer has to implement
    every one of them. Each hook is declared to return ``None``. Going
    through this interface lets observers monitor a run without being
    coupled to the executor implementation.
    """

    @abstractmethod
    def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
        """Hook fired before the first stage executes."""

    @abstractmethod
    def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
        """Hook fired before each stage."""

    @abstractmethod
    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Hook fired after a stage finishes successfully."""

    @abstractmethod
    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Hook fired when a stage fails."""

    @abstractmethod
    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Hook fired once all stages have completed."""

    @abstractmethod
    def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
        """Hook fired on a fatal pipeline failure."""

67 

68 

class ProgressBarObserver(ExecutionObserver):
    """
    Observer that displays pipeline progress with a tqdm progress bar.

    A bar is only created when the context reports a positive row count.
    The bar is closed and dropped when the pipeline completes or fails,
    and also when a new run starts while an older bar is still open.
    """

    def __init__(self):
        """Create the observer with no active progress bar."""
        self.progress_bar: tqdm | None = None

    def _close_bar(self) -> None:
        """Close the active progress bar, if any, and forget it."""
        bar = self.progress_bar
        if bar is not None:
            self.progress_bar = None
            bar.close()

    def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
        """
        Create a progress bar sized to ``context.total_rows``.

        Args:
            pipeline: The pipeline being executed (unused here).
            context: Execution context; ``total_rows`` sets the bar total.
        """
        # A previous run may have ended without notifying completion or
        # failure; close that bar instead of leaking it.
        self._close_bar()

        if context.total_rows > 0:
            self.progress_bar = tqdm(
                total=context.total_rows,
                desc="Processing",
                unit="rows",
            )

    def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
        """Show the name of the stage that is starting in the bar description."""
        # Compare against None explicitly: tqdm defines __bool__, so plain
        # truthiness reflects the bar's total/iterable, not its existence.
        if self.progress_bar is not None:
            self.progress_bar.set_description(f"Stage: {stage.name}")

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Move the bar to ``context.last_processed_row`` and redraw it."""
        if self.progress_bar is not None:
            self.progress_bar.n = context.last_processed_row
            self.progress_bar.refresh()

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Flag the failing stage in the bar description."""
        if self.progress_bar is not None:
            self.progress_bar.set_description(f"Error in {stage.name}")

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Close the progress bar once the pipeline has finished."""
        self._close_bar()

    def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
        """Close the progress bar when the pipeline fails."""
        self._close_bar()

124 

125 

class LoggingObserver(ExecutionObserver):
    """Observer that writes one log record for each execution event."""

    def __init__(self):
        """Obtain the logger that all records are written to."""
        self.logger = get_logger(__name__)

    def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
        """Log the start of a run together with its session id."""
        message = f"Pipeline execution started (session: {context.session_id})"
        self.logger.info(message)

    def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
        """Log which stage is starting and the progress reported by the context."""
        message = (
            f"Starting stage: {stage.name} "
            f"(progress: {context.get_progress():.1f}%)"
        )
        self.logger.info(message)

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Log the finished stage and the cost accumulated so far."""
        message = (
            f"Completed stage: {stage.name} "
            f"(cost: ${context.accumulated_cost:.4f})"
        )
        self.logger.info(message)

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Log a stage failure at error level."""
        message = f"Stage {stage.name} failed: {error}"
        self.logger.error(message)

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Log a multi-line summary: rows processed, duration, cost and failures."""
        metrics = result.metrics
        summary = (
            f"Pipeline execution completed successfully\n"
            f" Processed: {metrics.processed_rows} rows\n"
            f" Duration: {metrics.total_duration_seconds:.2f}s\n"
            f" Total cost: ${result.costs.total_cost:.4f}\n"
            f" Errors: {metrics.failed_rows}"
        )
        self.logger.info(summary)

    def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
        """Log a fatal pipeline failure at error level."""
        message = f"Pipeline execution failed: {error}"
        self.logger.error(message)

182 

183 

class CostTrackingObserver(ExecutionObserver):
    """
    Observer that tracks accumulated cost and warns as a budget is used up.

    Budget warnings are only emitted when ``max_budget`` holds a positive
    value, set through the constructor or by assigning the attribute.
    Without a budget the observer only logs cost summaries.
    """

    def __init__(
        self,
        warning_threshold: float = 0.75,
        max_budget: float | None = None,
    ):
        """
        Initialize cost tracking observer.

        Args:
            warning_threshold: Warn when this fraction of budget used
            max_budget: Optional total budget the accumulated cost is
                compared against; ``None`` disables budget warnings
        """
        self.logger = get_logger(__name__)
        self.warning_threshold = warning_threshold
        self.max_budget: float | None = max_budget

    def on_pipeline_start(self, pipeline: Any, context: ExecutionContext) -> None:
        """No action on pipeline start; the budget comes from ``max_budget``."""
        # Could extract from pipeline specs

    def on_stage_start(self, stage: PipelineStage, context: ExecutionContext) -> None:
        """No action on stage start."""

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Warn when accumulated cost reaches the threshold share of the budget."""
        budget = self.max_budget
        # No budget, or a non-positive one, means there is nothing
        # meaningful to compare against (and guards the division below).
        if budget is None or budget <= 0:
            return

        usage_ratio = float(context.accumulated_cost) / float(budget)
        if usage_ratio >= self.warning_threshold:
            self.logger.warning(
                f"Cost warning: {usage_ratio * 100:.1f}% of budget used "
                f"(${context.accumulated_cost:.4f} / ${budget:.2f})"
            )

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """No action on error."""

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """
        Log the final cost summary.

        The per-row cost is reported as ``n/a`` when the run covered zero
        rows, instead of dividing by zero.
        """
        costs = result.costs
        total_rows = result.metrics.total_rows

        if total_rows:
            cost_per_row = f"${float(costs.total_cost) / total_rows:.6f}"
        else:
            # An empty run has no meaningful per-row cost.
            cost_per_row = "n/a"

        self.logger.info(
            f"Cost summary:\n"
            f" Total: ${costs.total_cost:.4f}\n"
            f" Input tokens: {costs.input_tokens:,}\n"
            f" Output tokens: {costs.output_tokens:,}\n"
            f" Cost per row: {cost_per_row}"
        )

    def on_pipeline_error(self, context: ExecutionContext, error: Exception) -> None:
        """Log the cost accumulated up to the point of failure."""
        self.logger.info(
            f"Cost at failure: ${context.accumulated_cost:.4f}"
        )

249