Coverage for src/dataknobs_llm/prompts/implementations/versioned_library.py: 24%

115 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-31 16:04 -0600

1"""Versioned prompt library implementation. 

2 

3This module provides a prompt library with full versioning support, 

4combining version management, A/B testing, and metrics tracking. 

5""" 

6 

7from typing import Any, Dict, List, Optional 

8from pathlib import Path 

9 

10from ..base import AbstractPromptLibrary, PromptTemplate, MessageIndex, RAGConfig 

11from ..versioning import ( 

12 VersionManager, 

13 ABTestManager, 

14 MetricsCollector, 

15 PromptVersion, 

16 PromptExperiment, 

17 PromptVariant, 

18 PromptMetrics, 

19 VersionStatus, 

20 VersioningError, 

21) 

22 

23 

class VersionedPromptLibrary(AbstractPromptLibrary):
    """Prompt library with versioning, A/B testing, and metrics tracking.

    This library extends the base prompt library interface with:
    - Version management with semantic versioning
    - A/B testing experiments with traffic splitting
    - Performance metrics tracking
    - Rollback capabilities

    The versioning, experiment and metrics APIs are coroutines. The
    ``AbstractPromptLibrary`` accessors (``get_system_prompt`` and
    ``get_user_prompt``) are synchronous: they drive the underlying coroutine
    on an event loop themselves, so they must be called from code that is
    *not* already running inside an event loop in the same thread. From async
    code, ``await library.get_version(...)`` instead.

    Example:
        ```python
        from dataknobs_llm.prompts import VersionedPromptLibrary

        # Create library (with optional backend storage)
        library = VersionedPromptLibrary(storage=backend)

        # Create a version
        v1 = await library.create_version(
            name="greeting",
            prompt_type="system",
            template="Hello {{name}}!",
            version="1.0.0"
        )

        # Get latest version (returns PromptTemplate for compatibility).
        # Synchronous: call it where no event loop is running in this thread.
        template = library.get_system_prompt("greeting")

        # Create A/B test
        experiment = await library.create_experiment(
            name="greeting",
            prompt_type="system",
            variants=[
                PromptVariant("1.0.0", 0.5, "Original"),
                PromptVariant("1.1.0", 0.5, "Improved")
            ]
        )

        # Track metrics
        await library.record_usage(
            version_id=v1.version_id,
            success=True,
            response_time=0.5,
            tokens=100
        )
        ```
    """

    def __init__(
        self,
        storage: Optional[Any] = None,
        base_library: Optional[AbstractPromptLibrary] = None,
    ):
        """Initialize versioned prompt library.

        Args:
            storage: Backend storage for persistence (None for in-memory).
                The same object is handed to all three managers.
            base_library: Optional base library to wrap (for migration).
                Used as a fallback for prompts that have no stored version
                and for the non-versioned resources (message indexes, RAG
                configs).
        """
        self.storage = storage
        self.base_library = base_library

        # Initialize managers
        self.version_manager = VersionManager(storage)
        self.ab_test_manager = ABTestManager(storage)
        self.metrics_collector = MetricsCollector(storage)

        # Cache for converting versions to templates, keyed by version_id
        self._template_cache: Dict[str, PromptTemplate] = {}

    # ===== Version Management API =====

    async def create_version(
        self,
        name: str,
        prompt_type: str,
        template: str,
        version: Optional[str] = None,
        defaults: Optional[Dict[str, Any]] = None,
        validation: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        created_by: Optional[str] = None,
        tags: Optional[List[str]] = None,
        status: VersionStatus = VersionStatus.ACTIVE,
    ) -> PromptVersion:
        """Create a new prompt version.

        The version currently returned by the version manager for
        ``(name, prompt_type)`` is recorded as the new version's parent.

        Args:
            name: Prompt name
            prompt_type: Prompt type ("system", "user", "message")
            template: Template content
            version: Semantic version (auto-increments if None)
            defaults: Default parameter values
            validation: Validation configuration
            metadata: Additional metadata
            created_by: Creator username/ID
            tags: List of tags
            status: Initial status

        Returns:
            Created PromptVersion
        """
        # Find parent version (latest version)
        latest = await self.version_manager.get_version(name, prompt_type)
        parent_version = latest.version_id if latest else None

        return await self.version_manager.create_version(
            name=name,
            prompt_type=prompt_type,
            template=template,
            version=version,
            defaults=defaults,
            validation=validation,
            metadata=metadata,
            created_by=created_by,
            parent_version=parent_version,
            tags=tags,
            status=status,
        )

    async def get_version(
        self,
        name: str,
        prompt_type: str,
        version: str = "latest",
    ) -> Optional[PromptVersion]:
        """Get a specific prompt version.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            version: Version string or "latest"

        Returns:
            PromptVersion if found, None otherwise
        """
        return await self.version_manager.get_version(name, prompt_type, version)

    async def list_versions(
        self,
        name: str,
        prompt_type: str,
        tags: Optional[List[str]] = None,
        status: Optional[VersionStatus] = None,
    ) -> List[PromptVersion]:
        """List all versions of a prompt.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            tags: Filter by tags
            status: Filter by status

        Returns:
            List of PromptVersion objects
        """
        return await self.version_manager.list_versions(name, prompt_type, tags, status)

    async def tag_version(
        self,
        version_id: str,
        tag: str,
    ) -> PromptVersion:
        """Add a tag to a version.

        Args:
            version_id: Version ID
            tag: Tag to add

        Returns:
            Updated PromptVersion
        """
        updated = await self.version_manager.tag_version(version_id, tag)

        # The cached template embeds the version's tags in its metadata, so
        # drop it; the next synchronous lookup rebuilds it from the updated
        # version instead of returning the pre-tag snapshot.
        self._template_cache.pop(version_id, None)

        return updated

    # ===== A/B Testing API =====

    async def create_experiment(
        self,
        name: str,
        prompt_type: str,
        variants: List[PromptVariant],
        traffic_split: Optional[Dict[str, float]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> PromptExperiment:
        """Create an A/B test experiment.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            variants: List of variants to test
            traffic_split: Optional custom traffic split
            metadata: Additional metadata

        Returns:
            Created PromptExperiment
        """
        return await self.ab_test_manager.create_experiment(
            name=name,
            prompt_type=prompt_type,
            variants=variants,
            traffic_split=traffic_split,
            metadata=metadata,
        )

    async def get_variant_for_user(
        self,
        experiment_id: str,
        user_id: str,
    ) -> str:
        """Get variant for a user (sticky assignment).

        Args:
            experiment_id: Experiment ID
            user_id: User identifier

        Returns:
            Version string of assigned variant
        """
        return await self.ab_test_manager.get_variant_for_user(experiment_id, user_id)

    async def get_random_variant(
        self,
        experiment_id: str,
    ) -> str:
        """Get a random variant.

        Args:
            experiment_id: Experiment ID

        Returns:
            Version string of selected variant
        """
        return await self.ab_test_manager.get_random_variant(experiment_id)

    async def get_experiment(
        self,
        experiment_id: str,
    ) -> Optional[PromptExperiment]:
        """Get an experiment by ID.

        Args:
            experiment_id: Experiment ID

        Returns:
            PromptExperiment if found, None otherwise
        """
        return await self.ab_test_manager.get_experiment(experiment_id)

    async def list_experiments(
        self,
        name: Optional[str] = None,
        prompt_type: Optional[str] = None,
        status: Optional[str] = None,
    ) -> List[PromptExperiment]:
        """List experiments.

        Args:
            name: Filter by prompt name
            prompt_type: Filter by prompt type
            status: Filter by status

        Returns:
            List of experiments
        """
        return await self.ab_test_manager.list_experiments(name, prompt_type, status)

    # ===== Metrics API =====

    async def record_usage(
        self,
        version_id: str,
        success: bool = True,
        response_time: Optional[float] = None,
        tokens: Optional[int] = None,
        user_rating: Optional[float] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record a usage event for metrics tracking.

        Args:
            version_id: Version ID
            success: Whether the use was successful
            response_time: Response time in seconds
            tokens: Number of tokens used
            user_rating: User rating 1-5
            metadata: Additional event metadata
        """
        await self.metrics_collector.record_event(
            version_id=version_id,
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata,
        )

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get metrics for a version.

        Args:
            version_id: Version ID

        Returns:
            PromptMetrics with aggregated statistics
        """
        return await self.metrics_collector.get_metrics(version_id)

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across versions.

        Args:
            version_ids: List of version IDs

        Returns:
            Dictionary mapping version_id to PromptMetrics
        """
        return await self.metrics_collector.compare_variants(version_ids)

    # ===== AbstractPromptLibrary Implementation =====

    def get_system_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> Optional[PromptTemplate]:
        """Get a system prompt template.

        This method is synchronous for compatibility with AbstractPromptLibrary.
        For async version access, use get_version() directly.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when the lookup falls back to it

        Returns:
            PromptTemplate if found, None otherwise

        Raises:
            RuntimeError: If called while an event loop is running in the
                current thread.
        """
        prompt_version = self._get_version_sync(name, "system", version)

        if prompt_version is None:
            # Fall back to base library if available (it is not versioned,
            # so ``version`` is not forwarded).
            if self.base_library is not None:
                return self.base_library.get_system_prompt(name, **kwargs)
            return None

        return self._version_to_template(prompt_version)

    def get_user_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> Optional[PromptTemplate]:
        """Get a user prompt template.

        This method is synchronous for compatibility with AbstractPromptLibrary.
        For async version access, use get_version() directly.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when the lookup falls back to it

        Returns:
            PromptTemplate if found, None otherwise

        Raises:
            RuntimeError: If called while an event loop is running in the
                current thread.
        """
        prompt_version = self._get_version_sync(name, "user", version)

        if prompt_version is None:
            # Fall back to base library if available (it is not versioned,
            # so ``version`` is not forwarded).
            if self.base_library is not None:
                return self.base_library.get_user_prompt(name, **kwargs)
            return None

        return self._version_to_template(prompt_version)

    def list_system_prompts(self) -> List[str]:
        """List all system prompt names.

        Returns:
            Sorted list of prompt names from the version index, merged with
            the base library's names when one is configured
        """
        names = self._versioned_prompt_names("system")

        # Add from base library if available
        if self.base_library is not None:
            names.update(self.base_library.list_system_prompts())

        return sorted(names)

    def list_user_prompts(self) -> List[str]:
        """List all user prompt names.

        Returns:
            Sorted list of prompt names from the version index, merged with
            the base library's names when one is configured
        """
        names = self._versioned_prompt_names("user")

        # Add from base library if available
        if self.base_library is not None:
            names.update(self.base_library.list_user_prompts())

        return sorted(names)

    def get_message_index(
        self,
        name: str,
        **kwargs: Any
    ) -> Optional[MessageIndex]:
        """Get a message index.

        Note: Message indexes are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: Message index name
            **kwargs: Additional parameters

        Returns:
            MessageIndex if found, None otherwise
        """
        if self.base_library is not None:
            return self.base_library.get_message_index(name, **kwargs)
        return None

    def list_message_indexes(self) -> List[str]:
        """List all message index names.

        Returns:
            List of message index names (empty without a base library)
        """
        if self.base_library is not None:
            return self.base_library.list_message_indexes()
        return []

    def get_rag_config(
        self,
        name: str,
        **kwargs: Any
    ) -> Optional[RAGConfig]:
        """Get a RAG configuration.

        Note: RAG configs are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: RAG config name
            **kwargs: Additional parameters

        Returns:
            RAGConfig if found, None otherwise
        """
        if self.base_library is not None:
            return self.base_library.get_rag_config(name, **kwargs)
        return None

    def get_prompt_rag_configs(
        self,
        prompt_name: str,
        prompt_type: str = "user",
        **kwargs: Any
    ) -> List[RAGConfig]:
        """Get RAG configurations for a prompt.

        Args:
            prompt_name: Prompt name
            prompt_type: Prompt type
            **kwargs: Additional parameters

        Returns:
            List of RAG configurations (empty without a base library)
        """
        if self.base_library is not None:
            return self.base_library.get_prompt_rag_configs(prompt_name, prompt_type, **kwargs)
        return []

    def get_metadata(self) -> Dict[str, Any]:
        """Get library metadata.

        Returns:
            Metadata dictionary with the library type, the storage backend's
            class name ("in-memory" when no storage was given), whether a
            base library is wrapped, and the number of versions and
            experiments currently held by the managers
        """
        # Compare against None explicitly: a storage object that happens to
        # be falsy (e.g. an empty container-like backend) is still a backend.
        storage_name = (
            type(self.storage).__name__ if self.storage is not None else "in-memory"
        )
        return {
            "type": "VersionedPromptLibrary",
            "storage": storage_name,
            "has_base_library": self.base_library is not None,
            "version_count": len(self.version_manager._versions),
            "experiment_count": len(self.ab_test_manager._experiments),
        }

    # ===== Helper Methods =====

    @staticmethod
    def _run_sync(coro: Any) -> Any:
        """Run an awaitable to completion from synchronous code.

        Raises:
            RuntimeError: If an event loop is already running in this thread;
                blocking on it from inside itself is impossible.
        """
        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop in this thread: safe to block below.
        else:
            # Close the coroutine so it does not trigger a
            # "coroutine ... was never awaited" warning on garbage collection.
            close = getattr(coro, "close", None)
            if callable(close):
                close()
            raise RuntimeError(
                "Synchronous prompt lookup cannot run while an event loop is "
                "running in this thread; use 'await library.get_version(...)' "
                "instead."
            )

        # Reuse the thread's current loop when there is one so repeated calls
        # share it; create and install a fresh loop when none is set or the
        # current one has been closed.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        return loop.run_until_complete(coro)

    def _get_version_sync(
        self,
        name: str,
        prompt_type: str,
        version: str,
    ) -> Optional[PromptVersion]:
        """Blocking counterpart of get_version() for the synchronous API."""
        return self._run_sync(
            self.version_manager.get_version(name, prompt_type, version)
        )

    def _versioned_prompt_names(self, prompt_type: str) -> set:
        """Collect the names in the version index that have the given type."""
        names = set()
        for key in self.version_manager._version_index:
            # Index keys are unpacked as "<name>:<prompt_type>". Split on the
            # LAST colon so that a prompt name containing ':' stays intact.
            # NOTE(review): key layout inferred from this unpacking; confirm
            # against VersionManager.
            name, ptype = key.rsplit(":", 1)
            if ptype == prompt_type:
                names.add(name)
        return names

    def _version_to_template(self, version: PromptVersion) -> PromptTemplate:
        """Convert PromptVersion to PromptTemplate for compatibility.

        The result is cached per ``version_id`` and the same dictionary
        object is returned on later calls.
        """
        # Check cache
        cache_key = version.version_id
        cached = self._template_cache.get(cache_key)
        if cached is not None:
            return cached

        template: PromptTemplate = {
            "template": version.template,
            "defaults": version.defaults,
            "metadata": {
                # Tolerate a version stored without metadata.
                **(version.metadata or {}),
                "version_id": version.version_id,
                "version": version.version,
                "created_at": version.created_at.isoformat(),
                "tags": version.tags,
                "status": version.status.value,
            }
        }

        # Only expose a validation section when the version defines one
        if version.validation:
            template["validation"] = version.validation

        # Cache it
        self._template_cache[cache_key] = template

        return template