Coverage for src / dataknobs_llm / prompts / implementations / versioned_library.py: 24%

114 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 10:28 -0700

1"""Versioned prompt library implementation. 

2 

3This module provides a prompt library with full versioning support, 

4combining version management, A/B testing, and metrics tracking. 

5""" 

6 

7from typing import Any, Dict, List 

8 

9from ..base import AbstractPromptLibrary, PromptTemplateDict, MessageIndex, RAGConfig 

10from ..versioning import ( 

11 VersionManager, 

12 ABTestManager, 

13 MetricsCollector, 

14 PromptVersion, 

15 PromptExperiment, 

16 PromptVariant, 

17 PromptMetrics, 

18 VersionStatus, 

19) 

20 

21 

class VersionedPromptLibrary(AbstractPromptLibrary):
    """Prompt library with versioning, A/B testing, and metrics tracking.

    This library extends the base prompt library interface with:
    - Version management with semantic versioning
    - A/B testing experiments with traffic splitting
    - Performance metrics tracking
    - Rollback capabilities

    The version, experiment and metrics APIs are coroutines that delegate to
    ``VersionManager``, ``ABTestManager`` and ``MetricsCollector``. The
    ``AbstractPromptLibrary`` methods (``get_system_prompt`` etc.) are
    synchronous and may be called with or without a running event loop.

    Example:
        ```python
        from dataknobs_llm.prompts import VersionedPromptLibrary

        # Create library (with optional backend storage)
        library = VersionedPromptLibrary(storage=backend)

        # Create a version
        v1 = await library.create_version(
            name="greeting",
            prompt_type="system",
            template="Hello {{name}}!",
            version="1.0.0"
        )

        # Get latest version (returns PromptTemplateDict for compatibility)
        template = library.get_system_prompt("greeting")

        # Create A/B test
        experiment = await library.create_experiment(
            name="greeting",
            prompt_type="system",
            variants=[
                PromptVariant("1.0.0", 0.5, "Original"),
                PromptVariant("1.1.0", 0.5, "Improved")
            ]
        )

        # Track metrics
        await library.record_usage(
            version_id=v1.version_id,
            success=True,
            response_time=0.5,
            tokens=100
        )
        ```
    """

    def __init__(
        self,
        storage: Any | None = None,
        base_library: AbstractPromptLibrary | None = None,
    ):
        """Initialize versioned prompt library.

        Args:
            storage: Backend storage for persistence (None for in-memory)
            base_library: Optional base library to wrap (for migration)
        """
        self.storage = storage
        self.base_library = base_library

        # All three managers are handed the same storage backend.
        self.version_manager = VersionManager(storage)
        self.ab_test_manager = ABTestManager(storage)
        self.metrics_collector = MetricsCollector(storage)

        # version_id -> template dict built by _version_to_template().
        self._template_cache: Dict[str, PromptTemplateDict] = {}

    # ===== Version Management API =====

    async def create_version(
        self,
        name: str,
        prompt_type: str,
        template: str,
        version: str | None = None,
        defaults: Dict[str, Any] | None = None,
        validation: Dict[str, Any] | None = None,
        metadata: Dict[str, Any] | None = None,
        created_by: str | None = None,
        tags: List[str] | None = None,
        status: VersionStatus = VersionStatus.ACTIVE,
    ) -> PromptVersion:
        """Create a new prompt version.

        The version currently returned by the manager's default lookup for
        ``(name, prompt_type)`` is recorded as the new version's parent.

        Args:
            name: Prompt name
            prompt_type: Prompt type ("system", "user", "message")
            template: Template content
            version: Semantic version (auto-increments if None)
            defaults: Default parameter values
            validation: Validation configuration
            metadata: Additional metadata
            created_by: Creator username/ID
            tags: List of tags
            status: Initial status

        Returns:
            Created PromptVersion
        """
        # Default lookup (no explicit version) -- presumably the latest one.
        latest = await self.version_manager.get_version(name, prompt_type)
        parent_version = latest.version_id if latest else None

        return await self.version_manager.create_version(
            name=name,
            prompt_type=prompt_type,
            template=template,
            version=version,
            defaults=defaults,
            validation=validation,
            metadata=metadata,
            created_by=created_by,
            parent_version=parent_version,
            tags=tags,
            status=status,
        )

    async def get_version(
        self,
        name: str,
        prompt_type: str,
        version: str = "latest",
    ) -> PromptVersion | None:
        """Get a specific prompt version.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            version: Version string or "latest"

        Returns:
            PromptVersion if found, None otherwise
        """
        return await self.version_manager.get_version(name, prompt_type, version)

    async def list_versions(
        self,
        name: str,
        prompt_type: str,
        tags: List[str] | None = None,
        status: VersionStatus | None = None,
    ) -> List[PromptVersion]:
        """List all versions of a prompt.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            tags: Filter by tags
            status: Filter by status

        Returns:
            List of PromptVersion objects
        """
        return await self.version_manager.list_versions(name, prompt_type, tags, status)

    async def tag_version(
        self,
        version_id: str,
        tag: str,
    ) -> PromptVersion:
        """Add a tag to a version.

        Args:
            version_id: Version ID
            tag: Tag to add

        Returns:
            Updated PromptVersion
        """
        updated = await self.version_manager.tag_version(version_id, tag)
        # Templates are cached per version_id and embed the version's tags;
        # evict so the next synchronous lookup rebuilds from the updated data.
        self._template_cache.pop(version_id, None)
        return updated

    # ===== A/B Testing API =====

    async def create_experiment(
        self,
        name: str,
        prompt_type: str,
        variants: List[PromptVariant],
        traffic_split: Dict[str, float] | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> PromptExperiment:
        """Create an A/B test experiment.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            variants: List of variants to test
            traffic_split: Optional custom traffic split
            metadata: Additional metadata

        Returns:
            Created PromptExperiment
        """
        return await self.ab_test_manager.create_experiment(
            name=name,
            prompt_type=prompt_type,
            variants=variants,
            traffic_split=traffic_split,
            metadata=metadata,
        )

    async def get_variant_for_user(
        self,
        experiment_id: str,
        user_id: str,
    ) -> str:
        """Get variant for a user (sticky assignment).

        Args:
            experiment_id: Experiment ID
            user_id: User identifier

        Returns:
            Version string of assigned variant
        """
        return await self.ab_test_manager.get_variant_for_user(experiment_id, user_id)

    async def get_random_variant(
        self,
        experiment_id: str,
    ) -> str:
        """Get a random variant.

        Args:
            experiment_id: Experiment ID

        Returns:
            Version string of selected variant
        """
        return await self.ab_test_manager.get_random_variant(experiment_id)

    async def get_experiment(
        self,
        experiment_id: str,
    ) -> PromptExperiment | None:
        """Get an experiment by ID.

        Args:
            experiment_id: Experiment ID

        Returns:
            PromptExperiment if found, None otherwise
        """
        return await self.ab_test_manager.get_experiment(experiment_id)

    async def list_experiments(
        self,
        name: str | None = None,
        prompt_type: str | None = None,
        status: str | None = None,
    ) -> List[PromptExperiment]:
        """List experiments.

        Args:
            name: Filter by prompt name
            prompt_type: Filter by prompt type
            status: Filter by status

        Returns:
            List of experiments
        """
        return await self.ab_test_manager.list_experiments(name, prompt_type, status)

    # ===== Metrics API =====

    async def record_usage(
        self,
        version_id: str,
        success: bool = True,
        response_time: float | None = None,
        tokens: int | None = None,
        user_rating: float | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> None:
        """Record a usage event for metrics tracking.

        Args:
            version_id: Version ID
            success: Whether the use was successful
            response_time: Response time in seconds
            tokens: Number of tokens used
            user_rating: User rating 1-5
            metadata: Additional event metadata
        """
        await self.metrics_collector.record_event(
            version_id=version_id,
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata,
        )

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get metrics for a version.

        Args:
            version_id: Version ID

        Returns:
            PromptMetrics with aggregated statistics
        """
        return await self.metrics_collector.get_metrics(version_id)

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across versions.

        Args:
            version_ids: List of version IDs

        Returns:
            Dictionary mapping version_id to PromptMetrics
        """
        return await self.metrics_collector.compare_variants(version_ids)

    # ===== AbstractPromptLibrary Implementation =====

    def get_system_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> PromptTemplateDict | None:
        """Get a system prompt template.

        This method is synchronous for compatibility with AbstractPromptLibrary.
        For async version access, use get_version() directly.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when falling back to it

        Returns:
            PromptTemplateDict if found, None otherwise
        """
        template = self._lookup_template(name, "system", version)
        if template is None and self.base_library is not None:
            # No versioned prompt: defer to the wrapped library. ``version``
            # is not forwarded, matching the base get_system_prompt call shape.
            return self.base_library.get_system_prompt(name, **kwargs)
        return template

    def get_user_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> PromptTemplateDict | None:
        """Get a user prompt template.

        Synchronous counterpart of ``get_version(name, "user", version)``.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when falling back to it

        Returns:
            PromptTemplateDict if found, None otherwise
        """
        template = self._lookup_template(name, "user", version)
        if template is None and self.base_library is not None:
            return self.base_library.get_user_prompt(name, **kwargs)
        return template

    def list_system_prompts(self) -> List[str]:
        """List all system prompt names.

        Returns:
            Sorted list of names from the version index and the base library
        """
        names = self._versioned_prompt_names("system")
        if self.base_library is not None:
            names.update(self.base_library.list_system_prompts())
        return sorted(names)

    def list_user_prompts(self) -> List[str]:
        """List all user prompt names.

        Returns:
            Sorted list of names from the version index and the base library
        """
        names = self._versioned_prompt_names("user")
        if self.base_library is not None:
            names.update(self.base_library.list_user_prompts())
        return sorted(names)

    def get_message_index(
        self,
        name: str,
        **kwargs: Any
    ) -> MessageIndex | None:
        """Get a message index.

        Note: Message indexes are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: Message index name
            **kwargs: Additional parameters

        Returns:
            MessageIndex if found, None otherwise
        """
        if self.base_library is None:
            return None
        return self.base_library.get_message_index(name, **kwargs)

    def list_message_indexes(self) -> List[str]:
        """List all message index names.

        Returns:
            List of message index names (empty without a base library)
        """
        if self.base_library is None:
            return []
        return self.base_library.list_message_indexes()

    def get_rag_config(
        self,
        name: str,
        **kwargs: Any
    ) -> RAGConfig | None:
        """Get a RAG configuration.

        Note: RAG configs are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: RAG config name
            **kwargs: Additional parameters

        Returns:
            RAGConfig if found, None otherwise
        """
        if self.base_library is None:
            return None
        return self.base_library.get_rag_config(name, **kwargs)

    def get_prompt_rag_configs(
        self,
        prompt_name: str,
        prompt_type: str = "user",
        **kwargs: Any
    ) -> List[RAGConfig]:
        """Get RAG configurations for a prompt.

        Args:
            prompt_name: Prompt name
            prompt_type: Prompt type
            **kwargs: Additional parameters

        Returns:
            List of RAG configurations (empty without a base library)
        """
        if self.base_library is None:
            return []
        return self.base_library.get_prompt_rag_configs(prompt_name, prompt_type, **kwargs)

    def get_metadata(self) -> Dict[str, Any]:
        """Get library metadata.

        Returns:
            Metadata dictionary with the library type, the storage backend's
            class name ("in-memory" when no storage was given), whether a base
            library is wrapped, and the manager-held version/experiment counts
        """
        storage_name = (
            type(self.storage).__name__ if self.storage is not None else "in-memory"
        )
        return {
            "type": "VersionedPromptLibrary",
            "storage": storage_name,
            "has_base_library": self.base_library is not None,
            "version_count": len(self.version_manager._versions),
            "experiment_count": len(self.ab_test_manager._experiments),
        }

    # ===== Helper Methods =====

    @staticmethod
    def _run_sync(awaitable: Any) -> Any:
        """Block until ``awaitable`` completes and return its result.

        Works both with and without an event loop running in the calling
        thread, so the synchronous library API can be used from async code.
        """
        import asyncio
        import concurrent.futures

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop is running here; drive one directly below.
        else:
            # run_until_complete() cannot be nested inside a running loop, so
            # resolve the awaitable on a private loop in a worker thread.
            # NOTE(review): assumes the manager/storage is not bound to the
            # caller's loop -- confirm for loop-affine storage backends.
            async def _resolve() -> Any:
                return await awaitable

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                return pool.submit(asyncio.run, _resolve()).result()

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            # No usable loop on this thread: install a fresh one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(awaitable)

    def _lookup_template(
        self,
        name: str,
        prompt_type: str,
        version: str,
    ) -> PromptTemplateDict | None:
        """Synchronously fetch a version and convert it to a template dict.

        Returns None when the version manager has no matching version.
        """
        prompt_version = self._run_sync(
            self.version_manager.get_version(name, prompt_type, version)
        )
        if prompt_version is None:
            return None
        return self._version_to_template(prompt_version)

    def _versioned_prompt_names(self, prompt_type: str) -> set[str]:
        """Collect prompt names of ``prompt_type`` from the version index.

        Index keys are split at the first ":" into name and type
        (presumably "<name>:<prompt_type>" -- verify against VersionManager).
        Keys without a ":" yield an empty type and are skipped.
        """
        names: set[str] = set()
        for key in self.version_manager._version_index:
            name, _, ptype = key.partition(":")
            if ptype == prompt_type:
                names.add(name)
        return names

    def _version_to_template(self, version: PromptVersion) -> PromptTemplateDict:
        """Convert PromptVersion to PromptTemplateDict for compatibility.

        Results are cached by ``version_id``; the cached dict itself is
        returned, so callers should treat it as read-only.
        """
        cache_key = version.version_id
        cached = self._template_cache.get(cache_key)
        if cached is not None:
            return cached

        template: PromptTemplateDict = {
            "template": version.template,
            "defaults": version.defaults,
            "metadata": {
                # Version bookkeeping keys override same-named user metadata.
                **version.metadata,
                "version_id": version.version_id,
                "version": version.version,
                "created_at": version.created_at.isoformat(),
                "tags": version.tags,
                "status": version.status.value,
            }
        }

        # Only include "validation" when the version actually defines it.
        if version.validation:
            template["validation"] = version.validation  # type: ignore[typeddict-item]

        self._template_cache[cache_key] = template
        return template