Coverage for src / dataknobs_llm / prompts / builders / async_prompt_builder.py: 18%
123 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 10:29 -0700
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 10:29 -0700
1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG.
3This module provides the AsyncPromptBuilder class which coordinates between:
4- Prompt libraries (template sources)
5- Async resource adapters (data sources for RAG)
6- Template renderer (rendering engine)
8The async builder handles:
9- Concurrent parameter resolution from multiple sources
10- Parallel RAG content retrieval and injection
11- RAG result caching for performance optimization
12- Validation enforcement
13- Template defaults merging
15All async operations use asyncio.gather() for maximum parallelism, making it
16ideal for high-throughput applications with RAG-enhanced prompts.
18Key Features:
19 - Parallel RAG searches across multiple data sources
20 - RAG result caching to avoid redundant searches
21 - Async-first design for optimal performance
22 - Flexible parameter resolution from multiple adapters
23 - Template validation with configurable levels
24 - Metadata tracking for debugging and optimization
26Example:
27 ```python
28 from dataknobs_llm.prompts import AsyncPromptBuilder
29 from dataknobs_llm.prompts import FileSystemPromptLibrary
30 from dataknobs_llm.prompts.adapters import (
31 AsyncDictResourceAdapter,
32 AsyncDataknobsBackendAdapter
33 )
34 from dataknobs_data import database_factory
36 # Set up data sources for RAG
37 docs_db = database_factory.create("vector", embedding_model="...")
39 # Create adapters
40 adapters = {
41 'config': AsyncDictResourceAdapter({
42 'company': 'Acme Corp',
43 'domain': 'e-commerce'
44 }),
45 'docs': AsyncDataknobsBackendAdapter(docs_db)
46 }
48 # Create builder with prompt library
49 library = FileSystemPromptLibrary("prompts/")
50 builder = AsyncPromptBuilder(
51 library=library,
52 adapters=adapters,
53 default_validation=ValidationLevel.WARN
54 )
56 # Render prompt with RAG
57 result = await builder.render_user_prompt(
58 "analyze_code",
59 index=0,
60 params={
61 'code': code_snippet,
62 'language': 'python'
63 },
64 include_rag=True # Executes RAG searches in parallel
65 )
67 # Access rendered content
68 print(result.content)
70 # Cache RAG results for reuse
71 result_with_metadata = await builder.render_user_prompt(
72 "analyze_code",
73 params={'code': another_snippet, 'language': 'python'},
74 return_rag_metadata=True
75 )
77 # Reuse cached RAG (avoids re-searching)
78 result2 = await builder.render_user_prompt(
79 "analyze_code",
80 params={'code': yet_another_snippet, 'language': 'python'},
81 cached_rag=result_with_metadata.rag_metadata
82 )
84 # Parallel rendering of multiple prompts
85 results = await asyncio.gather(
86 builder.render_system_prompt("helpful_assistant"),
87 builder.render_user_prompt("user_query", params={'question': q}),
88 builder.render_user_prompt("context_setter", params={'topic': t})
89 )
90 ```
92See Also:
93 - BasePromptBuilder: Base implementation with shared logic
94 - PromptBuilder: Synchronous version for non-async applications
95 - AsyncResourceAdapter: Adapter interface for async data sources
96 - AbstractPromptLibrary: Prompt library interface
97"""
99import asyncio
100import logging
101from typing import Any, Dict
103from ..base import (
104 AbstractPromptLibrary,
105 PromptTemplateDict,
106 RAGConfig,
107 ValidationLevel,
108 RenderResult,
109)
110from ..adapters import AsyncResourceAdapter
111from .base_prompt_builder import BasePromptBuilder
113logger = logging.getLogger(__name__)
116class AsyncPromptBuilder(BasePromptBuilder):
117 """Asynchronous prompt builder for constructing prompts with RAG and validation.
119 This class provides a high-level async API for building prompts by:
120 1. Retrieving prompt templates from a library
121 2. Resolving parameters from adapters and runtime values (concurrently)
122 3. Executing RAG searches via adapters (in parallel)
123 4. Injecting RAG content into templates
124 5. Rendering final prompts with validation
126 Example:
127 >>> library = ConfigPromptLibrary(config)
128 >>> adapters = {
129 ... 'config': AsyncDictResourceAdapter(config_data),
130 ... 'docs': AsyncDataknobsBackendAdapter(docs_db)
131 ... }
132 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters)
133 >>>
134 >>> # Render a system prompt
135 >>> result = await builder.render_system_prompt(
136 ... 'analyze_code',
137 ... params={'code': code_snippet, 'language': 'python'}
138 ... )
139 """
141 def __init__(
142 self,
143 library: AbstractPromptLibrary,
144 adapters: Dict[str, AsyncResourceAdapter] | None = None,
145 default_validation: ValidationLevel = ValidationLevel.WARN,
146 raise_on_rag_error: bool = False
147 ):
148 """Initialize the asynchronous prompt builder.
150 Args:
151 library: Prompt library to retrieve templates from
152 adapters: Dictionary of named async resource adapters for parameter
153 resolution and RAG searches
154 default_validation: Default validation level for templates without
155 explicit validation configuration
156 raise_on_rag_error: If True, raise exceptions on RAG failures;
157 if False (default), log warning and continue
159 Raises:
160 TypeError: If any adapter is sync (use PromptBuilder instead)
161 """
162 super().__init__(library, adapters, default_validation, raise_on_rag_error)
163 self._validate_adapters()
165 def _validate_adapters(self) -> None:
166 """Validate that all adapters are asynchronous.
168 Raises:
169 TypeError: If any adapter is sync
170 """
171 for name, adapter in self.adapters.items():
172 if not adapter.is_async():
173 raise TypeError(
174 f"Adapter '{name}' is synchronous. "
175 "Use PromptBuilder for sync adapters."
176 )
178 async def render_system_prompt(
179 self,
180 name: str,
181 params: Dict[str, Any] | None = None,
182 include_rag: bool = True,
183 validation_override: ValidationLevel | None = None,
184 return_rag_metadata: bool = False,
185 cached_rag: Dict[str, Any] | None = None,
186 **kwargs: Any
187 ) -> RenderResult:
188 """Render a system prompt with parameters and optional RAG content.
190 Args:
191 name: System prompt identifier
192 params: Runtime parameters to use in rendering
193 include_rag: Whether to include RAG content (default: True)
194 validation_override: Override validation level for this render
195 return_rag_metadata: If True, attach RAG metadata to result
196 cached_rag: If provided, use these cached RAG results instead
197 of executing new searches
198 **kwargs: Additional parameters passed to library
200 Returns:
201 RenderResult with rendered content and metadata
203 Raises:
204 ValueError: If prompt not found or validation fails
206 Example:
207 >>> # Capture RAG metadata
208 >>> result = await builder.render_system_prompt(
209 ... 'code_question',
210 ... params={'language': 'python'},
211 ... return_rag_metadata=True
212 ... )
213 >>> print(result.rag_metadata)
214 >>>
215 >>> # Reuse cached RAG
216 >>> result2 = await builder.render_system_prompt(
217 ... 'code_question',
218 ... params={'language': 'python'},
219 ... cached_rag=result.rag_metadata
220 ... )
221 """
222 params = params or {}
224 # Retrieve template from library
225 template_dict = self.library.get_system_prompt(name, **kwargs)
226 if template_dict is None:
227 raise ValueError(f"System prompt not found: {name}")
229 # Render the prompt
230 return await self._render_prompt_impl(
231 prompt_name=name,
232 prompt_type="system",
233 template_dict=template_dict,
234 runtime_params=params,
235 include_rag=include_rag,
236 validation_override=validation_override,
237 return_rag_metadata=return_rag_metadata,
238 cached_rag=cached_rag,
239 **kwargs
240 )
242 async def render_user_prompt(
243 self,
244 name: str,
245 params: Dict[str, Any] | None = None,
246 include_rag: bool = True,
247 validation_override: ValidationLevel | None = None,
248 return_rag_metadata: bool = False,
249 cached_rag: Dict[str, Any] | None = None,
250 **kwargs: Any
251 ) -> RenderResult:
252 """Render a user prompt with parameters and optional RAG content.
254 Args:
255 name: User prompt identifier
256 params: Runtime parameters to use in rendering
257 include_rag: Whether to include RAG content (default: True)
258 validation_override: Override validation level for this render
259 return_rag_metadata: If True, attach RAG metadata to result
260 cached_rag: If provided, use these cached RAG results instead
261 of executing new searches
262 **kwargs: Additional parameters passed to library
264 Returns:
265 RenderResult with rendered content and metadata
267 Raises:
268 ValueError: If prompt not found or validation fails
269 """
270 params = params or {}
272 # Retrieve template from library
273 template_dict = self.library.get_user_prompt(name, **kwargs)
274 if template_dict is None:
275 raise ValueError(f"User prompt not found: {name}")
277 # Render the prompt
278 return await self._render_prompt_impl(
279 prompt_name=name,
280 prompt_type="user",
281 template_dict=template_dict,
282 runtime_params=params,
283 include_rag=include_rag,
284 validation_override=validation_override,
285 return_rag_metadata=return_rag_metadata,
286 cached_rag=cached_rag,
287 **kwargs
288 )
290 async def render_inline_system_prompt(
291 self,
292 content: str,
293 params: Dict[str, Any] | None = None,
294 rag_configs: list[RAGConfig] | None = None,
295 include_rag: bool = True,
296 validation_override: ValidationLevel | None = None,
297 return_rag_metadata: bool = False,
298 cached_rag: Dict[str, Any] | None = None,
299 **kwargs: Any,
300 ) -> RenderResult:
301 """Render inline system prompt content with optional RAG enhancement.
303 This method allows users to provide raw prompt content while still
304 benefiting from RAG context injection and template parameter substitution.
306 Args:
307 content: The raw system prompt content (may contain Jinja2 templates)
308 params: Runtime parameters for template rendering
309 rag_configs: Optional RAG configurations for context retrieval
310 include_rag: Whether to execute RAG queries (default: True)
311 validation_override: Override default validation level
312 return_rag_metadata: Include RAG metadata in result
313 cached_rag: Pre-cached RAG results to reuse
314 **kwargs: Additional parameters
316 Returns:
317 RenderResult with rendered content and metadata
319 Example:
320 >>> result = await builder.render_inline_system_prompt(
321 ... content="You are a helpful {{ role }} assistant.",
322 ... params={"role": "coding"},
323 ... rag_configs=[{
324 ... "adapter_name": "docs",
325 ... "query": "{{ topic }}",
326 ... "placeholder": "CONTEXT",
327 ... "k": 3
328 ... }]
329 ... )
330 """
331 # Construct a template_dict from inline content
332 template_dict: PromptTemplateDict = {
333 "template": content,
334 "defaults": {},
335 "metadata": {"source": "inline", "type": "system"},
336 "rag_configs": rag_configs or [],
337 }
339 return await self._render_prompt_impl(
340 prompt_name="<inline:system>",
341 prompt_type="system",
342 template_dict=template_dict,
343 runtime_params=params or {},
344 include_rag=include_rag and bool(rag_configs),
345 validation_override=validation_override,
346 return_rag_metadata=return_rag_metadata,
347 cached_rag=cached_rag,
348 **kwargs,
349 )
351 async def render_inline_user_prompt(
352 self,
353 content: str,
354 params: Dict[str, Any] | None = None,
355 rag_configs: list[RAGConfig] | None = None,
356 include_rag: bool = True,
357 validation_override: ValidationLevel | None = None,
358 return_rag_metadata: bool = False,
359 cached_rag: Dict[str, Any] | None = None,
360 **kwargs: Any,
361 ) -> RenderResult:
362 """Render inline user prompt content with optional RAG enhancement.
364 This method allows users to provide raw prompt content while still
365 benefiting from RAG context injection and template parameter substitution.
367 Args:
368 content: The raw user prompt content (may contain Jinja2 templates)
369 params: Runtime parameters for template rendering
370 rag_configs: Optional RAG configurations for context retrieval
371 include_rag: Whether to execute RAG queries (default: True)
372 validation_override: Override default validation level
373 return_rag_metadata: Include RAG metadata in result
374 cached_rag: Pre-cached RAG results to reuse
375 **kwargs: Additional parameters
377 Returns:
378 RenderResult with rendered content and metadata
380 Example:
381 >>> result = await builder.render_inline_user_prompt(
382 ... content="Help me understand {{ topic }}",
383 ... params={"topic": "decorators"},
384 ... rag_configs=[{
385 ... "adapter_name": "docs",
386 ... "query": "python {{ topic }} tutorial",
387 ... "placeholder": "DOCS",
388 ... "k": 5
389 ... }]
390 ... )
391 """
392 # Construct a template_dict from inline content
393 template_dict: PromptTemplateDict = {
394 "template": content,
395 "defaults": {},
396 "metadata": {"source": "inline", "type": "user"},
397 "rag_configs": rag_configs or [],
398 }
400 return await self._render_prompt_impl(
401 prompt_name="<inline:user>",
402 prompt_type="user",
403 template_dict=template_dict,
404 runtime_params=params or {},
405 include_rag=include_rag and bool(rag_configs),
406 validation_override=validation_override,
407 return_rag_metadata=return_rag_metadata,
408 cached_rag=cached_rag,
409 **kwargs,
410 )
412 async def _render_prompt_impl(
413 self,
414 prompt_name: str,
415 prompt_type: str,
416 template_dict: PromptTemplateDict,
417 runtime_params: Dict[str, Any],
418 include_rag: bool,
419 validation_override: ValidationLevel | None,
420 return_rag_metadata: bool = False,
421 cached_rag: Dict[str, Any] | None = None,
422 **kwargs: Any
423 ) -> RenderResult:
424 """Internal method to render a prompt template asynchronously.
426 Args:
427 prompt_name: Name of the prompt
428 prompt_type: Type of prompt ("system" or "user")
429 template_dict: Template dictionary from library
430 runtime_params: Runtime parameters
431 include_rag: Whether to include RAG content
432 validation_override: Validation level override
433 return_rag_metadata: If True, capture and return RAG metadata
434 cached_rag: If provided, use these cached RAG results instead
435 of executing new searches
436 **kwargs: Additional parameters
438 Returns:
439 RenderResult with rendered content and metadata
440 """
441 # Extract template components
442 template = template_dict.get("template", "")
443 template_metadata = template_dict.get("metadata", {})
445 # Extract RAG configs from template_dict (for inline prompts)
446 # or None to let _execute_rag_searches_impl fetch from library
447 inline_rag_configs = template_dict.get("rag_configs")
449 # Step 1: Merge defaults with runtime params
450 all_params = self._merge_params_with_defaults(template_dict, runtime_params)
452 # Step 2: Execute or reuse RAG searches
453 rag_metadata = None
454 if include_rag:
455 if cached_rag:
456 # Use cached RAG results
457 rag_content = self._extract_formatted_content_from_cache(cached_rag)
458 if return_rag_metadata:
459 rag_metadata = cached_rag # Pass through cached metadata
460 else:
461 # Execute fresh RAG searches
462 rag_content, rag_metadata = await self._execute_rag_searches_impl(
463 prompt_name=prompt_name,
464 prompt_type=prompt_type,
465 params=all_params,
466 capture_metadata=return_rag_metadata,
467 rag_configs_override=inline_rag_configs,
468 **kwargs
469 )
471 # Merge RAG content into parameters
472 all_params.update(rag_content)
474 # Step 3: Prepare validation config with override
475 validation_config = self._prepare_validation_config(template_dict, validation_override)
477 # Step 4: Render template with validation (synchronous)
478 result = self._renderer.render(
479 template=template,
480 params=all_params,
481 validation=validation_config,
482 template_metadata=template_metadata
483 )
485 # Attach RAG metadata if requested
486 if return_rag_metadata and rag_metadata:
487 result.rag_metadata = rag_metadata
489 # Add builder metadata
490 result.metadata.update({
491 "prompt_name": prompt_name,
492 "prompt_type": prompt_type,
493 "include_rag": include_rag,
494 "used_cached_rag": cached_rag is not None,
495 })
497 return result
499 async def _execute_rag_searches_impl(
500 self,
501 prompt_name: str,
502 prompt_type: str,
503 params: Dict[str, Any],
504 capture_metadata: bool = False,
505 rag_configs_override: list[RAGConfig] | None = None,
506 **kwargs: Any
507 ) -> tuple[Dict[str, str], Dict[str, Any] | None]:
508 """Execute RAG searches in parallel and format results for injection.
510 Args:
511 prompt_name: Name of the prompt
512 prompt_type: Type of prompt ("system" or "user")
513 params: Resolved parameters for query templating
514 capture_metadata: If True, capture RAG metadata
515 rag_configs_override: If provided, use these RAG configs instead of
516 fetching from the library (for inline prompts)
517 **kwargs: Additional parameters
519 Returns:
520 Tuple of (rag_content, rag_metadata):
521 - rag_content: Dictionary mapping placeholder names to formatted content
522 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True)
523 """
524 # Get RAG configurations - use override if provided, otherwise fetch from library
525 if rag_configs_override is not None:
526 rag_configs = rag_configs_override
527 else:
528 rag_configs = self.library.get_prompt_rag_configs(
529 prompt_name=prompt_name,
530 prompt_type=prompt_type,
531 **kwargs
532 )
534 if not rag_configs:
535 return {}, None
537 # Execute all RAG searches in parallel
538 rag_content = {}
539 rag_metadata = {} if capture_metadata else None
541 if capture_metadata:
542 tasks_with_metadata = [
543 self._execute_single_rag_with_metadata(rag_config, params)
544 for rag_config in rag_configs
545 ]
546 results_with_metadata = await asyncio.gather(*tasks_with_metadata, return_exceptions=True)
548 for rag_config, result in zip(rag_configs, results_with_metadata, strict=True):
549 placeholder = rag_config.get("placeholder", "RAG_CONTENT")
551 if isinstance(result, BaseException):
552 error_msg = f"RAG search failed for {prompt_name}: {result}"
553 if self._raise_on_rag_error:
554 raise RuntimeError(error_msg) from result
555 else:
556 logger.warning(error_msg)
557 rag_content[placeholder] = ""
558 if rag_metadata is not None:
559 from datetime import datetime
560 rag_metadata[placeholder] = {
561 "error": str(result),
562 "timestamp": datetime.now().isoformat()
563 }
564 else:
565 formatted_content, metadata = result
566 rag_content[placeholder] = formatted_content
567 if metadata and rag_metadata is not None:
568 rag_metadata[placeholder] = metadata
569 else:
570 tasks_no_metadata = [
571 self._execute_single_rag_search_safe(rag_config, params)
572 for rag_config in rag_configs
573 ]
574 results_no_metadata = await asyncio.gather(*tasks_no_metadata, return_exceptions=True)
576 for rag_config, result in zip(rag_configs, results_no_metadata, strict=True):
577 placeholder = rag_config.get("placeholder", "RAG_CONTENT")
579 if isinstance(result, BaseException):
580 error_msg = f"RAG search failed for {prompt_name}: {result}"
581 if self._raise_on_rag_error:
582 raise RuntimeError(error_msg) from result
583 else:
584 logger.warning(error_msg)
585 rag_content[placeholder] = ""
586 else:
587 rag_content[placeholder] = result
589 return rag_content, rag_metadata
591 async def _execute_single_rag_search_safe(
592 self,
593 rag_config: RAGConfig,
594 params: Dict[str, Any]
595 ) -> str:
596 """Safely execute a single RAG search (for use with asyncio.gather).
598 Args:
599 rag_config: RAG configuration
600 params: Parameters for query templating
602 Returns:
603 Formatted RAG content string
605 Raises:
606 Exception: Propagated from _execute_single_rag_search
607 """
608 return await self._execute_single_rag_search(rag_config, params)
610 async def _execute_single_rag_search(
611 self,
612 rag_config: RAGConfig,
613 params: Dict[str, Any]
614 ) -> str:
615 """Execute a single RAG search and format results asynchronously.
617 Args:
618 rag_config: RAG configuration
619 params: Parameters for query templating
621 Returns:
622 Formatted RAG content string
624 Raises:
625 KeyError: If adapter not found
626 Exception: If search fails
627 """
628 # Get adapter
629 adapter_name = rag_config.get("adapter_name")
630 if not adapter_name:
631 raise ValueError("RAG config missing 'adapter_name'")
633 if adapter_name not in self.adapters:
634 raise KeyError(
635 f"Adapter '{adapter_name}' not found. "
636 f"Available adapters: {list(self.adapters.keys())}"
637 )
639 adapter = self.adapters[adapter_name]
641 # Render query template
642 query_template = rag_config.get("query", "")
643 query = self._render_rag_query(query_template, params)
645 # Execute search (async)
646 k = rag_config.get("k", 5)
647 filters = rag_config.get("filters")
648 search_results = await adapter.search(query=query, k=k, filters=filters)
650 # Format results
651 formatted_content = self._format_rag_results(
652 results=search_results,
653 rag_config=rag_config,
654 params=params
655 )
657 return formatted_content
659 async def _execute_single_rag_with_metadata(
660 self,
661 rag_config: RAGConfig,
662 params: Dict[str, Any]
663 ) -> tuple[str, Dict[str, Any]]:
664 """Execute a single RAG search with metadata capture.
666 This method executes a RAG search and captures detailed metadata
667 including the query, results, and query hash for caching.
669 Args:
670 rag_config: RAG configuration
671 params: Parameters for query templating
673 Returns:
674 Tuple of (formatted_content, metadata):
675 - formatted_content: Formatted RAG content string
676 - metadata: Dictionary with RAG metadata including:
677 - adapter_name: Name of the adapter used
678 - query: Rendered query string
679 - query_hash: SHA256 hash for cache matching
680 - k: Number of results requested
681 - filters: Filters applied to search
682 - timestamp: ISO format timestamp
683 - results: Raw search results
684 - formatted_content: Formatted output
685 - item_template: Template used for formatting
686 - header: Header text used
688 Raises:
689 KeyError: If adapter not found
690 Exception: If search fails
691 """
692 from datetime import datetime
694 # Get adapter
695 adapter_name = rag_config.get("adapter_name")
696 if not adapter_name:
697 raise ValueError("RAG config missing 'adapter_name'")
699 if adapter_name not in self.adapters:
700 raise KeyError(
701 f"Adapter '{adapter_name}' not found. "
702 f"Available adapters: {list(self.adapters.keys())}"
703 )
705 adapter = self.adapters[adapter_name]
707 # Render query template
708 query_template = rag_config.get("query", "")
709 query = self._render_rag_query(query_template, params)
711 # Compute query hash for cache matching
712 query_hash = self._compute_rag_query_hash(adapter_name, query)
714 # Execute search (async)
715 k = rag_config.get("k", 5)
716 filters = rag_config.get("filters")
717 search_results = await adapter.search(query=query, k=k, filters=filters)
719 # Format results
720 formatted_content = self._format_rag_results(
721 results=search_results,
722 rag_config=rag_config,
723 params=params
724 )
726 # Build metadata
727 metadata = {
728 "adapter_name": adapter_name,
729 "query": query,
730 "query_hash": query_hash,
731 "k": k,
732 "filters": filters,
733 "timestamp": datetime.now().isoformat(),
734 "results": search_results, # Store raw results
735 "formatted_content": formatted_content,
736 "item_template": rag_config.get("item_template"),
737 "header": rag_config.get("header"),
738 }
740 return formatted_content, metadata