Coverage for src / dataknobs_llm / prompts / versioning / metrics.py: 15%

112 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 10:28 -0700

1"""Metrics tracking for prompt versions. 

2 

3This module provides: 

4- Event-based metrics collection 

5- Aggregated metrics computation 

6- Performance comparison across versions 

7- Experiment metrics analysis 

8""" 

9 

10from typing import Any, Dict, List 

11from datetime import datetime 

12 

13from .types import ( 

14 PromptMetrics, 

15 MetricEvent, 

16) 

17 

18 

19class MetricsCollector: 

20 """Collects and aggregates metrics for prompt versions. 

21 

22 Tracks usage, performance, and user feedback for each version. 

23 Supports both real-time event recording and aggregated metrics retrieval. 

24 

25 Example: 

26 ```python 

27 collector = MetricsCollector(storage_backend) 

28 

29 # Record a usage event 

30 await collector.record_event( 

31 version_id="v1", 

32 success=True, 

33 response_time=0.5, 

34 tokens=150, 

35 user_rating=4.5 

36 ) 

37 

38 # Get aggregated metrics 

39 metrics = await collector.get_metrics("v1") 

40 print(f"Success rate: {metrics.success_rate:.2%}") 

41 print(f"Avg response time: {metrics.avg_response_time:.2f}s") 

42 

43 # Compare variants in experiment 

44 comparison = await collector.compare_variants( 

45 experiment_id="exp1" 

46 ) 

47 ``` 

48 """ 

49 

50 def __init__(self, storage: Any | None = None): 

51 """Initialize metrics collector. 

52 

53 Args: 

54 storage: Backend storage (dict for in-memory, database for persistence) 

55 If None, uses in-memory dictionary 

56 """ 

57 self.storage = storage if storage is not None else {} 

58 self._metrics: Dict[str, PromptMetrics] = {} # version_id -> PromptMetrics 

59 self._events: Dict[str, List[MetricEvent]] = {} # version_id -> [events] 

60 

61 async def record_event( 

62 self, 

63 version_id: str, 

64 success: bool = True, 

65 response_time: float | None = None, 

66 tokens: int | None = None, 

67 user_rating: float | None = None, 

68 metadata: Dict[str, Any] | None = None, 

69 ) -> MetricEvent: 

70 """Record a single usage event. 

71 

72 Args: 

73 version_id: Version ID this event belongs to 

74 success: Whether the use was successful 

75 response_time: Response time in seconds (None if not applicable) 

76 tokens: Number of tokens used (None if not applicable) 

77 user_rating: User rating 1-5 (None if not provided) 

78 metadata: Additional event metadata 

79 

80 Returns: 

81 Created MetricEvent 

82 

83 Raises: 

84 ValueError: If user_rating is not in valid range 

85 """ 

86 if user_rating is not None and not (1.0 <= user_rating <= 5.0): 

87 raise ValueError(f"User rating must be between 1.0 and 5.0, got {user_rating}") 

88 

89 # Create event 

90 event = MetricEvent( 

91 version_id=version_id, 

92 timestamp=datetime.utcnow(), 

93 success=success, 

94 response_time=response_time, 

95 tokens=tokens, 

96 user_rating=user_rating, 

97 metadata=metadata or {}, 

98 ) 

99 

100 # Store event 

101 if version_id not in self._events: 

102 self._events[version_id] = [] 

103 self._events[version_id].append(event) 

104 

105 # Update aggregated metrics 

106 await self._update_metrics(version_id, event) 

107 

108 # Persist event if backend available 

109 if hasattr(self.storage, "append"): 

110 await self._persist_event(event) 

111 

112 return event 

113 

114 async def get_metrics( 

115 self, 

116 version_id: str, 

117 ) -> PromptMetrics: 

118 """Get aggregated metrics for a version. 

119 

120 If no events have been recorded, returns empty metrics. 

121 

122 Args: 

123 version_id: Version ID 

124 

125 Returns: 

126 PromptMetrics with aggregated statistics 

127 """ 

128 if version_id not in self._metrics: 

129 # Return empty metrics 

130 return PromptMetrics(version_id=version_id) 

131 

132 return self._metrics[version_id] 

133 

134 async def get_events( 

135 self, 

136 version_id: str, 

137 start_time: datetime | None = None, 

138 end_time: datetime | None = None, 

139 limit: int | None = None, 

140 ) -> List[MetricEvent]: 

141 """Get raw events for a version. 

142 

143 Args: 

144 version_id: Version ID 

145 start_time: Filter events after this time 

146 end_time: Filter events before this time 

147 limit: Maximum number of events to return (most recent first) 

148 

149 Returns: 

150 List of MetricEvent objects 

151 """ 

152 events = self._events.get(version_id, []) 

153 

154 # Apply time filters 

155 if start_time: 

156 events = [e for e in events if e.timestamp >= start_time] 

157 if end_time: 

158 events = [e for e in events if e.timestamp <= end_time] 

159 

160 # Sort by timestamp (most recent first) 

161 events = sorted(events, key=lambda e: e.timestamp, reverse=True) 

162 

163 # Apply limit 

164 if limit: 

165 events = events[:limit] 

166 

167 return events 

168 

169 async def compare_variants( 

170 self, 

171 version_ids: List[str], 

172 ) -> Dict[str, PromptMetrics]: 

173 """Compare metrics across multiple versions. 

174 

175 Args: 

176 version_ids: List of version IDs to compare 

177 

178 Returns: 

179 Dictionary mapping version_id to PromptMetrics 

180 """ 

181 comparison = {} 

182 for version_id in version_ids: 

183 comparison[version_id] = await self.get_metrics(version_id) 

184 return comparison 

185 

186 async def get_experiment_metrics( 

187 self, 

188 experiment_id: str, 

189 variant_versions: List[str], 

190 ) -> Dict[str, PromptMetrics]: 

191 """Get metrics for all variants in an experiment. 

192 

193 Convenience method for comparing experiment variants. 

194 

195 Args: 

196 experiment_id: Experiment ID (for metadata) 

197 variant_versions: List of version strings in the experiment 

198 

199 Returns: 

200 Dictionary mapping version to PromptMetrics 

201 """ 

202 return await self.compare_variants(variant_versions) 

203 

204 async def reset_metrics( 

205 self, 

206 version_id: str, 

207 ) -> bool: 

208 """Reset metrics for a version. 

209 

210 Warning: This permanently deletes all events and metrics for this version. 

211 

212 Args: 

213 version_id: Version ID 

214 

215 Returns: 

216 True if reset, False if version not found 

217 """ 

218 if version_id not in self._metrics and version_id not in self._events: 

219 return False 

220 

221 # Clear metrics and events 

222 if version_id in self._metrics: 

223 del self._metrics[version_id] 

224 if version_id in self._events: 

225 del self._events[version_id] 

226 

227 # Persist deletion if backend available 

228 if hasattr(self.storage, "delete"): 

229 await self.storage.delete(f"metrics:{version_id}") 

230 

231 return True 

232 

233 async def get_summary( 

234 self, 

235 version_ids: List[str], 

236 ) -> Dict[str, Any]: 

237 """Get summary statistics across multiple versions. 

238 

239 Args: 

240 version_ids: List of version IDs 

241 

242 Returns: 

243 Summary dictionary with aggregated statistics 

244 """ 

245 all_metrics = await self.compare_variants(version_ids) 

246 

247 total_uses = sum(m.total_uses for m in all_metrics.values()) 

248 total_successes = sum(m.success_count for m in all_metrics.values()) 

249 total_errors = sum(m.error_count for m in all_metrics.values()) 

250 

251 return { 

252 "total_versions": len(version_ids), 

253 "total_uses": total_uses, 

254 "total_successes": total_successes, 

255 "total_errors": total_errors, 

256 "overall_success_rate": total_successes / total_uses if total_uses > 0 else 0.0, 

257 "versions": { 

258 vid: { 

259 "uses": m.total_uses, 

260 "success_rate": m.success_rate, 

261 "avg_response_time": m.avg_response_time, 

262 "avg_tokens": m.avg_tokens, 

263 "avg_rating": m.avg_rating, 

264 } 

265 for vid, m in all_metrics.items() 

266 } 

267 } 

268 

269 # ===== Helper Methods ===== 

270 

271 async def _update_metrics( 

272 self, 

273 version_id: str, 

274 event: MetricEvent, 

275 ): 

276 """Update aggregated metrics with new event.""" 

277 # Get or create metrics 

278 if version_id not in self._metrics: 

279 self._metrics[version_id] = PromptMetrics(version_id=version_id) 

280 

281 metrics = self._metrics[version_id] 

282 

283 # Update counters 

284 metrics.total_uses += 1 

285 if event.success: 

286 metrics.success_count += 1 

287 else: 

288 metrics.error_count += 1 

289 

290 # Update response time 

291 if event.response_time is not None: 

292 metrics.total_response_time += event.response_time 

293 

294 # Update tokens 

295 if event.tokens is not None: 

296 metrics.total_tokens += event.tokens 

297 

298 # Update ratings 

299 if event.user_rating is not None: 

300 metrics.user_ratings.append(event.user_rating) 

301 

302 # Update last used timestamp 

303 metrics.last_used = event.timestamp 

304 

305 # Persist if backend available 

306 if hasattr(self.storage, "set"): 

307 await self._persist_metrics(metrics) 

308 

309 async def _persist_event(self, event: MetricEvent): 

310 """Persist event to backend storage.""" 

311 if hasattr(self.storage, "append"): 

312 key = f"events:{event.version_id}" 

313 await self.storage.append(key, event.to_dict()) 

314 

315 async def _persist_metrics(self, metrics: PromptMetrics): 

316 """Persist metrics to backend storage.""" 

317 if hasattr(self.storage, "set"): 

318 key = f"metrics:{metrics.version_id}" 

319 await self.storage.set(key, metrics.to_dict()) 

320 

321 async def get_top_versions( 

322 self, 

323 version_ids: List[str], 

324 metric: str = "success_rate", 

325 limit: int = 5, 

326 ) -> List[tuple[str, float]]: 

327 """Get top performing versions by a specific metric. 

328 

329 Args: 

330 version_ids: List of version IDs to rank 

331 metric: Metric to rank by ("success_rate", "avg_rating", "avg_response_time") 

332 limit: Number of top versions to return 

333 

334 Returns: 

335 List of (version_id, metric_value) tuples, sorted by metric 

336 

337 Raises: 

338 ValueError: If metric name is invalid 

339 """ 

340 valid_metrics = ["success_rate", "avg_rating", "avg_response_time", "avg_tokens"] 

341 if metric not in valid_metrics: 

342 raise ValueError( 

343 f"Invalid metric: {metric}. Valid metrics: {', '.join(valid_metrics)}" 

344 ) 

345 

346 # Get metrics for all versions 

347 all_metrics = await self.compare_variants(version_ids) 

348 

349 # Extract metric values 

350 metric_values = [ 

351 (vid, getattr(metrics, metric)) 

352 for vid, metrics in all_metrics.items() 

353 if metrics.total_uses > 0 # Only include versions with data 

354 ] 

355 

356 # Sort by metric value 

357 # For response_time, lower is better (reverse=False) 

358 # For success_rate, rating, higher is better (reverse=True) 

359 reverse = metric != "avg_response_time" 

360 sorted_versions = sorted(metric_values, key=lambda x: x[1], reverse=reverse) 

361 

362 return sorted_versions[:limit] 

363 

364 async def get_version_performance_over_time( 

365 self, 

366 version_id: str, 

367 bucket_size: str = "hour", 

368 ) -> List[Dict[str, Any]]: 

369 """Get performance metrics bucketed by time period. 

370 

371 Args: 

372 version_id: Version ID 

373 bucket_size: Time bucket size ("hour", "day", "week") 

374 

375 Returns: 

376 List of time-bucketed metrics 

377 

378 Note: 

379 This is a simplified implementation. Production would use 

380 proper time-series bucketing. 

381 """ 

382 events = await self.get_events(version_id) 

383 

384 if not events: 

385 return [] 

386 

387 # Group events by time bucket 

388 buckets: Dict[str, List[MetricEvent]] = {} 

389 

390 for event in events: 

391 # Create bucket key based on bucket_size 

392 if bucket_size == "hour": 

393 bucket_key = event.timestamp.strftime("%Y-%m-%d %H:00") 

394 elif bucket_size == "day": 

395 bucket_key = event.timestamp.strftime("%Y-%m-%d") 

396 elif bucket_size == "week": 

397 bucket_key = event.timestamp.strftime("%Y-W%W") 

398 else: 

399 bucket_key = event.timestamp.strftime("%Y-%m-%d") 

400 

401 if bucket_key not in buckets: 

402 buckets[bucket_key] = [] 

403 buckets[bucket_key].append(event) 

404 

405 # Compute metrics for each bucket 

406 result = [] 

407 for bucket_key, bucket_events in sorted(buckets.items()): 

408 total = len(bucket_events) 

409 successes = sum(1 for e in bucket_events if e.success) 

410 

411 result.append({ 

412 "time_bucket": bucket_key, 

413 "total_uses": total, 

414 "success_count": successes, 

415 "success_rate": successes / total if total > 0 else 0.0, 

416 "avg_response_time": sum( 

417 e.response_time for e in bucket_events if e.response_time 

418 ) / total if total > 0 else 0.0, 

419 }) 

420 

421 return result