Coverage for src/dataknobs_llm/prompts/builders/async_prompt_builder.py: 16%
106 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG.
3This module provides the AsyncPromptBuilder class which coordinates between:
4- Prompt libraries (template sources)
5- Async resource adapters (data sources)
6- Template renderer (rendering engine)
8The async builder handles:
9- Concurrent parameter resolution from multiple sources
10- Parallel RAG content retrieval and injection
11- Validation enforcement
12- Template defaults merging
14All async operations use asyncio.gather() for maximum parallelism.
15"""
17import asyncio
18import logging
19from typing import Any, Dict, List, Optional
21from ..base import (
22 PromptTemplate,
23 RAGConfig,
24 ValidationLevel,
25 RenderResult,
26)
27from ..adapters import AsyncResourceAdapter
28from .base_prompt_builder import BasePromptBuilder
30logger = logging.getLogger(__name__)
33class AsyncPromptBuilder(BasePromptBuilder):
34 """Asynchronous prompt builder for constructing prompts with RAG and validation.
36 This class provides a high-level async API for building prompts by:
37 1. Retrieving prompt templates from a library
38 2. Resolving parameters from adapters and runtime values (concurrently)
39 3. Executing RAG searches via adapters (in parallel)
40 4. Injecting RAG content into templates
41 5. Rendering final prompts with validation
43 Example:
44 >>> library = ConfigPromptLibrary(config)
45 >>> adapters = {
46 ... 'config': AsyncDictResourceAdapter(config_data),
47 ... 'docs': AsyncDataknobsBackendAdapter(docs_db)
48 ... }
49 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters)
50 >>>
51 >>> # Render a system prompt
52 >>> result = await builder.render_system_prompt(
53 ... 'analyze_code',
54 ... params={'code': code_snippet, 'language': 'python'}
55 ... )
56 """
58 def __init__(
59 self,
60 library,
61 adapters: Optional[Dict[str, AsyncResourceAdapter]] = None,
62 default_validation: ValidationLevel = ValidationLevel.WARN,
63 raise_on_rag_error: bool = False
64 ):
65 """Initialize the asynchronous prompt builder.
67 Args:
68 library: Prompt library to retrieve templates from
69 adapters: Dictionary of named async resource adapters for parameter
70 resolution and RAG searches
71 default_validation: Default validation level for templates without
72 explicit validation configuration
73 raise_on_rag_error: If True, raise exceptions on RAG failures;
74 if False (default), log warning and continue
76 Raises:
77 TypeError: If any adapter is sync (use PromptBuilder instead)
78 """
79 super().__init__(library, adapters, default_validation, raise_on_rag_error)
80 self._validate_adapters()
82 def _validate_adapters(self) -> None:
83 """Validate that all adapters are asynchronous.
85 Raises:
86 TypeError: If any adapter is sync
87 """
88 for name, adapter in self.adapters.items():
89 if not adapter.is_async():
90 raise TypeError(
91 f"Adapter '{name}' is synchronous. "
92 "Use PromptBuilder for sync adapters."
93 )
95 async def render_system_prompt(
96 self,
97 name: str,
98 params: Optional[Dict[str, Any]] = None,
99 include_rag: bool = True,
100 validation_override: Optional[ValidationLevel] = None,
101 return_rag_metadata: bool = False,
102 cached_rag: Optional[Dict[str, Any]] = None,
103 **kwargs: Any
104 ) -> RenderResult:
105 """Render a system prompt with parameters and optional RAG content.
107 Args:
108 name: System prompt identifier
109 params: Runtime parameters to use in rendering
110 include_rag: Whether to include RAG content (default: True)
111 validation_override: Override validation level for this render
112 return_rag_metadata: If True, attach RAG metadata to result
113 cached_rag: If provided, use these cached RAG results instead
114 of executing new searches
115 **kwargs: Additional parameters passed to library
117 Returns:
118 RenderResult with rendered content and metadata
120 Raises:
121 ValueError: If prompt not found or validation fails
123 Example:
124 >>> # Capture RAG metadata
125 >>> result = await builder.render_system_prompt(
126 ... 'code_question',
127 ... params={'language': 'python'},
128 ... return_rag_metadata=True
129 ... )
130 >>> print(result.rag_metadata)
131 >>>
132 >>> # Reuse cached RAG
133 >>> result2 = await builder.render_system_prompt(
134 ... 'code_question',
135 ... params={'language': 'python'},
136 ... cached_rag=result.rag_metadata
137 ... )
138 """
139 params = params or {}
141 # Retrieve template from library
142 template_dict = self.library.get_system_prompt(name, **kwargs)
143 if template_dict is None:
144 raise ValueError(f"System prompt not found: {name}")
146 # Render the prompt
147 return await self._render_prompt_impl(
148 prompt_name=name,
149 prompt_type="system",
150 template_dict=template_dict,
151 runtime_params=params,
152 include_rag=include_rag,
153 validation_override=validation_override,
154 return_rag_metadata=return_rag_metadata,
155 cached_rag=cached_rag,
156 **kwargs
157 )
159 async def render_user_prompt(
160 self,
161 name: str,
162 params: Optional[Dict[str, Any]] = None,
163 include_rag: bool = True,
164 validation_override: Optional[ValidationLevel] = None,
165 return_rag_metadata: bool = False,
166 cached_rag: Optional[Dict[str, Any]] = None,
167 **kwargs: Any
168 ) -> RenderResult:
169 """Render a user prompt with parameters and optional RAG content.
171 Args:
172 name: User prompt identifier
173 params: Runtime parameters to use in rendering
174 include_rag: Whether to include RAG content (default: True)
175 validation_override: Override validation level for this render
176 return_rag_metadata: If True, attach RAG metadata to result
177 cached_rag: If provided, use these cached RAG results instead
178 of executing new searches
179 **kwargs: Additional parameters passed to library
181 Returns:
182 RenderResult with rendered content and metadata
184 Raises:
185 ValueError: If prompt not found or validation fails
186 """
187 params = params or {}
189 # Retrieve template from library
190 template_dict = self.library.get_user_prompt(name, **kwargs)
191 if template_dict is None:
192 raise ValueError(f"User prompt not found: {name}")
194 # Render the prompt
195 return await self._render_prompt_impl(
196 prompt_name=name,
197 prompt_type="user",
198 template_dict=template_dict,
199 runtime_params=params,
200 include_rag=include_rag,
201 validation_override=validation_override,
202 return_rag_metadata=return_rag_metadata,
203 cached_rag=cached_rag,
204 **kwargs
205 )
207 async def _render_prompt_impl(
208 self,
209 prompt_name: str,
210 prompt_type: str,
211 template_dict: PromptTemplate,
212 runtime_params: Dict[str, Any],
213 include_rag: bool,
214 validation_override: Optional[ValidationLevel],
215 return_rag_metadata: bool = False,
216 cached_rag: Optional[Dict[str, Any]] = None,
217 **kwargs: Any
218 ) -> RenderResult:
219 """Internal method to render a prompt template asynchronously.
221 Args:
222 prompt_name: Name of the prompt
223 prompt_type: Type of prompt ("system" or "user")
224 template_dict: Template dictionary from library
225 runtime_params: Runtime parameters
226 include_rag: Whether to include RAG content
227 validation_override: Validation level override
228 return_rag_metadata: If True, capture and return RAG metadata
229 cached_rag: If provided, use these cached RAG results instead
230 of executing new searches
231 **kwargs: Additional parameters
233 Returns:
234 RenderResult with rendered content and metadata
235 """
236 # Extract template components
237 template = template_dict.get("template", "")
238 template_metadata = template_dict.get("metadata", {})
240 # Step 1: Merge defaults with runtime params
241 all_params = self._merge_params_with_defaults(template_dict, runtime_params)
243 # Step 2: Execute or reuse RAG searches
244 rag_metadata = None
245 if include_rag:
246 if cached_rag:
247 # Use cached RAG results
248 rag_content = self._extract_formatted_content_from_cache(cached_rag)
249 if return_rag_metadata:
250 rag_metadata = cached_rag # Pass through cached metadata
251 else:
252 # Execute fresh RAG searches
253 rag_content, rag_metadata = await self._execute_rag_searches_impl(
254 prompt_name=prompt_name,
255 prompt_type=prompt_type,
256 params=all_params,
257 capture_metadata=return_rag_metadata,
258 **kwargs
259 )
261 # Merge RAG content into parameters
262 all_params.update(rag_content)
264 # Step 3: Prepare validation config with override
265 validation_config = self._prepare_validation_config(template_dict, validation_override)
267 # Step 4: Render template with validation (synchronous)
268 result = self._renderer.render(
269 template=template,
270 params=all_params,
271 validation=validation_config,
272 template_metadata=template_metadata
273 )
275 # Attach RAG metadata if requested
276 if return_rag_metadata and rag_metadata:
277 result.rag_metadata = rag_metadata
279 # Add builder metadata
280 result.metadata.update({
281 "prompt_name": prompt_name,
282 "prompt_type": prompt_type,
283 "include_rag": include_rag,
284 "used_cached_rag": cached_rag is not None,
285 })
287 return result
289 async def _execute_rag_searches_impl(
290 self,
291 prompt_name: str,
292 prompt_type: str,
293 params: Dict[str, Any],
294 capture_metadata: bool = False,
295 **kwargs: Any
296 ) -> tuple[Dict[str, str], Optional[Dict[str, Any]]]:
297 """Execute RAG searches in parallel and format results for injection.
299 Args:
300 prompt_name: Name of the prompt
301 prompt_type: Type of prompt ("system" or "user")
302 params: Resolved parameters for query templating
303 capture_metadata: If True, capture RAG metadata
304 **kwargs: Additional parameters
306 Returns:
307 Tuple of (rag_content, rag_metadata):
308 - rag_content: Dictionary mapping placeholder names to formatted content
309 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True)
310 """
311 # Get RAG configurations for this prompt
312 rag_configs = self.library.get_prompt_rag_configs(
313 prompt_name=prompt_name,
314 prompt_type=prompt_type,
315 **kwargs
316 )
318 if not rag_configs:
319 return {}, None
321 # Execute all RAG searches in parallel
322 if capture_metadata:
323 tasks = [
324 self._execute_single_rag_with_metadata(rag_config, params)
325 for rag_config in rag_configs
326 ]
327 else:
328 tasks = [
329 self._execute_single_rag_search_safe(rag_config, params)
330 for rag_config in rag_configs
331 ]
333 results = await asyncio.gather(*tasks, return_exceptions=True)
335 # Collect results
336 rag_content = {}
337 rag_metadata = {} if capture_metadata else None
339 for rag_config, result in zip(rag_configs, results):
340 placeholder = rag_config.get("placeholder", "RAG_CONTENT")
342 if isinstance(result, Exception):
343 error_msg = f"RAG search failed for {prompt_name}: {result}"
344 if self._raise_on_rag_error:
345 raise RuntimeError(error_msg) from result
346 else:
347 logger.warning(error_msg)
348 rag_content[placeholder] = ""
349 if capture_metadata:
350 from datetime import datetime
351 rag_metadata[placeholder] = {
352 "error": str(result),
353 "timestamp": datetime.now().isoformat()
354 }
355 else:
356 if capture_metadata:
357 formatted_content, metadata = result
358 rag_content[placeholder] = formatted_content
359 if metadata:
360 rag_metadata[placeholder] = metadata
361 else:
362 rag_content[placeholder] = result
364 return rag_content, rag_metadata
366 async def _execute_single_rag_search_safe(
367 self,
368 rag_config: RAGConfig,
369 params: Dict[str, Any]
370 ) -> str:
371 """Safely execute a single RAG search (for use with asyncio.gather).
373 Args:
374 rag_config: RAG configuration
375 params: Parameters for query templating
377 Returns:
378 Formatted RAG content string
380 Raises:
381 Exception: Propagated from _execute_single_rag_search
382 """
383 return await self._execute_single_rag_search(rag_config, params)
385 async def _execute_single_rag_search(
386 self,
387 rag_config: RAGConfig,
388 params: Dict[str, Any]
389 ) -> str:
390 """Execute a single RAG search and format results asynchronously.
392 Args:
393 rag_config: RAG configuration
394 params: Parameters for query templating
396 Returns:
397 Formatted RAG content string
399 Raises:
400 KeyError: If adapter not found
401 Exception: If search fails
402 """
403 # Get adapter
404 adapter_name = rag_config.get("adapter_name")
405 if not adapter_name:
406 raise ValueError("RAG config missing 'adapter_name'")
408 if adapter_name not in self.adapters:
409 raise KeyError(
410 f"Adapter '{adapter_name}' not found. "
411 f"Available adapters: {list(self.adapters.keys())}"
412 )
414 adapter = self.adapters[adapter_name]
416 # Render query template
417 query_template = rag_config.get("query", "")
418 query = self._render_rag_query(query_template, params)
420 # Execute search (async)
421 k = rag_config.get("k", 5)
422 filters = rag_config.get("filters")
423 search_results = await adapter.search(query=query, k=k, filters=filters)
425 # Format results
426 formatted_content = self._format_rag_results(
427 results=search_results,
428 rag_config=rag_config,
429 params=params
430 )
432 return formatted_content
434 async def _execute_single_rag_with_metadata(
435 self,
436 rag_config: RAGConfig,
437 params: Dict[str, Any]
438 ) -> tuple[str, Dict[str, Any]]:
439 """Execute a single RAG search with metadata capture.
441 This method executes a RAG search and captures detailed metadata
442 including the query, results, and query hash for caching.
444 Args:
445 rag_config: RAG configuration
446 params: Parameters for query templating
448 Returns:
449 Tuple of (formatted_content, metadata):
450 - formatted_content: Formatted RAG content string
451 - metadata: Dictionary with RAG metadata including:
452 - adapter_name: Name of the adapter used
453 - query: Rendered query string
454 - query_hash: SHA256 hash for cache matching
455 - k: Number of results requested
456 - filters: Filters applied to search
457 - timestamp: ISO format timestamp
458 - results: Raw search results
459 - formatted_content: Formatted output
460 - item_template: Template used for formatting
461 - header: Header text used
463 Raises:
464 KeyError: If adapter not found
465 Exception: If search fails
466 """
467 from datetime import datetime
469 # Get adapter
470 adapter_name = rag_config.get("adapter_name")
471 if not adapter_name:
472 raise ValueError("RAG config missing 'adapter_name'")
474 if adapter_name not in self.adapters:
475 raise KeyError(
476 f"Adapter '{adapter_name}' not found. "
477 f"Available adapters: {list(self.adapters.keys())}"
478 )
480 adapter = self.adapters[adapter_name]
482 # Render query template
483 query_template = rag_config.get("query", "")
484 query = self._render_rag_query(query_template, params)
486 # Compute query hash for cache matching
487 query_hash = self._compute_rag_query_hash(adapter_name, query)
489 # Execute search (async)
490 k = rag_config.get("k", 5)
491 filters = rag_config.get("filters")
492 search_results = await adapter.search(query=query, k=k, filters=filters)
494 # Format results
495 formatted_content = self._format_rag_results(
496 results=search_results,
497 rag_config=rag_config,
498 params=params
499 )
501 # Build metadata
502 metadata = {
503 "adapter_name": adapter_name,
504 "query": query,
505 "query_hash": query_hash,
506 "k": k,
507 "filters": filters,
508 "timestamp": datetime.now().isoformat(),
509 "results": search_results, # Store raw results
510 "formatted_content": formatted_content,
511 "item_template": rag_config.get("item_template"),
512 "header": rag_config.get("header"),
513 }
515 return formatted_content, metadata