Coverage for src/dataknobs_llm/prompts/versioning/metrics.py: 71%
112 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
"""Metrics tracking for prompt versions.

This module provides:
- Event-based metrics collection
- Aggregated metrics computation
- Performance comparison across versions
- Experiment metrics analysis
"""
from datetime import datetime, timezone
from typing import Any, Dict, List

from .types import (
    PromptMetrics,
    MetricEvent,
)
class MetricsCollector:
    """Collects and aggregates metrics for prompt versions.

    Tracks usage, performance, and user feedback for each version.
    Supports both real-time event recording and aggregated metrics retrieval.

    Raw events and aggregated ``PromptMetrics`` are always held in memory.
    When the storage backend exposes awaitable ``append`` / ``set`` /
    ``delete`` methods, new events, updated aggregates, and resets are also
    written through to it.

    Example:
        ```python
        collector = MetricsCollector(storage_backend)

        # Record a usage event
        await collector.record_event(
            version_id="v1",
            success=True,
            response_time=0.5,
            tokens=150,
            user_rating=4.5
        )

        # Get aggregated metrics
        metrics = await collector.get_metrics("v1")
        print(f"Success rate: {metrics.success_rate:.2%}")
        print(f"Avg response time: {metrics.avg_response_time:.2f}s")

        # Compare variants in experiment
        comparison = await collector.get_experiment_metrics(
            experiment_id="exp1",
            variant_versions=["v1", "v2"],
        )
        ```
    """

    def __init__(self, storage: Any | None = None):
        """Initialize metrics collector.

        Args:
            storage: Backend storage (dict for in-memory, database for persistence).
                If None, an in-memory dictionary is used. Write-through only
                happens when the backend has awaitable ``append`` (events),
                ``set`` (aggregates) or ``delete`` (reset) methods; a plain
                dict has none of these, so nothing is persisted in that case.
        """
        self.storage = storage if storage is not None else {}
        self._metrics: Dict[str, PromptMetrics] = {}  # version_id -> PromptMetrics
        self._events: Dict[str, List[MetricEvent]] = {}  # version_id -> [events]

    async def record_event(
        self,
        version_id: str,
        success: bool = True,
        response_time: float | None = None,
        tokens: int | None = None,
        user_rating: float | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> MetricEvent:
        """Record a single usage event.

        The event is appended to the in-memory event log, folded into the
        version's aggregated metrics, and persisted if the backend supports it.

        Args:
            version_id: Version ID this event belongs to
            success: Whether the use was successful
            response_time: Response time in seconds (None if not applicable)
            tokens: Number of tokens used (None if not applicable)
            user_rating: User rating 1-5 (None if not provided)
            metadata: Additional event metadata

        Returns:
            Created MetricEvent

        Raises:
            ValueError: If user_rating is not in the range [1.0, 5.0]
                (NaN is also rejected because every comparison with it fails).
        """
        if user_rating is not None and not (1.0 <= user_rating <= 5.0):
            raise ValueError(f"User rating must be between 1.0 and 5.0, got {user_rating}")

        # Same naive-UTC value the deprecated datetime.utcnow() produced.
        # Kept naive so comparisons against existing (naive) timestamps and
        # caller-supplied filter bounds keep working: naive and aware
        # datetimes cannot be compared.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None)

        event = MetricEvent(
            version_id=version_id,
            timestamp=timestamp,
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata or {},
        )

        self._events.setdefault(version_id, []).append(event)

        await self._update_metrics(version_id, event)

        if hasattr(self.storage, "append"):
            await self._persist_event(event)

        return event

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get aggregated metrics for a version.

        If no events have been recorded, a fresh empty PromptMetrics is
        returned (and not stored). Otherwise the live aggregate object held by
        this collector is returned, so mutating it changes the collector's
        state.

        Args:
            version_id: Version ID

        Returns:
            PromptMetrics with aggregated statistics
        """
        if version_id not in self._metrics:
            return PromptMetrics(version_id=version_id)

        return self._metrics[version_id]

    async def get_events(
        self,
        version_id: str,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        limit: int | None = None,
    ) -> List[MetricEvent]:
        """Get raw events for a version.

        Args:
            version_id: Version ID
            start_time: Keep only events at or after this time (inclusive)
            end_time: Keep only events at or before this time (inclusive)
            limit: Maximum number of events to return (most recent first).
                None means no limit; 0 returns an empty list.

        Returns:
            New list of MetricEvent objects, most recent first
        """
        events = self._events.get(version_id, [])

        if start_time is not None:
            events = [e for e in events if e.timestamp >= start_time]
        if end_time is not None:
            events = [e for e in events if e.timestamp <= end_time]

        # sorted() also guarantees the caller never receives the internal list.
        events = sorted(events, key=lambda e: e.timestamp, reverse=True)

        # Compare against None so that limit=0 means "no events", not "all".
        if limit is not None:
            events = events[:limit]

        return events

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across multiple versions.

        Args:
            version_ids: List of version IDs to compare (duplicates collapse
                into a single entry)

        Returns:
            Dictionary mapping version_id to PromptMetrics
        """
        comparison = {}
        for version_id in version_ids:
            comparison[version_id] = await self.get_metrics(version_id)
        return comparison

    async def get_experiment_metrics(
        self,
        experiment_id: str,
        variant_versions: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Get metrics for all variants in an experiment.

        Convenience method for comparing experiment variants; equivalent to
        ``compare_variants(variant_versions)``.

        Args:
            experiment_id: Experiment ID (for metadata; not used in the lookup)
            variant_versions: List of version strings in the experiment

        Returns:
            Dictionary mapping version to PromptMetrics
        """
        return await self.compare_variants(variant_versions)

    async def reset_metrics(
        self,
        version_id: str,
    ) -> bool:
        """Reset metrics for a version.

        Warning: This permanently deletes all events and metrics for this
        version, both in memory and (when the backend supports ``delete``)
        in persistent storage.

        Args:
            version_id: Version ID

        Returns:
            True if reset, False if version not found in memory
        """
        if version_id not in self._metrics and version_id not in self._events:
            return False

        self._metrics.pop(version_id, None)
        self._events.pop(version_id, None)

        if hasattr(self.storage, "delete"):
            await self.storage.delete(f"metrics:{version_id}")
            # Events are only ever persisted through ``append`` (see
            # _persist_event), so only clear that key when it can exist.
            if hasattr(self.storage, "append"):
                await self.storage.delete(f"events:{version_id}")

        return True

    async def get_summary(
        self,
        version_ids: List[str],
    ) -> Dict[str, Any]:
        """Get summary statistics across multiple versions.

        Args:
            version_ids: List of version IDs (duplicates are counted once)

        Returns:
            Summary dictionary with keys ``total_versions``, ``total_uses``,
            ``total_successes``, ``total_errors``, ``overall_success_rate``
            (0.0 when there are no uses) and ``versions`` (per-version stats).
        """
        all_metrics = await self.compare_variants(version_ids)

        total_uses = sum(m.total_uses for m in all_metrics.values())
        total_successes = sum(m.success_count for m in all_metrics.values())
        total_errors = sum(m.error_count for m in all_metrics.values())

        return {
            # Count distinct versions so this agrees with "versions" below.
            "total_versions": len(all_metrics),
            "total_uses": total_uses,
            "total_successes": total_successes,
            "total_errors": total_errors,
            "overall_success_rate": total_successes / total_uses if total_uses > 0 else 0.0,
            "versions": {
                vid: {
                    "uses": m.total_uses,
                    "success_rate": m.success_rate,
                    "avg_response_time": m.avg_response_time,
                    "avg_tokens": m.avg_tokens,
                    "avg_rating": m.avg_rating,
                }
                for vid, m in all_metrics.items()
            },
        }

    # ===== Helper Methods =====

    async def _update_metrics(
        self,
        version_id: str,
        event: MetricEvent,
    ) -> None:
        """Fold a new event into the version's aggregated metrics.

        Creates the aggregate on first use, bumps the use/success/error
        counters, accumulates response time and tokens when present, records
        the rating, updates ``last_used``, and persists via ``set`` if the
        backend supports it.
        """
        if version_id not in self._metrics:
            self._metrics[version_id] = PromptMetrics(version_id=version_id)

        metrics = self._metrics[version_id]

        metrics.total_uses += 1
        if event.success:
            metrics.success_count += 1
        else:
            metrics.error_count += 1

        if event.response_time is not None:
            metrics.total_response_time += event.response_time

        if event.tokens is not None:
            metrics.total_tokens += event.tokens

        if event.user_rating is not None:
            metrics.user_ratings.append(event.user_rating)

        metrics.last_used = event.timestamp

        if hasattr(self.storage, "set"):
            await self._persist_metrics(metrics)

    async def _persist_event(self, event: MetricEvent) -> None:
        """Append the serialized event under ``events:<version_id>``."""
        if hasattr(self.storage, "append"):
            key = f"events:{event.version_id}"
            await self.storage.append(key, event.to_dict())

    async def _persist_metrics(self, metrics: PromptMetrics) -> None:
        """Store the serialized aggregate under ``metrics:<version_id>``."""
        if hasattr(self.storage, "set"):
            key = f"metrics:{metrics.version_id}"
            await self.storage.set(key, metrics.to_dict())

    async def get_top_versions(
        self,
        version_ids: List[str],
        metric: str = "success_rate",
        limit: int = 5,
    ) -> List[tuple[str, float]]:
        """Get top performing versions by a specific metric.

        Versions with no recorded uses, or whose value for ``metric`` is None,
        are left out of the ranking.

        Args:
            version_ids: List of version IDs to rank
            metric: Metric to rank by ("success_rate", "avg_rating",
                "avg_response_time", "avg_tokens"). ``avg_response_time`` is
                ranked lowest first; all others are ranked highest first.
            limit: Number of top versions to return

        Returns:
            List of (version_id, metric_value) tuples, sorted by metric

        Raises:
            ValueError: If metric name is invalid
        """
        valid_metrics = ("success_rate", "avg_rating", "avg_response_time", "avg_tokens")
        if metric not in valid_metrics:
            raise ValueError(
                f"Invalid metric: {metric}. Valid metrics: {', '.join(valid_metrics)}"
            )

        all_metrics = await self.compare_variants(version_ids)

        metric_values = []
        for vid, metrics in all_metrics.items():
            if metrics.total_uses <= 0:
                continue  # Only include versions with data
            value = getattr(metrics, metric)
            # None cannot be ordered against numbers; sorting would raise.
            if value is None:
                continue
            metric_values.append((vid, value))

        # Lower response time is better; for every other metric, higher is.
        reverse = metric != "avg_response_time"
        sorted_versions = sorted(metric_values, key=lambda x: x[1], reverse=reverse)

        return sorted_versions[:limit]

    async def get_version_performance_over_time(
        self,
        version_id: str,
        bucket_size: str = "hour",
    ) -> List[Dict[str, Any]]:
        """Get performance metrics bucketed by time period.

        Args:
            version_id: Version ID
            bucket_size: Time bucket size ("hour", "day", "week"). Any other
                value falls back to daily buckets. Week keys use ``%Y-W%W``
                (Monday-based week of the calendar year, not ISO weeks).

        Returns:
            List of time-bucketed metrics ordered by bucket key. Each entry
            has ``time_bucket``, ``total_uses``, ``success_count``,
            ``success_rate`` and ``avg_response_time``, where the latter
            averages only events that reported a response time (0.0 if none).

        Note:
            This is a simplified implementation. Production would use
            proper time-series bucketing.
        """
        events = await self.get_events(version_id)

        if not events:
            return []

        bucket_formats = {
            "hour": "%Y-%m-%d %H:00",
            "day": "%Y-%m-%d",
            "week": "%Y-W%W",
        }
        bucket_format = bucket_formats.get(bucket_size, "%Y-%m-%d")

        buckets: Dict[str, List[MetricEvent]] = {}
        for event in events:
            bucket_key = event.timestamp.strftime(bucket_format)
            buckets.setdefault(bucket_key, []).append(event)

        result = []
        for bucket_key, bucket_events in sorted(buckets.items()):
            total = len(bucket_events)
            successes = sum(1 for e in bucket_events if e.success)
            # Average over events that actually reported a timing; dividing
            # by all events would deflate the average. 0.0 is a valid timing.
            timings = [
                e.response_time for e in bucket_events if e.response_time is not None
            ]

            result.append({
                "time_bucket": bucket_key,
                "total_uses": total,
                "success_count": successes,
                "success_rate": successes / total if total > 0 else 0.0,
                "avg_response_time": sum(timings) / len(timings) if timings else 0.0,
            })

        return result