Coverage for llm_dataset_engine/orchestration/observers.py: 52%
85 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
Execution observers for monitoring and logging.

Implements the Observer pattern so that progress display, logging, and cost
tracking can react to pipeline events without coupling to the executor.
5"""
7from abc import ABC, abstractmethod
8from typing import Any
10from tqdm import tqdm
12from llm_dataset_engine.core.models import ExecutionResult
13from llm_dataset_engine.orchestration.execution_context import ExecutionContext
14from llm_dataset_engine.stages.pipeline_stage import PipelineStage
15from llm_dataset_engine.utils import get_logger
class ExecutionObserver(ABC):
    """
    Interface for components that watch a pipeline run.

    Implementations receive lifecycle callbacks and can monitor execution
    without being coupled to the executor's implementation. Every hook is
    abstract, so a concrete observer must implement all six.
    """

    @abstractmethod
    def on_pipeline_start(
        self, pipeline: Any, context: ExecutionContext
    ) -> None:
        """Hook invoked before the first stage executes."""
        ...

    @abstractmethod
    def on_stage_start(
        self, stage: PipelineStage, context: ExecutionContext
    ) -> None:
        """Hook invoked before each stage executes."""
        ...

    @abstractmethod
    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Hook invoked after a stage finishes successfully."""
        ...

    @abstractmethod
    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Hook invoked when a stage fails with ``error``."""
        ...

    @abstractmethod
    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Hook invoked once every stage has completed."""
        ...

    @abstractmethod
    def on_pipeline_error(
        self, context: ExecutionContext, error: Exception
    ) -> None:
        """Hook invoked when the pipeline fails fatally with ``error``."""
        ...
class ProgressBarObserver(ExecutionObserver):
    """
    Observer that displays pipeline progress with a tqdm progress bar.

    A bar is created in ``on_pipeline_start`` when ``context.total_rows`` is
    positive. It is closed in ``on_pipeline_complete`` or
    ``on_pipeline_error``. While no bar is active, every hook is a no-op.
    """

    def __init__(self):
        """Initialize the observer with no active progress bar."""
        self.progress_bar: tqdm | None = None

    def _close_progress_bar(self) -> None:
        """Close and forget the active progress bar, if there is one."""
        # Compare against None explicitly: tqdm instances define __len__ (and
        # __bool__) in terms of their total, so truthiness is not a reliable
        # "a bar exists" test.
        if self.progress_bar is not None:
            self.progress_bar.close()
            self.progress_bar = None

    def on_pipeline_start(
        self, pipeline: Any, context: ExecutionContext
    ) -> None:
        """Create a progress bar sized to ``context.total_rows``."""
        # Release a bar left open by an earlier run that never reached
        # on_pipeline_complete/on_pipeline_error, instead of leaking it.
        self._close_progress_bar()
        if context.total_rows > 0:
            self.progress_bar = tqdm(
                total=context.total_rows,
                desc="Processing",
                unit="rows",
            )

    def on_stage_start(
        self, stage: PipelineStage, context: ExecutionContext
    ) -> None:
        """Show the name of the stage that is starting."""
        if self.progress_bar is not None:
            self.progress_bar.set_description(f"Stage: {stage.name}")

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Move the bar to ``context.last_processed_row`` and redraw it."""
        if self.progress_bar is not None:
            # Assigns an absolute position rather than calling update();
            # presumes last_processed_row is a cumulative count — confirm.
            self.progress_bar.n = context.last_processed_row
            self.progress_bar.refresh()

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Mark the bar with the name of the failing stage."""
        if self.progress_bar is not None:
            self.progress_bar.set_description(f"Error in {stage.name}")

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Close the progress bar after a successful run."""
        self._close_progress_bar()

    def on_pipeline_error(
        self, context: ExecutionContext, error: Exception
    ) -> None:
        """Close the progress bar after a fatal pipeline error."""
        self._close_progress_bar()
class LoggingObserver(ExecutionObserver):
    """
    Observer that reports pipeline lifecycle events through a logger.

    Starts and completions are logged at INFO level; stage and pipeline
    failures are logged at ERROR level.
    """

    def __init__(self):
        """Obtain a logger named after this module."""
        self.logger = get_logger(__name__)

    def on_pipeline_start(
        self, pipeline: Any, context: ExecutionContext
    ) -> None:
        """Log that the pipeline started, including the session id."""
        session = context.session_id
        self.logger.info(f"Pipeline execution started (session: {session})")

    def on_stage_start(
        self, stage: PipelineStage, context: ExecutionContext
    ) -> None:
        """Log the stage being started and the current progress percentage."""
        stage_name = stage.name
        progress = context.get_progress()
        message = f"Starting stage: {stage_name} (progress: {progress:.1f}%)"
        self.logger.info(message)

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Log the finished stage together with the cost accumulated so far."""
        stage_name = stage.name
        cost = context.accumulated_cost
        self.logger.info(f"Completed stage: {stage_name} (cost: ${cost:.4f})")

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """Log which stage failed and the error it raised."""
        message = f"Stage {stage.name} failed: {error}"
        self.logger.error(message)

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Log a multi-line summary of rows, duration, cost and failures."""
        metrics = result.metrics
        costs = result.costs
        summary_lines = [
            "Pipeline execution completed successfully",
            f" Processed: {metrics.processed_rows} rows",
            f" Duration: {metrics.total_duration_seconds:.2f}s",
            f" Total cost: ${costs.total_cost:.4f}",
            f" Errors: {metrics.failed_rows}",
        ]
        self.logger.info("\n".join(summary_lines))

    def on_pipeline_error(
        self, context: ExecutionContext, error: Exception
    ) -> None:
        """Log the fatal error that stopped the pipeline."""
        message = f"Pipeline execution failed: {error}"
        self.logger.error(message)
class CostTrackingObserver(ExecutionObserver):
    """
    Observer that tracks and warns about costs.

    After each stage it compares ``context.accumulated_cost`` against an
    optional budget and logs a warning once the used fraction reaches
    ``warning_threshold``. At the end of a run it logs a cost summary, and
    on failure it logs the cost accumulated so far.
    """

    def __init__(
        self,
        warning_threshold: float = 0.75,
        max_budget: float | None = None,
    ):
        """
        Initialize cost tracking observer.

        Args:
            warning_threshold: Warn when this fraction of budget used
            max_budget: Optional budget, in the same unit as
                ``context.accumulated_cost``. Budget warnings are emitted
                only when this is a positive number. Defaults to None
                (no budget checks), matching the previous behavior.
        """
        self.logger = get_logger(__name__)
        self.warning_threshold = warning_threshold
        self.max_budget: float | None = max_budget

    def on_pipeline_start(
        self, pipeline: Any, context: ExecutionContext
    ) -> None:
        """No action; the budget is supplied through the constructor."""
        # TODO(review): the budget could also be extracted from the
        # pipeline specs here; the pipeline's structure is not visible.
        pass

    def on_stage_start(
        self, stage: PipelineStage, context: ExecutionContext
    ) -> None:
        """No action on stage start."""
        pass

    def on_stage_complete(
        self, stage: PipelineStage, context: ExecutionContext, result: Any
    ) -> None:
        """Warn when accumulated cost reaches the configured budget fraction."""
        budget = self.max_budget
        # A missing or non-positive budget disables the check. This also
        # keeps the division below away from zero.
        if budget is None or budget <= 0:
            return

        usage_ratio = float(context.accumulated_cost) / budget
        if usage_ratio >= self.warning_threshold:
            self.logger.warning(
                f"Cost warning: {usage_ratio * 100:.1f}% of budget used "
                f"(${context.accumulated_cost:.4f} / ${self.max_budget:.2f})"
            )

    def on_stage_error(
        self, stage: PipelineStage, context: ExecutionContext, error: Exception
    ) -> None:
        """No action on error."""
        pass

    def on_pipeline_complete(
        self, context: ExecutionContext, result: ExecutionResult
    ) -> None:
        """Log final cost summary, including the average cost per row."""
        costs = result.costs
        total_rows = result.metrics.total_rows
        # Guard the per-row average: a run with zero rows previously raised
        # ZeroDivisionError out of this hook.
        if total_rows:
            cost_per_row = f"${float(costs.total_cost) / total_rows:.6f}"
        else:
            cost_per_row = "n/a"
        self.logger.info(
            f"Cost summary:\n"
            f" Total: ${costs.total_cost:.4f}\n"
            f" Input tokens: {costs.input_tokens:,}\n"
            f" Output tokens: {costs.output_tokens:,}\n"
            f" Cost per row: {cost_per_row}"
        )

    def on_pipeline_error(
        self, context: ExecutionContext, error: Exception
    ) -> None:
        """Log the cost accumulated up to the failure."""
        self.logger.info(
            f"Cost at failure: ${context.accumulated_cost:.4f}"
        )