Coverage for src/dataknobs_llm/prompts/implementations/versioned_library.py: 24%
114 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:48 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:48 -0700
1"""Versioned prompt library implementation.
3This module provides a prompt library with full versioning support,
4combining version management, A/B testing, and metrics tracking.
5"""
7from typing import Any, Dict, List
9from ..base import AbstractPromptLibrary, PromptTemplateDict, MessageIndex, RAGConfig
10from ..versioning import (
11 VersionManager,
12 ABTestManager,
13 MetricsCollector,
14 PromptVersion,
15 PromptExperiment,
16 PromptVariant,
17 PromptMetrics,
18 VersionStatus,
19)
class VersionedPromptLibrary(AbstractPromptLibrary):
    """Prompt library with versioning, A/B testing, and metrics tracking.

    This library extends the base prompt library interface with:
    - Version management with semantic versioning
    - A/B testing experiments with traffic splitting
    - Performance metrics tracking
    - Rollback capabilities

    The version, experiment and metrics APIs are coroutines that delegate to
    ``VersionManager``, ``ABTestManager`` and ``MetricsCollector``.  The
    ``AbstractPromptLibrary`` accessors (``get_system_prompt``,
    ``get_user_prompt``, ...) are synchronous and may be called both from
    plain synchronous code and from inside a running event loop.

    Example:
        ```python
        from dataknobs_llm.prompts import VersionedPromptLibrary

        # Create library (with optional backend storage)
        library = VersionedPromptLibrary(storage=backend)

        # Create a version
        v1 = await library.create_version(
            name="greeting",
            prompt_type="system",
            template="Hello {{name}}!",
            version="1.0.0"
        )

        # Get latest version (returns PromptTemplateDict for compatibility)
        template = library.get_system_prompt("greeting")

        # Create A/B test
        experiment = await library.create_experiment(
            name="greeting",
            prompt_type="system",
            variants=[
                PromptVariant("1.0.0", 0.5, "Original"),
                PromptVariant("1.1.0", 0.5, "Improved")
            ]
        )

        # Track metrics
        await library.record_usage(
            version_id=v1.version_id,
            success=True,
            response_time=0.5,
            tokens=100
        )
        ```
    """

    def __init__(
        self,
        storage: Any | None = None,
        base_library: AbstractPromptLibrary | None = None,
    ):
        """Initialize versioned prompt library.

        Args:
            storage: Backend storage for persistence (None for in-memory)
            base_library: Optional base library to wrap (for migration)
        """
        self.storage = storage
        self.base_library = base_library

        # All three managers share the same storage backend.
        self.version_manager = VersionManager(storage)
        self.ab_test_manager = ABTestManager(storage)
        self.metrics_collector = MetricsCollector(storage)

        # Converted templates, keyed by version_id.
        self._template_cache: Dict[str, PromptTemplateDict] = {}

    # ===== Version Management API =====

    async def create_version(
        self,
        name: str,
        prompt_type: str,
        template: str,
        version: str | None = None,
        defaults: Dict[str, Any] | None = None,
        validation: Dict[str, Any] | None = None,
        metadata: Dict[str, Any] | None = None,
        created_by: str | None = None,
        tags: List[str] | None = None,
        status: VersionStatus = VersionStatus.ACTIVE,
    ) -> PromptVersion:
        """Create a new prompt version.

        The version currently returned by the version manager for
        ``(name, prompt_type)`` (if any) is recorded as the parent of the
        new version.

        Args:
            name: Prompt name
            prompt_type: Prompt type ("system", "user", "message")
            template: Template content
            version: Semantic version (auto-increments if None)
            defaults: Default parameter values
            validation: Validation configuration
            metadata: Additional metadata
            created_by: Creator username/ID
            tags: List of tags
            status: Initial status

        Returns:
            Created PromptVersion
        """
        # Find parent version (latest version)
        latest = await self.version_manager.get_version(name, prompt_type)
        parent_version = latest.version_id if latest else None

        return await self.version_manager.create_version(
            name=name,
            prompt_type=prompt_type,
            template=template,
            version=version,
            defaults=defaults,
            validation=validation,
            metadata=metadata,
            created_by=created_by,
            parent_version=parent_version,
            tags=tags,
            status=status,
        )

    async def get_version(
        self,
        name: str,
        prompt_type: str,
        version: str = "latest",
    ) -> PromptVersion | None:
        """Get a specific prompt version.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            version: Version string or "latest"

        Returns:
            PromptVersion if found, None otherwise
        """
        return await self.version_manager.get_version(name, prompt_type, version)

    async def list_versions(
        self,
        name: str,
        prompt_type: str,
        tags: List[str] | None = None,
        status: VersionStatus | None = None,
    ) -> List[PromptVersion]:
        """List all versions of a prompt.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            tags: Filter by tags
            status: Filter by status

        Returns:
            List of PromptVersion objects
        """
        return await self.version_manager.list_versions(name, prompt_type, tags, status)

    async def tag_version(
        self,
        version_id: str,
        tag: str,
    ) -> PromptVersion:
        """Add a tag to a version.

        Args:
            version_id: Version ID
            tag: Tag to add

        Returns:
            Updated PromptVersion
        """
        updated = await self.version_manager.tag_version(version_id, tag)
        # Cached templates embed the version's tag list in their metadata, so
        # drop the entry and let the next lookup rebuild it from fresh data.
        self._template_cache.pop(version_id, None)
        return updated

    # ===== A/B Testing API =====

    async def create_experiment(
        self,
        name: str,
        prompt_type: str,
        variants: List[PromptVariant],
        traffic_split: Dict[str, float] | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> PromptExperiment:
        """Create an A/B test experiment.

        Args:
            name: Prompt name
            prompt_type: Prompt type
            variants: List of variants to test
            traffic_split: Optional custom traffic split
            metadata: Additional metadata

        Returns:
            Created PromptExperiment
        """
        return await self.ab_test_manager.create_experiment(
            name=name,
            prompt_type=prompt_type,
            variants=variants,
            traffic_split=traffic_split,
            metadata=metadata,
        )

    async def get_variant_for_user(
        self,
        experiment_id: str,
        user_id: str,
    ) -> str:
        """Get variant for a user (sticky assignment).

        Args:
            experiment_id: Experiment ID
            user_id: User identifier

        Returns:
            Version string of assigned variant
        """
        return await self.ab_test_manager.get_variant_for_user(experiment_id, user_id)

    async def get_random_variant(
        self,
        experiment_id: str,
    ) -> str:
        """Get a random variant.

        Args:
            experiment_id: Experiment ID

        Returns:
            Version string of selected variant
        """
        return await self.ab_test_manager.get_random_variant(experiment_id)

    async def get_experiment(
        self,
        experiment_id: str,
    ) -> PromptExperiment | None:
        """Get an experiment by ID.

        Args:
            experiment_id: Experiment ID

        Returns:
            PromptExperiment if found, None otherwise
        """
        return await self.ab_test_manager.get_experiment(experiment_id)

    async def list_experiments(
        self,
        name: str | None = None,
        prompt_type: str | None = None,
        status: str | None = None,
    ) -> List[PromptExperiment]:
        """List experiments.

        Args:
            name: Filter by prompt name
            prompt_type: Filter by prompt type
            status: Filter by status

        Returns:
            List of experiments
        """
        return await self.ab_test_manager.list_experiments(name, prompt_type, status)

    # ===== Metrics API =====

    async def record_usage(
        self,
        version_id: str,
        success: bool = True,
        response_time: float | None = None,
        tokens: int | None = None,
        user_rating: float | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> None:
        """Record a usage event for metrics tracking.

        Args:
            version_id: Version ID
            success: Whether the use was successful
            response_time: Response time in seconds
            tokens: Number of tokens used
            user_rating: User rating 1-5
            metadata: Additional event metadata
        """
        await self.metrics_collector.record_event(
            version_id=version_id,
            success=success,
            response_time=response_time,
            tokens=tokens,
            user_rating=user_rating,
            metadata=metadata,
        )

    async def get_metrics(
        self,
        version_id: str,
    ) -> PromptMetrics:
        """Get metrics for a version.

        Args:
            version_id: Version ID

        Returns:
            PromptMetrics with aggregated statistics
        """
        return await self.metrics_collector.get_metrics(version_id)

    async def compare_variants(
        self,
        version_ids: List[str],
    ) -> Dict[str, PromptMetrics]:
        """Compare metrics across versions.

        Args:
            version_ids: List of version IDs

        Returns:
            Dictionary mapping version_id to PromptMetrics
        """
        return await self.metrics_collector.compare_variants(version_ids)

    # ===== AbstractPromptLibrary Implementation =====

    def get_system_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> PromptTemplateDict | None:
        """Get a system prompt template.

        This method is synchronous for compatibility with AbstractPromptLibrary.
        For async version access, use get_version() directly.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when falling back to it

        Returns:
            PromptTemplateDict if found, None otherwise
        """
        template = self._get_versioned_template(name, "system", version)
        if template is None and self.base_library:
            # Fall back to base library if available
            return self.base_library.get_system_prompt(name, **kwargs)
        return template

    def get_user_prompt(
        self,
        name: str,
        version: str = "latest",
        **kwargs: Any
    ) -> PromptTemplateDict | None:
        """Get a user prompt template.

        This method is synchronous for compatibility with AbstractPromptLibrary.
        For async version access, use get_version() directly.

        Args:
            name: Prompt name
            version: Version string or "latest"
            **kwargs: Additional parameters, forwarded to the base library
                when falling back to it

        Returns:
            PromptTemplateDict if found, None otherwise
        """
        template = self._get_versioned_template(name, "user", version)
        if template is None and self.base_library:
            # Fall back to base library if available
            return self.base_library.get_user_prompt(name, **kwargs)
        return template

    def list_system_prompts(self) -> List[str]:
        """List all system prompt names.

        Returns:
            Sorted list of prompt names from the version index, merged with
            the base library's names when a base library is set
        """
        names = self._versioned_prompt_names("system")
        if self.base_library:
            names.update(self.base_library.list_system_prompts())
        return sorted(names)

    def list_user_prompts(self) -> List[str]:
        """List all user prompt names.

        Returns:
            Sorted list of prompt names from the version index, merged with
            the base library's names when a base library is set
        """
        names = self._versioned_prompt_names("user")
        if self.base_library:
            names.update(self.base_library.list_user_prompts())
        return sorted(names)

    def get_message_index(
        self,
        name: str,
        **kwargs: Any
    ) -> MessageIndex | None:
        """Get a message index.

        Note: Message indexes are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: Message index name
            **kwargs: Additional parameters

        Returns:
            MessageIndex if found, None otherwise
        """
        if self.base_library:
            return self.base_library.get_message_index(name, **kwargs)
        return None

    def list_message_indexes(self) -> List[str]:
        """List all message index names.

        Returns:
            List of message index names (empty without a base library)
        """
        if self.base_library:
            return self.base_library.list_message_indexes()
        return []

    def get_rag_config(
        self,
        name: str,
        **kwargs: Any
    ) -> RAGConfig | None:
        """Get a RAG configuration.

        Note: RAG configs are not versioned in this implementation.
        Falls back to base library if available.

        Args:
            name: RAG config name
            **kwargs: Additional parameters

        Returns:
            RAGConfig if found, None otherwise
        """
        if self.base_library:
            return self.base_library.get_rag_config(name, **kwargs)
        return None

    def get_prompt_rag_configs(
        self,
        prompt_name: str,
        prompt_type: str = "user",
        **kwargs: Any
    ) -> List[RAGConfig]:
        """Get RAG configurations for a prompt.

        Args:
            prompt_name: Prompt name
            prompt_type: Prompt type
            **kwargs: Additional parameters

        Returns:
            List of RAG configurations (empty without a base library)
        """
        if self.base_library:
            return self.base_library.get_prompt_rag_configs(
                prompt_name, prompt_type, **kwargs
            )
        return []

    def get_metadata(self) -> Dict[str, Any]:
        """Get library metadata.

        Returns:
            Metadata dictionary with the library type, storage class name
            (or "in-memory"), whether a base library is wrapped, and the
            number of versions and experiments held by the managers
        """
        return {
            "type": "VersionedPromptLibrary",
            "storage": type(self.storage).__name__ if self.storage else "in-memory",
            "has_base_library": self.base_library is not None,
            "version_count": len(self.version_manager._versions),
            "experiment_count": len(self.ab_test_manager._experiments),
        }

    # ===== Helper Methods =====

    @staticmethod
    def _run_sync(awaitable_factory: Any) -> Any:
        """Run an awaitable to completion from synchronous code.

        Args:
            awaitable_factory: Zero-argument callable returning the awaitable
                to run. A factory is used so the awaitable is created in the
                thread that actually executes it.

        Returns:
            The awaitable's result
        """
        import asyncio
        import concurrent.futures

        async def _runner() -> Any:
            return await awaitable_factory()

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop running in this thread; drive one directly below.
        else:
            # run_until_complete() cannot be nested inside a running loop, so
            # run the coroutine on a private loop in a worker thread instead.
            # NOTE(review): this blocks the calling loop until done and assumes
            # the storage backend is not bound to the caller's loop - async
            # callers should prefer ``await get_version()``.
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                return pool.submit(lambda: asyncio.run(_runner())).result()

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(_runner())

    def _get_versioned_template(
        self,
        name: str,
        prompt_type: str,
        version: str,
    ) -> PromptTemplateDict | None:
        """Synchronously fetch a version and convert it to a template dict.

        Returns None when the version manager has no matching version.
        """
        prompt_version = self._run_sync(
            lambda: self.version_manager.get_version(name, prompt_type, version)
        )
        if prompt_version is None:
            return None
        return self._version_to_template(prompt_version)

    def _versioned_prompt_names(self, prompt_type: str) -> set[str]:
        """Collect the names in the version index that have ``prompt_type``."""
        names: set[str] = set()
        for key in self.version_manager._version_index:
            # Keys look like "<name>:<prompt_type>"; split from the right so a
            # prompt name that itself contains ":" is still recognised.
            # NOTE(review): assumes prompt types never contain ":" - confirm
            # against VersionManager's key construction.
            name, ptype = key.rsplit(":", 1)
            if ptype == prompt_type:
                names.add(name)
        return names

    def _version_to_template(self, version: PromptVersion) -> PromptTemplateDict:
        """Convert PromptVersion to PromptTemplateDict for compatibility.

        Results are cached per version_id; the same dict object is returned
        on subsequent calls for that version.
        """
        cache_key = version.version_id
        if cache_key in self._template_cache:
            return self._template_cache[cache_key]

        template: PromptTemplateDict = {
            "template": version.template,
            "defaults": version.defaults,
            "metadata": {
                # Version bookkeeping keys come last so they win over any
                # same-named keys in the user-supplied metadata.
                **version.metadata,
                "version_id": version.version_id,
                "version": version.version,
                "created_at": version.created_at.isoformat(),
                "tags": version.tags,
                "status": version.status.value,
            }
        }

        # "validation" is only included when the version defines one.
        if version.validation:
            template["validation"] = version.validation  # type: ignore[typeddict-item]

        self._template_cache[cache_key] = template

        return template