Coverage for src/dataknobs_llm/prompts/versioning/metrics.py: 71%

112 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-31 16:05 -0600

1"""Metrics tracking for prompt versions. 

2 

3This module provides: 

4- Event-based metrics collection 

5- Aggregated metrics computation 

6- Performance comparison across versions 

7- Experiment metrics analysis 

8""" 

9 

10from typing import Any, Dict, List, Optional 

11from datetime import datetime 

12 

13from .types import ( 

14 PromptMetrics, 

15 MetricEvent, 

16 VersioningError, 

17) 

18 

19 

class MetricsCollector:
    """Collects and aggregates metrics for prompt versions.

    Tracks usage, performance, and user feedback for each version.
    Supports both real-time event recording and aggregated metrics retrieval.

    Events and aggregates are always kept in memory. When the storage backend
    exposes awaitable ``append`` / ``set`` / ``delete`` methods, events are
    additionally persisted under ``events:<version_id>`` and aggregates under
    ``metrics:<version_id>``.

    Example:
        ```python
        collector = MetricsCollector(storage_backend)

        # Record a usage event
        await collector.record_event(
            version_id="v1",
            success=True,
            response_time=0.5,
            tokens=150,
            user_rating=4.5
        )

        # Get aggregated metrics
        metrics = await collector.get_metrics("v1")
        print(f"Success rate: {metrics.success_rate:.2%}")
        print(f"Avg response time: {metrics.avg_response_time:.2f}s")

        # Compare the variants of an experiment
        comparison = await collector.compare_variants(["v1", "v2"])
        ```
    """

    def __init__(self, storage: Optional[Any] = None):
        """Initialize metrics collector.

        Args:
            storage: Optional backend storage. Persistence is attempted only
                for the capabilities the object exposes (``append`` for events,
                ``set`` for aggregates, ``delete`` for resets). If None, an
                empty dict is used; a plain dict has none of those methods, so
                data then lives in memory only.
        """
        self.storage = storage if storage is not None else {}
        # version_id -> running aggregate for that version
        self._metrics: Dict[str, PromptMetrics] = {}
        # version_id -> events in the order they were recorded
        self._events: Dict[str, List[MetricEvent]] = {}

    async def record_event(
        self,
        version_id: str,
        success: bool = True,
        response_time: Optional[float] = None,
        tokens: Optional[int] = None,
        user_rating: Optional[float] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> MetricEvent:
        """Record a single usage event.

        The event is stored in memory, folded into the version's aggregate
        metrics, and persisted if the storage backend supports it.

        Args:
            version_id: Version ID this event belongs to
            success: Whether the use was successful
            response_time: Response time in seconds (None if not applicable)
            tokens: Number of tokens used (None if not applicable)
            user_rating: User rating 1-5 (None if not provided)
            metadata: Additional event metadata

        Returns:
            Created MetricEvent

        Raises:
            ValueError: If user_rating is not in valid range
        """
        # NaN fails the chained comparison as well, so it is rejected too.
        if user_rating is not None and not (1.0 <= user_rating <= 5.0):
            raise ValueError(f"User rating must be between 1.0 and 5.0, got {user_rating}")

        # datetime.utcnow() is deprecated since Python 3.12; this produces the
        # same naive-UTC timestamp without the deprecation warning.
        from datetime import timezone

        event = MetricEvent(
            version_id=version_id,
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata or {},
        )

        self._events.setdefault(version_id, []).append(event)

        await self._update_metrics(version_id, event)

        # No-op unless the backend exposes append().
        await self._persist_event(event)

        return event

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get aggregated metrics for a version.

        If no events have been recorded, returns a fresh, empty PromptMetrics
        (which is not stored).

        Args:
            version_id: Version ID

        Returns:
            PromptMetrics with aggregated statistics
        """
        if version_id not in self._metrics:
            return PromptMetrics(version_id=version_id)

        return self._metrics[version_id]

    async def get_events(
        self,
        version_id: str,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> List[MetricEvent]:
        """Get raw events for a version, most recent first.

        Args:
            version_id: Version ID
            start_time: Keep only events with timestamp >= start_time
            end_time: Keep only events with timestamp <= end_time
            limit: Maximum number of events to return (most recent first).
                None means no limit; 0 returns an empty list.

        Returns:
            New list of MetricEvent objects
        """
        events = self._events.get(version_id, [])

        if start_time is not None:
            events = [e for e in events if e.timestamp >= start_time]
        if end_time is not None:
            events = [e for e in events if e.timestamp <= end_time]

        events = sorted(events, key=lambda e: e.timestamp, reverse=True)

        if limit is not None:
            events = events[:limit]

        return events

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across multiple versions.

        Args:
            version_ids: List of version IDs to compare (duplicates collapse
                into a single entry)

        Returns:
            Dictionary mapping version_id to PromptMetrics
        """
        comparison = {}
        for version_id in version_ids:
            comparison[version_id] = await self.get_metrics(version_id)
        return comparison

    async def get_experiment_metrics(
        self,
        experiment_id: str,
        variant_versions: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Get metrics for all variants in an experiment.

        Convenience wrapper around ``compare_variants``.

        Args:
            experiment_id: Experiment ID (currently unused by this method)
            variant_versions: List of version strings in the experiment

        Returns:
            Dictionary mapping version to PromptMetrics
        """
        return await self.compare_variants(variant_versions)

    async def reset_metrics(
        self,
        version_id: str,
    ) -> bool:
        """Reset metrics for a version.

        Warning: This permanently deletes all in-memory events and metrics for
        this version and, if the backend supports ``delete``, the persisted
        ``metrics:<version_id>`` and ``events:<version_id>`` entries.

        Args:
            version_id: Version ID

        Returns:
            True if reset, False if the version has no in-memory data
        """
        if version_id not in self._metrics and version_id not in self._events:
            return False

        self._metrics.pop(version_id, None)
        self._events.pop(version_id, None)

        if hasattr(self.storage, "delete"):
            await self.storage.delete(f"metrics:{version_id}")
            # Events are persisted under their own key (see _persist_event);
            # without this the "permanent" reset would leave them behind.
            await self.storage.delete(f"events:{version_id}")

        return True

    async def get_summary(
        self,
        version_ids: List[str],
    ) -> Dict[str, Any]:
        """Get summary statistics across multiple versions.

        Args:
            version_ids: List of version IDs (duplicates are counted once)

        Returns:
            Summary dictionary with overall totals, the overall success rate
            (0.0 when there are no uses) and per-version statistics
        """
        all_metrics = await self.compare_variants(version_ids)

        total_uses = sum(m.total_uses for m in all_metrics.values())
        total_successes = sum(m.success_count for m in all_metrics.values())
        total_errors = sum(m.error_count for m in all_metrics.values())

        return {
            # Count distinct versions so this agrees with the totals and the
            # "versions" mapping, which are both de-duplicated.
            "total_versions": len(all_metrics),
            "total_uses": total_uses,
            "total_successes": total_successes,
            "total_errors": total_errors,
            "overall_success_rate": total_successes / total_uses if total_uses > 0 else 0.0,
            "versions": {
                vid: {
                    "uses": m.total_uses,
                    "success_rate": m.success_rate,
                    "avg_response_time": m.avg_response_time,
                    "avg_tokens": m.avg_tokens,
                    "avg_rating": m.avg_rating,
                }
                for vid, m in all_metrics.items()
            }
        }

    # ===== Helper Methods =====

    async def _update_metrics(
        self,
        version_id: str,
        event: MetricEvent,
    ):
        """Fold a new event into the version's running aggregate."""
        if version_id not in self._metrics:
            self._metrics[version_id] = PromptMetrics(version_id=version_id)

        metrics = self._metrics[version_id]

        metrics.total_uses += 1
        if event.success:
            metrics.success_count += 1
        else:
            metrics.error_count += 1

        # Optional measurements only contribute when they were reported.
        if event.response_time is not None:
            metrics.total_response_time += event.response_time

        if event.tokens is not None:
            metrics.total_tokens += event.tokens

        if event.user_rating is not None:
            metrics.user_ratings.append(event.user_rating)

        metrics.last_used = event.timestamp

        # No-op unless the backend exposes set().
        await self._persist_metrics(metrics)

    async def _persist_event(self, event: MetricEvent):
        """Append the event to ``events:<version_id>`` if storage supports append()."""
        if hasattr(self.storage, "append"):
            key = f"events:{event.version_id}"
            await self.storage.append(key, event.to_dict())

    async def _persist_metrics(self, metrics: PromptMetrics):
        """Write the aggregate to ``metrics:<version_id>`` if storage supports set()."""
        if hasattr(self.storage, "set"):
            key = f"metrics:{metrics.version_id}"
            await self.storage.set(key, metrics.to_dict())

    async def get_top_versions(
        self,
        version_ids: List[str],
        metric: str = "success_rate",
        limit: int = 5,
    ) -> List[tuple[str, float]]:
        """Get top performing versions by a specific metric.

        Versions with no recorded uses are excluded, as are versions whose
        value for ``metric`` is None (e.g. no ratings, if PromptMetrics reports
        a missing average as None — TODO confirm in types).

        Args:
            version_ids: List of version IDs to rank
            metric: Metric to rank by ("success_rate", "avg_rating",
                "avg_response_time", "avg_tokens"). "avg_response_time" ranks
                lowest first; all others rank highest first.
            limit: Number of top versions to return

        Returns:
            List of (version_id, metric_value) tuples, sorted by metric

        Raises:
            ValueError: If metric name is invalid
        """
        valid_metrics = ["success_rate", "avg_rating", "avg_response_time", "avg_tokens"]
        if metric not in valid_metrics:
            raise ValueError(
                f"Invalid metric: {metric}. Valid metrics: {', '.join(valid_metrics)}"
            )

        all_metrics = await self.compare_variants(version_ids)

        metric_values = [
            (vid, value)
            for vid, value in (
                (vid, getattr(m, metric))
                for vid, m in all_metrics.items()
                if m.total_uses > 0  # only versions with data
            )
            # None cannot be ordered against numbers; skip it rather than crash.
            if value is not None
        ]

        # Lower response time is better; for everything else higher is better.
        reverse = metric != "avg_response_time"
        metric_values.sort(key=lambda item: item[1], reverse=reverse)

        return metric_values[:limit]

    async def get_version_performance_over_time(
        self,
        version_id: str,
        bucket_size: str = "hour",
    ) -> List[Dict[str, Any]]:
        """Get performance metrics bucketed by time period.

        Args:
            version_id: Version ID
            bucket_size: Time bucket size ("hour", "day", "week"); any other
                value falls back to daily buckets

        Returns:
            List of per-bucket dicts ordered by bucket key, each with
            "time_bucket", "total_uses", "success_count", "success_rate" and
            "avg_response_time" (mean over events that reported a response
            time, 0.0 if none did). Empty list if there are no events.

        Note:
            This is a simplified implementation. Weeks use ``%Y-W%W``
            (Monday-start, calendar year), not ISO week numbering.
        """
        events = await self.get_events(version_id)

        if not events:
            return []

        bucket_formats = {
            "hour": "%Y-%m-%d %H:00",
            "day": "%Y-%m-%d",
            "week": "%Y-W%W",
        }
        fmt = bucket_formats.get(bucket_size, "%Y-%m-%d")

        buckets: Dict[str, List[MetricEvent]] = {}
        for event in events:
            buckets.setdefault(event.timestamp.strftime(fmt), []).append(event)

        result = []
        # Zero-padded keys sort lexicographically in chronological order.
        for bucket_key, bucket_events in sorted(buckets.items()):
            total = len(bucket_events)
            successes = sum(1 for e in bucket_events if e.success)
            # Only events that actually reported a timing count toward the
            # average; otherwise response_time=None would drag it toward 0.
            timings = [
                e.response_time for e in bucket_events if e.response_time is not None
            ]

            result.append({
                "time_bucket": bucket_key,
                "total_uses": total,
                "success_count": successes,
                "success_rate": successes / total if total > 0 else 0.0,
                "avg_response_time": sum(timings) / len(timings) if timings else 0.0,
            })

        return result