Coverage for src/dataknobs_llm/prompts/versioning/metrics.py: 71%
112 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:05 -0600
"""Metrics tracking for prompt versions.

This module provides:
- Event-based metrics collection
- Aggregated metrics computation
- Performance comparison across versions
- Experiment metrics analysis

All events and aggregated metrics are held in memory by ``MetricsCollector``.
They are additionally forwarded to a storage backend when that backend exposes
async ``append`` / ``set`` / ``delete`` methods.
"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from .types import (
    PromptMetrics,
    MetricEvent,
    VersioningError,
)
class MetricsCollector:
    """Collects and aggregates metrics for prompt versions.

    Tracks usage, performance, and user feedback for each version.
    Supports both real-time event recording and aggregated metrics retrieval.

    Raw events and aggregated ``PromptMetrics`` are always kept in memory.
    The storage backend is optional:

    - If it exposes an async ``append`` method, each event is persisted under
      the key ``events:<version_id>``.
    - If it exposes an async ``set`` method, aggregated metrics are persisted
      under ``metrics:<version_id>``.
    - If it exposes an async ``delete`` method, resets are propagated to it.

    The default in-memory ``dict`` has none of these methods, so nothing is
    forwarded in that case.

    Example:
        ```python
        collector = MetricsCollector(storage_backend)

        # Record a usage event
        await collector.record_event(
            version_id="v1",
            success=True,
            response_time=0.5,
            tokens=150,
            user_rating=4.5
        )

        # Get aggregated metrics
        metrics = await collector.get_metrics("v1")
        print(f"Success rate: {metrics.success_rate:.2%}")
        print(f"Avg response time: {metrics.avg_response_time:.2f}s")

        # Compare variants in experiment
        comparison = await collector.get_experiment_metrics(
            experiment_id="exp1",
            variant_versions=["v1", "v2"],
        )
        ```
    """

    def __init__(self, storage: Optional[Any] = None):
        """Initialize metrics collector.

        Args:
            storage: Backend storage (dict for in-memory, database for persistence).
                If None, uses an in-memory dictionary.
        """
        self.storage = storage if storage is not None else {}
        self._metrics: Dict[str, PromptMetrics] = {}  # version_id -> PromptMetrics
        self._events: Dict[str, List[MetricEvent]] = {}  # version_id -> [events]

    async def record_event(
        self,
        version_id: str,
        success: bool = True,
        response_time: Optional[float] = None,
        tokens: Optional[int] = None,
        user_rating: Optional[float] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> MetricEvent:
        """Record a single usage event.

        The event is stored in memory and folded into the version's aggregated
        metrics. It is then persisted if the backend supports ``append``.

        Args:
            version_id: Version ID this event belongs to.
            success: Whether the use was successful.
            response_time: Response time in seconds (None if not applicable).
            tokens: Number of tokens used (None if not applicable).
            user_rating: User rating 1-5 inclusive (None if not provided).
            metadata: Additional event metadata. None is stored as ``{}``.

        Returns:
            Created MetricEvent.

        Raises:
            ValueError: If user_rating is not in the range [1.0, 5.0].
        """
        if user_rating is not None and not (1.0 <= user_rating <= 5.0):
            raise ValueError(f"User rating must be between 1.0 and 5.0, got {user_rating}")

        event = MetricEvent(
            version_id=version_id,
            # Naive UTC timestamp, the same value datetime.utcnow() produces,
            # without the API deprecated in Python 3.12. It stays naive so it
            # remains comparable with naive start_time/end_time in get_events().
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata or {},
        )

        self._events.setdefault(version_id, []).append(event)
        await self._update_metrics(version_id, event)
        # _persist_event is a no-op unless the backend has an `append` method.
        await self._persist_event(event)

        return event

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get aggregated metrics for a version.

        If no events have been recorded, returns a fresh, empty PromptMetrics.
        That object is not registered with the collector.

        Args:
            version_id: Version ID.

        Returns:
            PromptMetrics with aggregated statistics. For versions with events,
            this is the collector's live aggregate object, not a copy.
        """
        metrics = self._metrics.get(version_id)
        if metrics is None:
            return PromptMetrics(version_id=version_id)
        return metrics

    async def get_events(
        self,
        version_id: str,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> List[MetricEvent]:
        """Get raw events for a version, most recent first.

        Args:
            version_id: Version ID.
            start_time: Keep only events at or after this time (inclusive).
            end_time: Keep only events at or before this time (inclusive).
            limit: Maximum number of events to return, counted from the most
                recent. None means no limit; 0 (or a negative value) returns
                no events.

        Returns:
            A new list of MetricEvent objects sorted by timestamp, newest first.
        """
        events = self._events.get(version_id, [])

        if start_time is not None:
            events = [e for e in events if e.timestamp >= start_time]
        if end_time is not None:
            events = [e for e in events if e.timestamp <= end_time]

        events = sorted(events, key=lambda e: e.timestamp, reverse=True)

        if limit is not None:
            # 0 means "no events", not "unlimited"; negative values are clamped
            # to 0 instead of slicing from the end of the list.
            events = events[:max(limit, 0)]

        return events

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across multiple versions.

        Args:
            version_ids: List of version IDs to compare. Duplicates collapse
                into a single entry.

        Returns:
            Dictionary mapping version_id to PromptMetrics, in input order.
        """
        return {version_id: await self.get_metrics(version_id) for version_id in version_ids}

    async def get_experiment_metrics(
        self,
        experiment_id: str,
        variant_versions: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Get metrics for all variants in an experiment.

        Convenience method for comparing experiment variants. It delegates to
        compare_variants().

        Args:
            experiment_id: Experiment ID. It is currently not used for the
                lookup; metrics are keyed by version only.
            variant_versions: List of version IDs in the experiment.

        Returns:
            Dictionary mapping version to PromptMetrics.
        """
        return await self.compare_variants(variant_versions)

    async def reset_metrics(
        self,
        version_id: str,
    ) -> bool:
        """Reset metrics for a version.

        Warning: This permanently deletes all events and metrics for this version.
        That includes the in-memory copies and, when the backend supports
        ``delete``, the persisted ``metrics:<id>`` and ``events:<id>`` entries.

        Args:
            version_id: Version ID.

        Returns:
            True if reset, False if the version has no in-memory metrics or events.
        """
        if version_id not in self._metrics and version_id not in self._events:
            return False

        self._metrics.pop(version_id, None)
        self._events.pop(version_id, None)

        if hasattr(self.storage, "delete"):
            await self.storage.delete(f"metrics:{version_id}")
            # Events are only ever persisted (by _persist_event) when the
            # backend has `append`; remove them too so a reset really discards
            # all stored data for this version.
            if hasattr(self.storage, "append"):
                await self.storage.delete(f"events:{version_id}")

        return True

    async def get_summary(
        self,
        version_ids: List[str],
    ) -> Dict[str, Any]:
        """Get summary statistics across multiple versions.

        Args:
            version_ids: List of version IDs. Duplicates are counted once.

        Returns:
            Summary dictionary with the following entries:

            - ``total_versions``, ``total_uses``, ``total_successes``,
              ``total_errors`` and ``overall_success_rate``. The rate is 0.0
              when there are no uses.
            - ``versions``, a per-version map of uses, success rate and averages.
        """
        all_metrics = await self.compare_variants(version_ids)

        total_uses = sum(m.total_uses for m in all_metrics.values())
        total_successes = sum(m.success_count for m in all_metrics.values())
        total_errors = sum(m.error_count for m in all_metrics.values())

        return {
            # Count the de-duplicated versions so this agrees with the totals
            # and with the "versions" map below.
            "total_versions": len(all_metrics),
            "total_uses": total_uses,
            "total_successes": total_successes,
            "total_errors": total_errors,
            "overall_success_rate": total_successes / total_uses if total_uses > 0 else 0.0,
            "versions": {
                vid: {
                    "uses": m.total_uses,
                    "success_rate": m.success_rate,
                    "avg_response_time": m.avg_response_time,
                    "avg_tokens": m.avg_tokens,
                    "avg_rating": m.avg_rating,
                }
                for vid, m in all_metrics.items()
            },
        }

    # ===== Helper Methods =====

    async def _update_metrics(
        self,
        version_id: str,
        event: MetricEvent,
    ) -> None:
        """Fold a new event into the version's aggregated metrics and persist them."""
        metrics = self._metrics.get(version_id)
        if metrics is None:
            metrics = self._metrics[version_id] = PromptMetrics(version_id=version_id)

        metrics.total_uses += 1
        if event.success:
            metrics.success_count += 1
        else:
            metrics.error_count += 1

        # Optional measurements are only accumulated when the event reported them.
        if event.response_time is not None:
            metrics.total_response_time += event.response_time
        if event.tokens is not None:
            metrics.total_tokens += event.tokens
        if event.user_rating is not None:
            metrics.user_ratings.append(event.user_rating)

        metrics.last_used = event.timestamp

        # _persist_metrics is a no-op unless the backend has a `set` method.
        await self._persist_metrics(metrics)

    async def _persist_event(self, event: MetricEvent) -> None:
        """Append the event to backend storage under ``events:<version_id>``, if supported."""
        if hasattr(self.storage, "append"):
            key = f"events:{event.version_id}"
            await self.storage.append(key, event.to_dict())

    async def _persist_metrics(self, metrics: PromptMetrics) -> None:
        """Store aggregated metrics under ``metrics:<version_id>``, if supported."""
        if hasattr(self.storage, "set"):
            key = f"metrics:{metrics.version_id}"
            await self.storage.set(key, metrics.to_dict())

    async def get_top_versions(
        self,
        version_ids: List[str],
        metric: str = "success_rate",
        limit: int = 5,
    ) -> List[tuple[str, float]]:
        """Get top performing versions by a specific metric.

        Versions with no recorded uses are excluded. So are versions whose
        metric value is None, because None cannot be ordered against numbers.

        Args:
            version_ids: List of version IDs to rank.
            metric: Metric to rank by. One of "success_rate", "avg_rating",
                "avg_response_time" or "avg_tokens". "avg_response_time" ranks
                ascending (lower first). All other metrics rank descending
                (higher first).
            limit: Number of top versions to return.

        Returns:
            List of (version_id, metric_value) tuples, sorted by metric.

        Raises:
            ValueError: If metric name is invalid.
        """
        valid_metrics = ["success_rate", "avg_rating", "avg_response_time", "avg_tokens"]
        if metric not in valid_metrics:
            raise ValueError(
                f"Invalid metric: {metric}. Valid metrics: {', '.join(valid_metrics)}"
            )

        all_metrics = await self.compare_variants(version_ids)

        metric_values = []
        for vid, metrics in all_metrics.items():
            if metrics.total_uses <= 0:
                continue  # Only include versions with data
            value = getattr(metrics, metric)
            if value is None:
                # NOTE(review): defensive. This guards against PromptMetrics
                # reporting None for an average with no samples (e.g. no ratings).
                continue
            metric_values.append((vid, value))

        # Lower response time is better; for every other metric higher is better.
        reverse = metric != "avg_response_time"
        sorted_versions = sorted(metric_values, key=lambda x: x[1], reverse=reverse)

        return sorted_versions[:limit]

    async def get_version_performance_over_time(
        self,
        version_id: str,
        bucket_size: str = "hour",
    ) -> List[Dict[str, Any]]:
        """Get performance metrics bucketed by time period.

        Args:
            version_id: Version ID.
            bucket_size: Time bucket size ("hour", "day", "week"). Any other
                value falls back to daily buckets.

        Returns:
            List of per-bucket dicts in ascending bucket-key order. Each dict
            has ``time_bucket``, ``total_uses``, ``success_count``,
            ``success_rate`` and ``avg_response_time``. The average covers only
            events that reported a response time; it is 0.0 if none did.

        Note:
            This is a simplified implementation. Production would use
            proper time-series bucketing.
        """
        events = await self.get_events(version_id)
        if not events:
            return []

        # NOTE(review): "%W" numbers Monday-based weeks with days before the
        # first Monday in week 00, so a week spanning New Year is split across
        # two buckets.
        bucket_formats = {
            "hour": "%Y-%m-%d %H:00",
            "day": "%Y-%m-%d",
            "week": "%Y-W%W",
        }
        key_format = bucket_formats.get(bucket_size, "%Y-%m-%d")

        buckets: Dict[str, List[MetricEvent]] = {}
        for event in events:
            buckets.setdefault(event.timestamp.strftime(key_format), []).append(event)

        result = []
        for bucket_key, bucket_events in sorted(buckets.items()):
            total = len(bucket_events)  # always >= 1: buckets are created on first event
            successes = sum(1 for e in bucket_events if e.success)
            # Average only over events that actually reported a response time;
            # dividing by all events would deflate the mean.
            timings = [e.response_time for e in bucket_events if e.response_time is not None]

            result.append({
                "time_bucket": bucket_key,
                "total_uses": total,
                "success_count": successes,
                "success_rate": successes / total,
                "avg_response_time": sum(timings) / len(timings) if timings else 0.0,
            })

        return result