Coverage for src / dataknobs_llm / prompts / builders / async_prompt_builder.py: 18%

123 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 10:29 -0700

1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG. 

2 

3This module provides the AsyncPromptBuilder class which coordinates between: 

4- Prompt libraries (template sources) 

5- Async resource adapters (data sources for RAG) 

6- Template renderer (rendering engine) 

7 

8The async builder handles: 

9- Concurrent parameter resolution from multiple sources 

10- Parallel RAG content retrieval and injection 

11- RAG result caching for performance optimization 

12- Validation enforcement 

13- Template defaults merging 

14 

15All async operations use asyncio.gather() for maximum parallelism, making it 

16ideal for high-throughput applications with RAG-enhanced prompts. 

17 

18Key Features: 

19 - Parallel RAG searches across multiple data sources 

20 - RAG result caching to avoid redundant searches 

21 - Async-first design for optimal performance 

22 - Flexible parameter resolution from multiple adapters 

23 - Template validation with configurable levels 

24 - Metadata tracking for debugging and optimization 

25 

26Example: 

27 ```python 

28 from dataknobs_llm.prompts import AsyncPromptBuilder 

29 from dataknobs_llm.prompts import FileSystemPromptLibrary 

30 from dataknobs_llm.prompts.adapters import ( 

31 AsyncDictResourceAdapter, 

32 AsyncDataknobsBackendAdapter 

33 ) 

34 from dataknobs_data import database_factory 

35 

36 # Set up data sources for RAG 

37 docs_db = database_factory.create("vector", embedding_model="...") 

38 

39 # Create adapters 

40 adapters = { 

41 'config': AsyncDictResourceAdapter({ 

42 'company': 'Acme Corp', 

43 'domain': 'e-commerce' 

44 }), 

45 'docs': AsyncDataknobsBackendAdapter(docs_db) 

46 } 

47 

48 # Create builder with prompt library 

49 library = FileSystemPromptLibrary("prompts/") 

50 builder = AsyncPromptBuilder( 

51 library=library, 

52 adapters=adapters, 

53 default_validation=ValidationLevel.WARN 

54 ) 

55 

56 # Render prompt with RAG 

57 result = await builder.render_user_prompt( 

58 "analyze_code", 

59 index=0, 

60 params={ 

61 'code': code_snippet, 

62 'language': 'python' 

63 }, 

64 include_rag=True # Executes RAG searches in parallel 

65 ) 

66 

67 # Access rendered content 

68 print(result.content) 

69 

70 # Cache RAG results for reuse 

71 result_with_metadata = await builder.render_user_prompt( 

72 "analyze_code", 

73 params={'code': another_snippet, 'language': 'python'}, 

74 return_rag_metadata=True 

75 ) 

76 

77 # Reuse cached RAG (avoids re-searching) 

78 result2 = await builder.render_user_prompt( 

79 "analyze_code", 

80 params={'code': yet_another_snippet, 'language': 'python'}, 

81 cached_rag=result_with_metadata.rag_metadata 

82 ) 

83 

84 # Parallel rendering of multiple prompts 

85 results = await asyncio.gather( 

86 builder.render_system_prompt("helpful_assistant"), 

87 builder.render_user_prompt("user_query", params={'question': q}), 

88 builder.render_user_prompt("context_setter", params={'topic': t}) 

89 ) 

90 ``` 

91 

92See Also: 

93 - BasePromptBuilder: Base implementation with shared logic 

94 - PromptBuilder: Synchronous version for non-async applications 

95 - AsyncResourceAdapter: Adapter interface for async data sources 

96 - AbstractPromptLibrary: Prompt library interface 

97""" 

98 

99import asyncio 

100import logging 

101from typing import Any, Dict 

102 

103from ..base import ( 

104 AbstractPromptLibrary, 

105 PromptTemplateDict, 

106 RAGConfig, 

107 ValidationLevel, 

108 RenderResult, 

109) 

110from ..adapters import AsyncResourceAdapter 

111from .base_prompt_builder import BasePromptBuilder 

112 

113logger = logging.getLogger(__name__) 

114 

115 

116class AsyncPromptBuilder(BasePromptBuilder): 

117 """Asynchronous prompt builder for constructing prompts with RAG and validation. 

118 

119 This class provides a high-level async API for building prompts by: 

120 1. Retrieving prompt templates from a library 

121 2. Resolving parameters from adapters and runtime values (concurrently) 

122 3. Executing RAG searches via adapters (in parallel) 

123 4. Injecting RAG content into templates 

124 5. Rendering final prompts with validation 

125 

126 Example: 

127 >>> library = ConfigPromptLibrary(config) 

128 >>> adapters = { 

129 ... 'config': AsyncDictResourceAdapter(config_data), 

130 ... 'docs': AsyncDataknobsBackendAdapter(docs_db) 

131 ... } 

132 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters) 

133 >>> 

134 >>> # Render a system prompt 

135 >>> result = await builder.render_system_prompt( 

136 ... 'analyze_code', 

137 ... params={'code': code_snippet, 'language': 'python'} 

138 ... ) 

139 """ 

140 

141 def __init__( 

142 self, 

143 library: AbstractPromptLibrary, 

144 adapters: Dict[str, AsyncResourceAdapter] | None = None, 

145 default_validation: ValidationLevel = ValidationLevel.WARN, 

146 raise_on_rag_error: bool = False 

147 ): 

148 """Initialize the asynchronous prompt builder. 

149 

150 Args: 

151 library: Prompt library to retrieve templates from 

152 adapters: Dictionary of named async resource adapters for parameter 

153 resolution and RAG searches 

154 default_validation: Default validation level for templates without 

155 explicit validation configuration 

156 raise_on_rag_error: If True, raise exceptions on RAG failures; 

157 if False (default), log warning and continue 

158 

159 Raises: 

160 TypeError: If any adapter is sync (use PromptBuilder instead) 

161 """ 

162 super().__init__(library, adapters, default_validation, raise_on_rag_error) 

163 self._validate_adapters() 

164 

165 def _validate_adapters(self) -> None: 

166 """Validate that all adapters are asynchronous. 

167 

168 Raises: 

169 TypeError: If any adapter is sync 

170 """ 

171 for name, adapter in self.adapters.items(): 

172 if not adapter.is_async(): 

173 raise TypeError( 

174 f"Adapter '{name}' is synchronous. " 

175 "Use PromptBuilder for sync adapters." 

176 ) 

177 

178 async def render_system_prompt( 

179 self, 

180 name: str, 

181 params: Dict[str, Any] | None = None, 

182 include_rag: bool = True, 

183 validation_override: ValidationLevel | None = None, 

184 return_rag_metadata: bool = False, 

185 cached_rag: Dict[str, Any] | None = None, 

186 **kwargs: Any 

187 ) -> RenderResult: 

188 """Render a system prompt with parameters and optional RAG content. 

189 

190 Args: 

191 name: System prompt identifier 

192 params: Runtime parameters to use in rendering 

193 include_rag: Whether to include RAG content (default: True) 

194 validation_override: Override validation level for this render 

195 return_rag_metadata: If True, attach RAG metadata to result 

196 cached_rag: If provided, use these cached RAG results instead 

197 of executing new searches 

198 **kwargs: Additional parameters passed to library 

199 

200 Returns: 

201 RenderResult with rendered content and metadata 

202 

203 Raises: 

204 ValueError: If prompt not found or validation fails 

205 

206 Example: 

207 >>> # Capture RAG metadata 

208 >>> result = await builder.render_system_prompt( 

209 ... 'code_question', 

210 ... params={'language': 'python'}, 

211 ... return_rag_metadata=True 

212 ... ) 

213 >>> print(result.rag_metadata) 

214 >>> 

215 >>> # Reuse cached RAG 

216 >>> result2 = await builder.render_system_prompt( 

217 ... 'code_question', 

218 ... params={'language': 'python'}, 

219 ... cached_rag=result.rag_metadata 

220 ... ) 

221 """ 

222 params = params or {} 

223 

224 # Retrieve template from library 

225 template_dict = self.library.get_system_prompt(name, **kwargs) 

226 if template_dict is None: 

227 raise ValueError(f"System prompt not found: {name}") 

228 

229 # Render the prompt 

230 return await self._render_prompt_impl( 

231 prompt_name=name, 

232 prompt_type="system", 

233 template_dict=template_dict, 

234 runtime_params=params, 

235 include_rag=include_rag, 

236 validation_override=validation_override, 

237 return_rag_metadata=return_rag_metadata, 

238 cached_rag=cached_rag, 

239 **kwargs 

240 ) 

241 

242 async def render_user_prompt( 

243 self, 

244 name: str, 

245 params: Dict[str, Any] | None = None, 

246 include_rag: bool = True, 

247 validation_override: ValidationLevel | None = None, 

248 return_rag_metadata: bool = False, 

249 cached_rag: Dict[str, Any] | None = None, 

250 **kwargs: Any 

251 ) -> RenderResult: 

252 """Render a user prompt with parameters and optional RAG content. 

253 

254 Args: 

255 name: User prompt identifier 

256 params: Runtime parameters to use in rendering 

257 include_rag: Whether to include RAG content (default: True) 

258 validation_override: Override validation level for this render 

259 return_rag_metadata: If True, attach RAG metadata to result 

260 cached_rag: If provided, use these cached RAG results instead 

261 of executing new searches 

262 **kwargs: Additional parameters passed to library 

263 

264 Returns: 

265 RenderResult with rendered content and metadata 

266 

267 Raises: 

268 ValueError: If prompt not found or validation fails 

269 """ 

270 params = params or {} 

271 

272 # Retrieve template from library 

273 template_dict = self.library.get_user_prompt(name, **kwargs) 

274 if template_dict is None: 

275 raise ValueError(f"User prompt not found: {name}") 

276 

277 # Render the prompt 

278 return await self._render_prompt_impl( 

279 prompt_name=name, 

280 prompt_type="user", 

281 template_dict=template_dict, 

282 runtime_params=params, 

283 include_rag=include_rag, 

284 validation_override=validation_override, 

285 return_rag_metadata=return_rag_metadata, 

286 cached_rag=cached_rag, 

287 **kwargs 

288 ) 

289 

290 async def render_inline_system_prompt( 

291 self, 

292 content: str, 

293 params: Dict[str, Any] | None = None, 

294 rag_configs: list[RAGConfig] | None = None, 

295 include_rag: bool = True, 

296 validation_override: ValidationLevel | None = None, 

297 return_rag_metadata: bool = False, 

298 cached_rag: Dict[str, Any] | None = None, 

299 **kwargs: Any, 

300 ) -> RenderResult: 

301 """Render inline system prompt content with optional RAG enhancement. 

302 

303 This method allows users to provide raw prompt content while still 

304 benefiting from RAG context injection and template parameter substitution. 

305 

306 Args: 

307 content: The raw system prompt content (may contain Jinja2 templates) 

308 params: Runtime parameters for template rendering 

309 rag_configs: Optional RAG configurations for context retrieval 

310 include_rag: Whether to execute RAG queries (default: True) 

311 validation_override: Override default validation level 

312 return_rag_metadata: Include RAG metadata in result 

313 cached_rag: Pre-cached RAG results to reuse 

314 **kwargs: Additional parameters 

315 

316 Returns: 

317 RenderResult with rendered content and metadata 

318 

319 Example: 

320 >>> result = await builder.render_inline_system_prompt( 

321 ... content="You are a helpful {{ role }} assistant.", 

322 ... params={"role": "coding"}, 

323 ... rag_configs=[{ 

324 ... "adapter_name": "docs", 

325 ... "query": "{{ topic }}", 

326 ... "placeholder": "CONTEXT", 

327 ... "k": 3 

328 ... }] 

329 ... ) 

330 """ 

331 # Construct a template_dict from inline content 

332 template_dict: PromptTemplateDict = { 

333 "template": content, 

334 "defaults": {}, 

335 "metadata": {"source": "inline", "type": "system"}, 

336 "rag_configs": rag_configs or [], 

337 } 

338 

339 return await self._render_prompt_impl( 

340 prompt_name="<inline:system>", 

341 prompt_type="system", 

342 template_dict=template_dict, 

343 runtime_params=params or {}, 

344 include_rag=include_rag and bool(rag_configs), 

345 validation_override=validation_override, 

346 return_rag_metadata=return_rag_metadata, 

347 cached_rag=cached_rag, 

348 **kwargs, 

349 ) 

350 

351 async def render_inline_user_prompt( 

352 self, 

353 content: str, 

354 params: Dict[str, Any] | None = None, 

355 rag_configs: list[RAGConfig] | None = None, 

356 include_rag: bool = True, 

357 validation_override: ValidationLevel | None = None, 

358 return_rag_metadata: bool = False, 

359 cached_rag: Dict[str, Any] | None = None, 

360 **kwargs: Any, 

361 ) -> RenderResult: 

362 """Render inline user prompt content with optional RAG enhancement. 

363 

364 This method allows users to provide raw prompt content while still 

365 benefiting from RAG context injection and template parameter substitution. 

366 

367 Args: 

368 content: The raw user prompt content (may contain Jinja2 templates) 

369 params: Runtime parameters for template rendering 

370 rag_configs: Optional RAG configurations for context retrieval 

371 include_rag: Whether to execute RAG queries (default: True) 

372 validation_override: Override default validation level 

373 return_rag_metadata: Include RAG metadata in result 

374 cached_rag: Pre-cached RAG results to reuse 

375 **kwargs: Additional parameters 

376 

377 Returns: 

378 RenderResult with rendered content and metadata 

379 

380 Example: 

381 >>> result = await builder.render_inline_user_prompt( 

382 ... content="Help me understand {{ topic }}", 

383 ... params={"topic": "decorators"}, 

384 ... rag_configs=[{ 

385 ... "adapter_name": "docs", 

386 ... "query": "python {{ topic }} tutorial", 

387 ... "placeholder": "DOCS", 

388 ... "k": 5 

389 ... }] 

390 ... ) 

391 """ 

392 # Construct a template_dict from inline content 

393 template_dict: PromptTemplateDict = { 

394 "template": content, 

395 "defaults": {}, 

396 "metadata": {"source": "inline", "type": "user"}, 

397 "rag_configs": rag_configs or [], 

398 } 

399 

400 return await self._render_prompt_impl( 

401 prompt_name="<inline:user>", 

402 prompt_type="user", 

403 template_dict=template_dict, 

404 runtime_params=params or {}, 

405 include_rag=include_rag and bool(rag_configs), 

406 validation_override=validation_override, 

407 return_rag_metadata=return_rag_metadata, 

408 cached_rag=cached_rag, 

409 **kwargs, 

410 ) 

411 

412 async def _render_prompt_impl( 

413 self, 

414 prompt_name: str, 

415 prompt_type: str, 

416 template_dict: PromptTemplateDict, 

417 runtime_params: Dict[str, Any], 

418 include_rag: bool, 

419 validation_override: ValidationLevel | None, 

420 return_rag_metadata: bool = False, 

421 cached_rag: Dict[str, Any] | None = None, 

422 **kwargs: Any 

423 ) -> RenderResult: 

424 """Internal method to render a prompt template asynchronously. 

425 

426 Args: 

427 prompt_name: Name of the prompt 

428 prompt_type: Type of prompt ("system" or "user") 

429 template_dict: Template dictionary from library 

430 runtime_params: Runtime parameters 

431 include_rag: Whether to include RAG content 

432 validation_override: Validation level override 

433 return_rag_metadata: If True, capture and return RAG metadata 

434 cached_rag: If provided, use these cached RAG results instead 

435 of executing new searches 

436 **kwargs: Additional parameters 

437 

438 Returns: 

439 RenderResult with rendered content and metadata 

440 """ 

441 # Extract template components 

442 template = template_dict.get("template", "") 

443 template_metadata = template_dict.get("metadata", {}) 

444 

445 # Extract RAG configs from template_dict (for inline prompts) 

446 # or None to let _execute_rag_searches_impl fetch from library 

447 inline_rag_configs = template_dict.get("rag_configs") 

448 

449 # Step 1: Merge defaults with runtime params 

450 all_params = self._merge_params_with_defaults(template_dict, runtime_params) 

451 

452 # Step 2: Execute or reuse RAG searches 

453 rag_metadata = None 

454 if include_rag: 

455 if cached_rag: 

456 # Use cached RAG results 

457 rag_content = self._extract_formatted_content_from_cache(cached_rag) 

458 if return_rag_metadata: 

459 rag_metadata = cached_rag # Pass through cached metadata 

460 else: 

461 # Execute fresh RAG searches 

462 rag_content, rag_metadata = await self._execute_rag_searches_impl( 

463 prompt_name=prompt_name, 

464 prompt_type=prompt_type, 

465 params=all_params, 

466 capture_metadata=return_rag_metadata, 

467 rag_configs_override=inline_rag_configs, 

468 **kwargs 

469 ) 

470 

471 # Merge RAG content into parameters 

472 all_params.update(rag_content) 

473 

474 # Step 3: Prepare validation config with override 

475 validation_config = self._prepare_validation_config(template_dict, validation_override) 

476 

477 # Step 4: Render template with validation (synchronous) 

478 result = self._renderer.render( 

479 template=template, 

480 params=all_params, 

481 validation=validation_config, 

482 template_metadata=template_metadata 

483 ) 

484 

485 # Attach RAG metadata if requested 

486 if return_rag_metadata and rag_metadata: 

487 result.rag_metadata = rag_metadata 

488 

489 # Add builder metadata 

490 result.metadata.update({ 

491 "prompt_name": prompt_name, 

492 "prompt_type": prompt_type, 

493 "include_rag": include_rag, 

494 "used_cached_rag": cached_rag is not None, 

495 }) 

496 

497 return result 

498 

499 async def _execute_rag_searches_impl( 

500 self, 

501 prompt_name: str, 

502 prompt_type: str, 

503 params: Dict[str, Any], 

504 capture_metadata: bool = False, 

505 rag_configs_override: list[RAGConfig] | None = None, 

506 **kwargs: Any 

507 ) -> tuple[Dict[str, str], Dict[str, Any] | None]: 

508 """Execute RAG searches in parallel and format results for injection. 

509 

510 Args: 

511 prompt_name: Name of the prompt 

512 prompt_type: Type of prompt ("system" or "user") 

513 params: Resolved parameters for query templating 

514 capture_metadata: If True, capture RAG metadata 

515 rag_configs_override: If provided, use these RAG configs instead of 

516 fetching from the library (for inline prompts) 

517 **kwargs: Additional parameters 

518 

519 Returns: 

520 Tuple of (rag_content, rag_metadata): 

521 - rag_content: Dictionary mapping placeholder names to formatted content 

522 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True) 

523 """ 

524 # Get RAG configurations - use override if provided, otherwise fetch from library 

525 if rag_configs_override is not None: 

526 rag_configs = rag_configs_override 

527 else: 

528 rag_configs = self.library.get_prompt_rag_configs( 

529 prompt_name=prompt_name, 

530 prompt_type=prompt_type, 

531 **kwargs 

532 ) 

533 

534 if not rag_configs: 

535 return {}, None 

536 

537 # Execute all RAG searches in parallel 

538 rag_content = {} 

539 rag_metadata = {} if capture_metadata else None 

540 

541 if capture_metadata: 

542 tasks_with_metadata = [ 

543 self._execute_single_rag_with_metadata(rag_config, params) 

544 for rag_config in rag_configs 

545 ] 

546 results_with_metadata = await asyncio.gather(*tasks_with_metadata, return_exceptions=True) 

547 

548 for rag_config, result in zip(rag_configs, results_with_metadata, strict=True): 

549 placeholder = rag_config.get("placeholder", "RAG_CONTENT") 

550 

551 if isinstance(result, BaseException): 

552 error_msg = f"RAG search failed for {prompt_name}: {result}" 

553 if self._raise_on_rag_error: 

554 raise RuntimeError(error_msg) from result 

555 else: 

556 logger.warning(error_msg) 

557 rag_content[placeholder] = "" 

558 if rag_metadata is not None: 

559 from datetime import datetime 

560 rag_metadata[placeholder] = { 

561 "error": str(result), 

562 "timestamp": datetime.now().isoformat() 

563 } 

564 else: 

565 formatted_content, metadata = result 

566 rag_content[placeholder] = formatted_content 

567 if metadata and rag_metadata is not None: 

568 rag_metadata[placeholder] = metadata 

569 else: 

570 tasks_no_metadata = [ 

571 self._execute_single_rag_search_safe(rag_config, params) 

572 for rag_config in rag_configs 

573 ] 

574 results_no_metadata = await asyncio.gather(*tasks_no_metadata, return_exceptions=True) 

575 

576 for rag_config, result in zip(rag_configs, results_no_metadata, strict=True): 

577 placeholder = rag_config.get("placeholder", "RAG_CONTENT") 

578 

579 if isinstance(result, BaseException): 

580 error_msg = f"RAG search failed for {prompt_name}: {result}" 

581 if self._raise_on_rag_error: 

582 raise RuntimeError(error_msg) from result 

583 else: 

584 logger.warning(error_msg) 

585 rag_content[placeholder] = "" 

586 else: 

587 rag_content[placeholder] = result 

588 

589 return rag_content, rag_metadata 

590 

591 async def _execute_single_rag_search_safe( 

592 self, 

593 rag_config: RAGConfig, 

594 params: Dict[str, Any] 

595 ) -> str: 

596 """Safely execute a single RAG search (for use with asyncio.gather). 

597 

598 Args: 

599 rag_config: RAG configuration 

600 params: Parameters for query templating 

601 

602 Returns: 

603 Formatted RAG content string 

604 

605 Raises: 

606 Exception: Propagated from _execute_single_rag_search 

607 """ 

608 return await self._execute_single_rag_search(rag_config, params) 

609 

610 async def _execute_single_rag_search( 

611 self, 

612 rag_config: RAGConfig, 

613 params: Dict[str, Any] 

614 ) -> str: 

615 """Execute a single RAG search and format results asynchronously. 

616 

617 Args: 

618 rag_config: RAG configuration 

619 params: Parameters for query templating 

620 

621 Returns: 

622 Formatted RAG content string 

623 

624 Raises: 

625 KeyError: If adapter not found 

626 Exception: If search fails 

627 """ 

628 # Get adapter 

629 adapter_name = rag_config.get("adapter_name") 

630 if not adapter_name: 

631 raise ValueError("RAG config missing 'adapter_name'") 

632 

633 if adapter_name not in self.adapters: 

634 raise KeyError( 

635 f"Adapter '{adapter_name}' not found. " 

636 f"Available adapters: {list(self.adapters.keys())}" 

637 ) 

638 

639 adapter = self.adapters[adapter_name] 

640 

641 # Render query template 

642 query_template = rag_config.get("query", "") 

643 query = self._render_rag_query(query_template, params) 

644 

645 # Execute search (async) 

646 k = rag_config.get("k", 5) 

647 filters = rag_config.get("filters") 

648 search_results = await adapter.search(query=query, k=k, filters=filters) 

649 

650 # Format results 

651 formatted_content = self._format_rag_results( 

652 results=search_results, 

653 rag_config=rag_config, 

654 params=params 

655 ) 

656 

657 return formatted_content 

658 

659 async def _execute_single_rag_with_metadata( 

660 self, 

661 rag_config: RAGConfig, 

662 params: Dict[str, Any] 

663 ) -> tuple[str, Dict[str, Any]]: 

664 """Execute a single RAG search with metadata capture. 

665 

666 This method executes a RAG search and captures detailed metadata 

667 including the query, results, and query hash for caching. 

668 

669 Args: 

670 rag_config: RAG configuration 

671 params: Parameters for query templating 

672 

673 Returns: 

674 Tuple of (formatted_content, metadata): 

675 - formatted_content: Formatted RAG content string 

676 - metadata: Dictionary with RAG metadata including: 

677 - adapter_name: Name of the adapter used 

678 - query: Rendered query string 

679 - query_hash: SHA256 hash for cache matching 

680 - k: Number of results requested 

681 - filters: Filters applied to search 

682 - timestamp: ISO format timestamp 

683 - results: Raw search results 

684 - formatted_content: Formatted output 

685 - item_template: Template used for formatting 

686 - header: Header text used 

687 

688 Raises: 

689 KeyError: If adapter not found 

690 Exception: If search fails 

691 """ 

692 from datetime import datetime 

693 

694 # Get adapter 

695 adapter_name = rag_config.get("adapter_name") 

696 if not adapter_name: 

697 raise ValueError("RAG config missing 'adapter_name'") 

698 

699 if adapter_name not in self.adapters: 

700 raise KeyError( 

701 f"Adapter '{adapter_name}' not found. " 

702 f"Available adapters: {list(self.adapters.keys())}" 

703 ) 

704 

705 adapter = self.adapters[adapter_name] 

706 

707 # Render query template 

708 query_template = rag_config.get("query", "") 

709 query = self._render_rag_query(query_template, params) 

710 

711 # Compute query hash for cache matching 

712 query_hash = self._compute_rag_query_hash(adapter_name, query) 

713 

714 # Execute search (async) 

715 k = rag_config.get("k", 5) 

716 filters = rag_config.get("filters") 

717 search_results = await adapter.search(query=query, k=k, filters=filters) 

718 

719 # Format results 

720 formatted_content = self._format_rag_results( 

721 results=search_results, 

722 rag_config=rag_config, 

723 params=params 

724 ) 

725 

726 # Build metadata 

727 metadata = { 

728 "adapter_name": adapter_name, 

729 "query": query, 

730 "query_hash": query_hash, 

731 "k": k, 

732 "filters": filters, 

733 "timestamp": datetime.now().isoformat(), 

734 "results": search_results, # Store raw results 

735 "formatted_content": formatted_content, 

736 "item_template": rag_config.get("item_template"), 

737 "header": rag_config.get("header"), 

738 } 

739 

740 return formatted_content, metadata