Coverage for src/dataknobs_llm/prompts/implementations/versioned_library.py: 24%
115 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
1"""Versioned prompt library implementation.
3This module provides a prompt library with full versioning support,
4combining version management, A/B testing, and metrics tracking.
5"""
7from typing import Any, Dict, List, Optional
8from pathlib import Path
10from ..base import AbstractPromptLibrary, PromptTemplate, MessageIndex, RAGConfig
11from ..versioning import (
12 VersionManager,
13 ABTestManager,
14 MetricsCollector,
15 PromptVersion,
16 PromptExperiment,
17 PromptVariant,
18 PromptMetrics,
19 VersionStatus,
20 VersioningError,
21)
24class VersionedPromptLibrary(AbstractPromptLibrary):
25 """Prompt library with versioning, A/B testing, and metrics tracking.
27 This library extends the base prompt library interface with:
28 - Version management with semantic versioning
29 - A/B testing experiments with traffic splitting
30 - Performance metrics tracking
31 - Rollback capabilities
33 Example:
34 ```python
35 from dataknobs_llm.prompts import VersionedPromptLibrary
37 # Create library (with optional backend storage)
38 library = VersionedPromptLibrary(storage=backend)
40 # Create a version
41 v1 = await library.create_version(
42 name="greeting",
43 prompt_type="system",
44 template="Hello {{name}}!",
45 version="1.0.0"
46 )
48 # Get latest version (returns PromptTemplate for compatibility)
49 template = library.get_system_prompt("greeting")
51 # Create A/B test
52 experiment = await library.create_experiment(
53 name="greeting",
54 prompt_type="system",
55 variants=[
56 PromptVariant("1.0.0", 0.5, "Original"),
57 PromptVariant("1.1.0", 0.5, "Improved")
58 ]
59 )
61 # Track metrics
62 await library.record_usage(
63 version_id=v1.version_id,
64 success=True,
65 response_time=0.5,
66 tokens=100
67 )
68 ```
69 """
71 def __init__(
72 self,
73 storage: Optional[Any] = None,
74 base_library: Optional[AbstractPromptLibrary] = None,
75 ):
76 """Initialize versioned prompt library.
78 Args:
79 storage: Backend storage for persistence (None for in-memory)
80 base_library: Optional base library to wrap (for migration)
81 """
82 self.storage = storage
83 self.base_library = base_library
85 # Initialize managers
86 self.version_manager = VersionManager(storage)
87 self.ab_test_manager = ABTestManager(storage)
88 self.metrics_collector = MetricsCollector(storage)
90 # Cache for converting versions to templates
91 self._template_cache: Dict[str, PromptTemplate] = {}
93 # ===== Version Management API =====
95 async def create_version(
96 self,
97 name: str,
98 prompt_type: str,
99 template: str,
100 version: Optional[str] = None,
101 defaults: Optional[Dict[str, Any]] = None,
102 validation: Optional[Dict[str, Any]] = None,
103 metadata: Optional[Dict[str, Any]] = None,
104 created_by: Optional[str] = None,
105 tags: Optional[List[str]] = None,
106 status: VersionStatus = VersionStatus.ACTIVE,
107 ) -> PromptVersion:
108 """Create a new prompt version.
110 Args:
111 name: Prompt name
112 prompt_type: Prompt type ("system", "user", "message")
113 template: Template content
114 version: Semantic version (auto-increments if None)
115 defaults: Default parameter values
116 validation: Validation configuration
117 metadata: Additional metadata
118 created_by: Creator username/ID
119 tags: List of tags
120 status: Initial status
122 Returns:
123 Created PromptVersion
124 """
125 # Find parent version (latest version)
126 latest = await self.version_manager.get_version(name, prompt_type)
127 parent_version = latest.version_id if latest else None
129 return await self.version_manager.create_version(
130 name=name,
131 prompt_type=prompt_type,
132 template=template,
133 version=version,
134 defaults=defaults,
135 validation=validation,
136 metadata=metadata,
137 created_by=created_by,
138 parent_version=parent_version,
139 tags=tags,
140 status=status,
141 )
143 async def get_version(
144 self,
145 name: str,
146 prompt_type: str,
147 version: str = "latest",
148 ) -> Optional[PromptVersion]:
149 """Get a specific prompt version.
151 Args:
152 name: Prompt name
153 prompt_type: Prompt type
154 version: Version string or "latest"
156 Returns:
157 PromptVersion if found, None otherwise
158 """
159 return await self.version_manager.get_version(name, prompt_type, version)
161 async def list_versions(
162 self,
163 name: str,
164 prompt_type: str,
165 tags: Optional[List[str]] = None,
166 status: Optional[VersionStatus] = None,
167 ) -> List[PromptVersion]:
168 """List all versions of a prompt.
170 Args:
171 name: Prompt name
172 prompt_type: Prompt type
173 tags: Filter by tags
174 status: Filter by status
176 Returns:
177 List of PromptVersion objects
178 """
179 return await self.version_manager.list_versions(name, prompt_type, tags, status)
181 async def tag_version(
182 self,
183 version_id: str,
184 tag: str,
185 ) -> PromptVersion:
186 """Add a tag to a version.
188 Args:
189 version_id: Version ID
190 tag: Tag to add
192 Returns:
193 Updated PromptVersion
194 """
195 return await self.version_manager.tag_version(version_id, tag)
197 # ===== A/B Testing API =====
199 async def create_experiment(
200 self,
201 name: str,
202 prompt_type: str,
203 variants: List[PromptVariant],
204 traffic_split: Optional[Dict[str, float]] = None,
205 metadata: Optional[Dict[str, Any]] = None,
206 ) -> PromptExperiment:
207 """Create an A/B test experiment.
209 Args:
210 name: Prompt name
211 prompt_type: Prompt type
212 variants: List of variants to test
213 traffic_split: Optional custom traffic split
214 metadata: Additional metadata
216 Returns:
217 Created PromptExperiment
218 """
219 return await self.ab_test_manager.create_experiment(
220 name=name,
221 prompt_type=prompt_type,
222 variants=variants,
223 traffic_split=traffic_split,
224 metadata=metadata,
225 )
227 async def get_variant_for_user(
228 self,
229 experiment_id: str,
230 user_id: str,
231 ) -> str:
232 """Get variant for a user (sticky assignment).
234 Args:
235 experiment_id: Experiment ID
236 user_id: User identifier
238 Returns:
239 Version string of assigned variant
240 """
241 return await self.ab_test_manager.get_variant_for_user(experiment_id, user_id)
243 async def get_random_variant(
244 self,
245 experiment_id: str,
246 ) -> str:
247 """Get a random variant.
249 Args:
250 experiment_id: Experiment ID
252 Returns:
253 Version string of selected variant
254 """
255 return await self.ab_test_manager.get_random_variant(experiment_id)
257 async def get_experiment(
258 self,
259 experiment_id: str,
260 ) -> Optional[PromptExperiment]:
261 """Get an experiment by ID.
263 Args:
264 experiment_id: Experiment ID
266 Returns:
267 PromptExperiment if found, None otherwise
268 """
269 return await self.ab_test_manager.get_experiment(experiment_id)
271 async def list_experiments(
272 self,
273 name: Optional[str] = None,
274 prompt_type: Optional[str] = None,
275 status: Optional[str] = None,
276 ) -> List[PromptExperiment]:
277 """List experiments.
279 Args:
280 name: Filter by prompt name
281 prompt_type: Filter by prompt type
282 status: Filter by status
284 Returns:
285 List of experiments
286 """
287 return await self.ab_test_manager.list_experiments(name, prompt_type, status)
289 # ===== Metrics API =====
291 async def record_usage(
292 self,
293 version_id: str,
294 success: bool = True,
295 response_time: Optional[float] = None,
296 tokens: Optional[int] = None,
297 user_rating: Optional[float] = None,
298 metadata: Optional[Dict[str, Any]] = None,
299 ):
300 """Record a usage event for metrics tracking.
302 Args:
303 version_id: Version ID
304 success: Whether the use was successful
305 response_time: Response time in seconds
306 tokens: Number of tokens used
307 user_rating: User rating 1-5
308 metadata: Additional event metadata
309 """
310 await self.metrics_collector.record_event(
311 version_id=version_id,
312 success=success,
313 response_time=response_time,
314 tokens=tokens,
315 user_rating=user_rating,
316 metadata=metadata,
317 )
319 async def get_metrics(
320 self,
321 version_id: str,
322 ) -> PromptMetrics:
323 """Get metrics for a version.
325 Args:
326 version_id: Version ID
328 Returns:
329 PromptMetrics with aggregated statistics
330 """
331 return await self.metrics_collector.get_metrics(version_id)
333 async def compare_variants(
334 self,
335 version_ids: List[str],
336 ) -> Dict[str, PromptMetrics]:
337 """Compare metrics across versions.
339 Args:
340 version_ids: List of version IDs
342 Returns:
343 Dictionary mapping version_id to PromptMetrics
344 """
345 return await self.metrics_collector.compare_variants(version_ids)
347 # ===== AbstractPromptLibrary Implementation =====
349 def get_system_prompt(
350 self,
351 name: str,
352 version: str = "latest",
353 **kwargs: Any
354 ) -> Optional[PromptTemplate]:
355 """Get a system prompt template.
357 This method is synchronous for compatibility with AbstractPromptLibrary.
358 For async version access, use get_version() directly.
360 Args:
361 name: Prompt name
362 version: Version string or "latest"
363 **kwargs: Additional parameters
365 Returns:
366 PromptTemplate if found, None otherwise
367 """
368 import asyncio
370 # Run async version retrieval
371 try:
372 loop = asyncio.get_event_loop()
373 except RuntimeError:
374 loop = asyncio.new_event_loop()
375 asyncio.set_event_loop(loop)
377 prompt_version = loop.run_until_complete(
378 self.version_manager.get_version(name, "system", version)
379 )
381 if not prompt_version:
382 # Fall back to base library if available
383 if self.base_library:
384 return self.base_library.get_system_prompt(name, **kwargs)
385 return None
387 return self._version_to_template(prompt_version)
389 def get_user_prompt(
390 self,
391 name: str,
392 version: str = "latest",
393 **kwargs: Any
394 ) -> Optional[PromptTemplate]:
395 """Get a user prompt template.
397 Args:
398 name: Prompt name
399 version: Version string or "latest"
400 **kwargs: Additional parameters
402 Returns:
403 PromptTemplate if found, None otherwise
404 """
405 import asyncio
407 try:
408 loop = asyncio.get_event_loop()
409 except RuntimeError:
410 loop = asyncio.new_event_loop()
411 asyncio.set_event_loop(loop)
413 prompt_version = loop.run_until_complete(
414 self.version_manager.get_version(name, "user", version)
415 )
417 if not prompt_version:
418 if self.base_library:
419 return self.base_library.get_user_prompt(name, **kwargs)
420 return None
422 return self._version_to_template(prompt_version)
424 def list_system_prompts(self) -> List[str]:
425 """List all system prompt names.
427 Returns:
428 List of prompt names
429 """
430 import asyncio
432 try:
433 loop = asyncio.get_event_loop()
434 except RuntimeError:
435 loop = asyncio.new_event_loop()
436 asyncio.set_event_loop(loop)
438 # Get unique prompt names from version index
439 names = set()
440 for key in self.version_manager._version_index.keys():
441 name, ptype = key.split(":", 1)
442 if ptype == "system":
443 names.add(name)
445 # Add from base library if available
446 if self.base_library:
447 names.update(self.base_library.list_system_prompts())
449 return sorted(names)
451 def list_user_prompts(self) -> List[str]:
452 """List all user prompt names.
454 Returns:
455 List of prompt names
456 """
457 names = set()
458 for key in self.version_manager._version_index.keys():
459 name, ptype = key.split(":", 1)
460 if ptype == "user":
461 names.add(name)
463 if self.base_library:
464 names.update(self.base_library.list_user_prompts())
466 return sorted(names)
468 def get_message_index(
469 self,
470 name: str,
471 **kwargs: Any
472 ) -> Optional[MessageIndex]:
473 """Get a message index.
475 Note: Message indexes are not versioned in this implementation.
476 Falls back to base library if available.
478 Args:
479 name: Message index name
480 **kwargs: Additional parameters
482 Returns:
483 MessageIndex if found, None otherwise
484 """
485 if self.base_library:
486 return self.base_library.get_message_index(name, **kwargs)
487 return None
489 def list_message_indexes(self) -> List[str]:
490 """List all message index names.
492 Returns:
493 List of message index names
494 """
495 if self.base_library:
496 return self.base_library.list_message_indexes()
497 return []
499 def get_rag_config(
500 self,
501 name: str,
502 **kwargs: Any
503 ) -> Optional[RAGConfig]:
504 """Get a RAG configuration.
506 Note: RAG configs are not versioned in this implementation.
507 Falls back to base library if available.
509 Args:
510 name: RAG config name
511 **kwargs: Additional parameters
513 Returns:
514 RAGConfig if found, None otherwise
515 """
516 if self.base_library:
517 return self.base_library.get_rag_config(name, **kwargs)
518 return None
520 def get_prompt_rag_configs(
521 self,
522 prompt_name: str,
523 prompt_type: str = "user",
524 **kwargs: Any
525 ) -> List[RAGConfig]:
526 """Get RAG configurations for a prompt.
528 Args:
529 prompt_name: Prompt name
530 prompt_type: Prompt type
531 **kwargs: Additional parameters
533 Returns:
534 List of RAG configurations
535 """
536 if self.base_library:
537 return self.base_library.get_prompt_rag_configs(prompt_name, prompt_type, **kwargs)
538 return []
540 def get_metadata(self) -> Dict[str, Any]:
541 """Get library metadata.
543 Returns:
544 Metadata dictionary
545 """
546 return {
547 "type": "VersionedPromptLibrary",
548 "storage": str(type(self.storage).__name__) if self.storage else "in-memory",
549 "has_base_library": self.base_library is not None,
550 "version_count": len(self.version_manager._versions),
551 "experiment_count": len(self.ab_test_manager._experiments),
552 }
554 # ===== Helper Methods =====
556 def _version_to_template(self, version: PromptVersion) -> PromptTemplate:
557 """Convert PromptVersion to PromptTemplate for compatibility."""
558 # Check cache
559 cache_key = version.version_id
560 if cache_key in self._template_cache:
561 return self._template_cache[cache_key]
563 template: PromptTemplate = {
564 "template": version.template,
565 "defaults": version.defaults,
566 "metadata": {
567 **version.metadata,
568 "version_id": version.version_id,
569 "version": version.version,
570 "created_at": version.created_at.isoformat(),
571 "tags": version.tags,
572 "status": version.status.value,
573 }
574 }
576 if version.validation:
577 template["validation"] = version.validation
579 # Cache it
580 self._template_cache[cache_key] = template
582 return template