Coverage for src/dataknobs_llm/prompts/builders/async_prompt_builder.py: 90%
114 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG.
3This module provides the AsyncPromptBuilder class which coordinates between:
4- Prompt libraries (template sources)
5- Async resource adapters (data sources for RAG)
6- Template renderer (rendering engine)
8The async builder handles:
9- Concurrent parameter resolution from multiple sources
10- Parallel RAG content retrieval and injection
11- RAG result caching for performance optimization
12- Validation enforcement
13- Template defaults merging
15All async operations use asyncio.gather() for maximum parallelism, making it
16ideal for high-throughput applications with RAG-enhanced prompts.
18Key Features:
19 - Parallel RAG searches across multiple data sources
20 - RAG result caching to avoid redundant searches
21 - Async-first design for optimal performance
22 - Flexible parameter resolution from multiple adapters
23 - Template validation with configurable levels
24 - Metadata tracking for debugging and optimization
26Example:
27 ```python
28 from dataknobs_llm.prompts import AsyncPromptBuilder
29 from dataknobs_llm.prompts import FileSystemPromptLibrary
30 from dataknobs_llm.prompts.adapters import (
31 AsyncDictResourceAdapter,
32 AsyncDataknobsBackendAdapter
33 )
34 from dataknobs_data import database_factory
36 # Set up data sources for RAG
37 docs_db = database_factory.create("vector", embedding_model="...")
39 # Create adapters
40 adapters = {
41 'config': AsyncDictResourceAdapter({
42 'company': 'Acme Corp',
43 'domain': 'e-commerce'
44 }),
45 'docs': AsyncDataknobsBackendAdapter(docs_db)
46 }
48 # Create builder with prompt library
49 library = FileSystemPromptLibrary("prompts/")
50 builder = AsyncPromptBuilder(
51 library=library,
52 adapters=adapters,
53 default_validation=ValidationLevel.WARN
54 )
56 # Render prompt with RAG
57 result = await builder.render_user_prompt(
58 "analyze_code",
59 index=0,
60 params={
61 'code': code_snippet,
62 'language': 'python'
63 },
64 include_rag=True # Executes RAG searches in parallel
65 )
67 # Access rendered content
68 print(result.content)
70 # Cache RAG results for reuse
71 result_with_metadata = await builder.render_user_prompt(
72 "analyze_code",
73 params={'code': another_snippet, 'language': 'python'},
74 return_rag_metadata=True
75 )
77 # Reuse cached RAG (avoids re-searching)
78 result2 = await builder.render_user_prompt(
79 "analyze_code",
80 params={'code': yet_another_snippet, 'language': 'python'},
81 cached_rag=result_with_metadata.rag_metadata
82 )
84 # Parallel rendering of multiple prompts
85 results = await asyncio.gather(
86 builder.render_system_prompt("helpful_assistant"),
87 builder.render_user_prompt("user_query", params={'question': q}),
88 builder.render_user_prompt("context_setter", params={'topic': t})
89 )
90 ```
92See Also:
93 - BasePromptBuilder: Base implementation with shared logic
94 - PromptBuilder: Synchronous version for non-async applications
95 - AsyncResourceAdapter: Adapter interface for async data sources
96 - AbstractPromptLibrary: Prompt library interface
97"""
99import asyncio
100import logging
101from typing import Any, Dict
103from ..base import (
104 AbstractPromptLibrary,
105 PromptTemplateDict,
106 RAGConfig,
107 ValidationLevel,
108 RenderResult,
109)
110from ..adapters import AsyncResourceAdapter
111from .base_prompt_builder import BasePromptBuilder
113logger = logging.getLogger(__name__)
116class AsyncPromptBuilder(BasePromptBuilder):
117 """Asynchronous prompt builder for constructing prompts with RAG and validation.
119 This class provides a high-level async API for building prompts by:
120 1. Retrieving prompt templates from a library
121 2. Resolving parameters from adapters and runtime values (concurrently)
122 3. Executing RAG searches via adapters (in parallel)
123 4. Injecting RAG content into templates
124 5. Rendering final prompts with validation
126 Example:
127 >>> library = ConfigPromptLibrary(config)
128 >>> adapters = {
129 ... 'config': AsyncDictResourceAdapter(config_data),
130 ... 'docs': AsyncDataknobsBackendAdapter(docs_db)
131 ... }
132 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters)
133 >>>
134 >>> # Render a system prompt
135 >>> result = await builder.render_system_prompt(
136 ... 'analyze_code',
137 ... params={'code': code_snippet, 'language': 'python'}
138 ... )
139 """
def __init__(
    self,
    library: AbstractPromptLibrary,
    adapters: Dict[str, AsyncResourceAdapter] | None = None,
    default_validation: ValidationLevel = ValidationLevel.WARN,
    raise_on_rag_error: bool = False,
):
    """Create an async prompt builder and reject synchronous adapters.

    Args:
        library: Prompt library that templates are retrieved from.
        adapters: Optional mapping of adapter name to async resource
            adapter, used for parameter resolution and RAG searches.
        default_validation: Validation level applied to templates that do
            not carry their own validation configuration.
        raise_on_rag_error: When True, a failed RAG search raises; when
            False (the default) the failure is logged and rendering
            continues.

    Raises:
        TypeError: If any configured adapter reports itself as
            synchronous (use PromptBuilder for those).
    """
    # All shared configuration is delegated to BasePromptBuilder.
    super().__init__(
        library,
        adapters,
        default_validation,
        raise_on_rag_error,
    )
    # Fail fast at construction time rather than on the first await.
    self._validate_adapters()
def _validate_adapters(self) -> None:
    """Ensure every registered adapter is asynchronous.

    Iterates ``self.adapters`` and asks each adapter ``is_async()``. The
    first adapter that answers False aborts validation.

    Raises:
        TypeError: If an adapter reports that it is synchronous. The
            message names the offending adapter.
    """
    for adapter_key, candidate in self.adapters.items():
        if candidate.is_async():
            continue
        raise TypeError(
            f"Adapter '{adapter_key}' is synchronous. "
            "Use PromptBuilder for sync adapters."
        )
async def render_system_prompt(
    self,
    name: str,
    params: Dict[str, Any] | None = None,
    include_rag: bool = True,
    validation_override: ValidationLevel | None = None,
    return_rag_metadata: bool = False,
    cached_rag: Dict[str, Any] | None = None,
    **kwargs: Any,
) -> RenderResult:
    """Look up a system prompt by name and render it.

    Args:
        name: Identifier of the system prompt in the library.
        params: Runtime parameters for rendering; ``None`` (or an empty
            mapping) is treated as ``{}``.
        include_rag: Whether RAG content should be injected (default True).
        validation_override: Validation level to use for this call only.
        return_rag_metadata: If True, RAG metadata is attached to the result.
        cached_rag: Previously captured RAG metadata to reuse instead of
            running new searches.
        **kwargs: Extra keyword arguments, forwarded both to the library
            lookup and to the internal render step.

    Returns:
        The RenderResult produced by ``_render_prompt_impl``.

    Raises:
        ValueError: If the library returns ``None`` for ``name``.
            Exceptions raised by the internal render step propagate
            unchanged.

    Example:
        >>> result = await builder.render_system_prompt(
        ...     'code_question',
        ...     params={'language': 'python'},
        ...     return_rag_metadata=True
        ... )
        >>> # Reuse the captured RAG results on a later call
        >>> result2 = await builder.render_system_prompt(
        ...     'code_question',
        ...     params={'language': 'python'},
        ...     cached_rag=result.rag_metadata
        ... )
    """
    # Resolve the template first; a missing prompt is a caller error.
    system_template = self.library.get_system_prompt(name, **kwargs)
    if system_template is None:
        raise ValueError(f"System prompt not found: {name}")

    effective_params = params if params else {}

    rendered = await self._render_prompt_impl(
        prompt_name=name,
        prompt_type="system",
        template_dict=system_template,
        runtime_params=effective_params,
        include_rag=include_rag,
        validation_override=validation_override,
        return_rag_metadata=return_rag_metadata,
        cached_rag=cached_rag,
        **kwargs,
    )
    return rendered
async def render_user_prompt(
    self,
    name: str,
    params: Dict[str, Any] | None = None,
    include_rag: bool = True,
    validation_override: ValidationLevel | None = None,
    return_rag_metadata: bool = False,
    cached_rag: Dict[str, Any] | None = None,
    **kwargs: Any,
) -> RenderResult:
    """Look up a user prompt by name and render it.

    Args:
        name: Identifier of the user prompt in the library.
        params: Runtime parameters for rendering; ``None`` (or an empty
            mapping) is treated as ``{}``.
        include_rag: Whether RAG content should be injected (default True).
        validation_override: Validation level to use for this call only.
        return_rag_metadata: If True, RAG metadata is attached to the result.
        cached_rag: Previously captured RAG metadata to reuse instead of
            running new searches.
        **kwargs: Extra keyword arguments, forwarded both to the library
            lookup and to the internal render step.

    Returns:
        The RenderResult produced by ``_render_prompt_impl``.

    Raises:
        ValueError: If the library returns ``None`` for ``name``.
            Exceptions raised by the internal render step propagate
            unchanged.
    """
    # Resolve the template first; a missing prompt is a caller error.
    user_template = self.library.get_user_prompt(name, **kwargs)
    if user_template is None:
        raise ValueError(f"User prompt not found: {name}")

    effective_params = params if params else {}

    rendered = await self._render_prompt_impl(
        prompt_name=name,
        prompt_type="user",
        template_dict=user_template,
        runtime_params=effective_params,
        include_rag=include_rag,
        validation_override=validation_override,
        return_rag_metadata=return_rag_metadata,
        cached_rag=cached_rag,
        **kwargs,
    )
    return rendered
async def _render_prompt_impl(
    self,
    prompt_name: str,
    prompt_type: str,
    template_dict: PromptTemplateDict,
    runtime_params: Dict[str, Any],
    include_rag: bool,
    validation_override: ValidationLevel | None,
    return_rag_metadata: bool = False,
    cached_rag: Dict[str, Any] | None = None,
    **kwargs: Any
) -> RenderResult:
    """Render a prompt template: merge params, inject RAG, validate, render.

    Args:
        prompt_name: Name of the prompt.
        prompt_type: Type of prompt ("system" or "user").
        template_dict: Template dictionary from the library; the
            "template" and "metadata" keys are read here.
        runtime_params: Runtime parameters supplied by the caller.
        include_rag: Whether RAG content should be injected at all.
        validation_override: Validation level override for this render.
        return_rag_metadata: If True, RAG metadata is attached to the
            result as ``rag_metadata`` (when any is available).
        cached_rag: Cached RAG metadata to reuse. Only a non-empty cache
            is used; ``None`` or an empty mapping falls back to fresh
            searches.
        **kwargs: Extra keyword arguments forwarded to the RAG search step.

    Returns:
        The renderer's result, with ``prompt_name``, ``prompt_type``,
        ``include_rag`` and ``used_cached_rag`` added to its metadata.
        ``used_cached_rag`` is True only when cached RAG content was
        actually injected into the parameters.
    """
    # Extract template components
    template = template_dict.get("template", "")
    template_metadata = template_dict.get("metadata", {})

    # Step 1: Merge defaults with runtime params
    all_params = self._merge_params_with_defaults(template_dict, runtime_params)

    # Step 2: Execute or reuse RAG searches.
    # The cache counts as "used" only when RAG is enabled and the cache is
    # non-empty. Reporting `cached_rag is not None` instead would claim
    # cache use for `{}` (fresh searches run) and for include_rag=False
    # (no RAG content injected at all).
    used_cached_rag = bool(include_rag and cached_rag)
    rag_metadata = None
    if used_cached_rag:
        rag_content = self._extract_formatted_content_from_cache(cached_rag)
        if return_rag_metadata:
            rag_metadata = cached_rag  # Pass through cached metadata
        all_params.update(rag_content)
    elif include_rag:
        rag_content, rag_metadata = await self._execute_rag_searches_impl(
            prompt_name=prompt_name,
            prompt_type=prompt_type,
            params=all_params,
            capture_metadata=return_rag_metadata,
            **kwargs
        )
        all_params.update(rag_content)

    # Step 3: Prepare validation config with override
    validation_config = self._prepare_validation_config(template_dict, validation_override)

    # Step 4: Render template with validation (synchronous)
    result = self._renderer.render(
        template=template,
        params=all_params,
        validation=validation_config,
        template_metadata=template_metadata
    )

    # Attach RAG metadata if requested and available
    if return_rag_metadata and rag_metadata:
        result.rag_metadata = rag_metadata

    # Add builder metadata describing what actually happened
    result.metadata.update({
        "prompt_name": prompt_name,
        "prompt_type": prompt_type,
        "include_rag": include_rag,
        "used_cached_rag": used_cached_rag,
    })

    return result
372 async def _execute_rag_searches_impl(
373 self,
374 prompt_name: str,
375 prompt_type: str,
376 params: Dict[str, Any],
377 capture_metadata: bool = False,
378 **kwargs: Any
379 ) -> tuple[Dict[str, str], Dict[str, Any] | None]:
380 """Execute RAG searches in parallel and format results for injection.
382 Args:
383 prompt_name: Name of the prompt
384 prompt_type: Type of prompt ("system" or "user")
385 params: Resolved parameters for query templating
386 capture_metadata: If True, capture RAG metadata
387 **kwargs: Additional parameters
389 Returns:
390 Tuple of (rag_content, rag_metadata):
391 - rag_content: Dictionary mapping placeholder names to formatted content
392 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True)
393 """
394 # Get RAG configurations for this prompt
395 rag_configs = self.library.get_prompt_rag_configs(
396 prompt_name=prompt_name,
397 prompt_type=prompt_type,
398 **kwargs
399 )
401 if not rag_configs:
402 return {}, None
404 # Execute all RAG searches in parallel
405 rag_content = {}
406 rag_metadata = {} if capture_metadata else None
408 if capture_metadata:
409 tasks_with_metadata = [
410 self._execute_single_rag_with_metadata(rag_config, params)
411 for rag_config in rag_configs
412 ]
413 results_with_metadata = await asyncio.gather(*tasks_with_metadata, return_exceptions=True)
415 for rag_config, result in zip(rag_configs, results_with_metadata, strict=True):
416 placeholder = rag_config.get("placeholder", "RAG_CONTENT")
418 if isinstance(result, BaseException):
419 error_msg = f"RAG search failed for {prompt_name}: {result}"
420 if self._raise_on_rag_error:
421 raise RuntimeError(error_msg) from result
422 else:
423 logger.warning(error_msg)
424 rag_content[placeholder] = ""
425 if rag_metadata is not None:
426 from datetime import datetime
427 rag_metadata[placeholder] = {
428 "error": str(result),
429 "timestamp": datetime.now().isoformat()
430 }
431 else:
432 formatted_content, metadata = result
433 rag_content[placeholder] = formatted_content
434 if metadata and rag_metadata is not None:
435 rag_metadata[placeholder] = metadata
436 else:
437 tasks_no_metadata = [
438 self._execute_single_rag_search_safe(rag_config, params)
439 for rag_config in rag_configs
440 ]
441 results_no_metadata = await asyncio.gather(*tasks_no_metadata, return_exceptions=True)
443 for rag_config, result in zip(rag_configs, results_no_metadata, strict=True):
444 placeholder = rag_config.get("placeholder", "RAG_CONTENT")
446 if isinstance(result, BaseException):
447 error_msg = f"RAG search failed for {prompt_name}: {result}"
448 if self._raise_on_rag_error:
449 raise RuntimeError(error_msg) from result
450 else:
451 logger.warning(error_msg)
452 rag_content[placeholder] = ""
453 else:
454 rag_content[placeholder] = result
456 return rag_content, rag_metadata
async def _execute_single_rag_search_safe(
    self,
    rag_config: RAGConfig,
    params: Dict[str, Any],
) -> str:
    """Await a single RAG search on behalf of ``asyncio.gather``.

    This is a pass-through to ``_execute_single_rag_search``. It adds no
    error handling of its own; exceptions propagate to the caller.

    Args:
        rag_config: RAG configuration for the search.
        params: Parameters for query templating.

    Returns:
        The formatted RAG content string from the underlying search.

    Raises:
        Exception: Whatever ``_execute_single_rag_search`` raises.
    """
    formatted = await self._execute_single_rag_search(rag_config, params)
    return formatted
async def _execute_single_rag_search(
    self,
    rag_config: RAGConfig,
    params: Dict[str, Any],
) -> str:
    """Run one RAG search through its adapter and format the results.

    Reads "adapter_name", "query", "k" (default 5) and "filters" from
    ``rag_config``, renders the query with ``params``, awaits the
    adapter's ``search`` and hands the results to ``_format_rag_results``.

    Args:
        rag_config: RAG configuration for the search.
        params: Parameters for query templating and result formatting.

    Returns:
        Formatted RAG content string.

    Raises:
        ValueError: If the configuration has no (or an empty) "adapter_name".
        KeyError: If the named adapter is not registered on this builder.
        Exception: Anything raised by the adapter search or the formatting.
    """
    # Resolve the adapter named by the configuration.
    source_name = rag_config.get("adapter_name")
    if not source_name:
        raise ValueError("RAG config missing 'adapter_name'")
    if source_name not in self.adapters:
        raise KeyError(
            f"Adapter '{source_name}' not found. "
            f"Available adapters: {list(self.adapters.keys())}"
        )
    source = self.adapters[source_name]

    # Build the search request from the configuration.
    rendered_query = self._render_rag_query(rag_config.get("query", ""), params)
    top_k = rag_config.get("k", 5)
    search_filters = rag_config.get("filters")

    hits = await source.search(query=rendered_query, k=top_k, filters=search_filters)

    return self._format_rag_results(
        results=hits,
        rag_config=rag_config,
        params=params,
    )
async def _execute_single_rag_with_metadata(
    self,
    rag_config: RAGConfig,
    params: Dict[str, Any],
) -> tuple[str, Dict[str, Any]]:
    """Run one RAG search and return its content together with metadata.

    Performs the same adapter lookup, query rendering, search and
    formatting as the plain single-search method, and additionally
    records what was searched so the result can be cached and reused.

    Args:
        rag_config: RAG configuration for the search.
        params: Parameters for query templating and result formatting.

    Returns:
        Tuple ``(formatted_content, metadata)`` where ``metadata`` holds:
            - adapter_name: Name of the adapter used
            - query: Rendered query string
            - query_hash: Value from ``_compute_rag_query_hash`` for
              the adapter name and rendered query
            - k: Number of results requested (default 5)
            - filters: Filters passed to the search
            - timestamp: ``datetime.now().isoformat()`` taken after the
              results were formatted
            - results: Raw search results as returned by the adapter
            - formatted_content: The formatted output
            - item_template: "item_template" from the configuration
            - header: "header" from the configuration

    Raises:
        ValueError: If the configuration has no (or an empty) "adapter_name".
        KeyError: If the named adapter is not registered on this builder.
        Exception: Anything raised by the adapter search or the formatting.
    """
    from datetime import datetime

    # Resolve the adapter named by the configuration.
    source_name = rag_config.get("adapter_name")
    if not source_name:
        raise ValueError("RAG config missing 'adapter_name'")
    if source_name not in self.adapters:
        raise KeyError(
            f"Adapter '{source_name}' not found. "
            f"Available adapters: {list(self.adapters.keys())}"
        )
    source = self.adapters[source_name]

    # Render the query and derive its cache-matching hash.
    rendered_query = self._render_rag_query(rag_config.get("query", ""), params)
    fingerprint = self._compute_rag_query_hash(source_name, rendered_query)

    # Execute the search.
    top_k = rag_config.get("k", 5)
    search_filters = rag_config.get("filters")
    hits = await source.search(query=rendered_query, k=top_k, filters=search_filters)

    text = self._format_rag_results(
        results=hits,
        rag_config=rag_config,
        params=params,
    )

    # Everything needed to replay this search from a cache.
    details = {
        "adapter_name": source_name,
        "query": rendered_query,
        "query_hash": fingerprint,
        "k": top_k,
        "filters": search_filters,
        "timestamp": datetime.now().isoformat(),
        "results": hits,  # raw, unformatted results
        "formatted_content": text,
        "item_template": rag_config.get("item_template"),
        "header": rag_config.get("header"),
    }
    return text, details