Coverage for src/dataknobs_llm/prompts/builders/async_prompt_builder.py: 90%

114 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG. 

2 

3This module provides the AsyncPromptBuilder class which coordinates between: 

4- Prompt libraries (template sources) 

5- Async resource adapters (data sources for RAG) 

6- Template renderer (rendering engine) 

7 

8The async builder handles: 

9- Concurrent parameter resolution from multiple sources 

10- Parallel RAG content retrieval and injection 

11- RAG result caching for performance optimization 

12- Validation enforcement 

13- Template defaults merging 

14 

15All async operations use asyncio.gather() for maximum parallelism, making it 

16ideal for high-throughput applications with RAG-enhanced prompts. 

17 

18Key Features: 

19 - Parallel RAG searches across multiple data sources 

20 - RAG result caching to avoid redundant searches 

21 - Async-first design for optimal performance 

22 - Flexible parameter resolution from multiple adapters 

23 - Template validation with configurable levels 

24 - Metadata tracking for debugging and optimization 

25 

26Example: 

27 ```python 

28 from dataknobs_llm.prompts import AsyncPromptBuilder 

29 from dataknobs_llm.prompts import FileSystemPromptLibrary 

30 from dataknobs_llm.prompts.adapters import ( 

31 AsyncDictResourceAdapter, 

32 AsyncDataknobsBackendAdapter 

33 ) 

34 from dataknobs_data import database_factory 

35 

36 # Set up data sources for RAG 

37 docs_db = database_factory.create("vector", embedding_model="...") 

38 

39 # Create adapters 

40 adapters = { 

41 'config': AsyncDictResourceAdapter({ 

42 'company': 'Acme Corp', 

43 'domain': 'e-commerce' 

44 }), 

45 'docs': AsyncDataknobsBackendAdapter(docs_db) 

46 } 

47 

48 # Create builder with prompt library 

49 library = FileSystemPromptLibrary("prompts/") 

50 builder = AsyncPromptBuilder( 

51 library=library, 

52 adapters=adapters, 

53 default_validation=ValidationLevel.WARN 

54 ) 

55 

56 # Render prompt with RAG 

57 result = await builder.render_user_prompt( 

58 "analyze_code", 

59 index=0, 

60 params={ 

61 'code': code_snippet, 

62 'language': 'python' 

63 }, 

64 include_rag=True # Executes RAG searches in parallel 

65 ) 

66 

67 # Access rendered content 

68 print(result.content) 

69 

70 # Cache RAG results for reuse 

71 result_with_metadata = await builder.render_user_prompt( 

72 "analyze_code", 

73 params={'code': another_snippet, 'language': 'python'}, 

74 return_rag_metadata=True 

75 ) 

76 

77 # Reuse cached RAG (avoids re-searching) 

78 result2 = await builder.render_user_prompt( 

79 "analyze_code", 

80 params={'code': yet_another_snippet, 'language': 'python'}, 

81 cached_rag=result_with_metadata.rag_metadata 

82 ) 

83 

84 # Parallel rendering of multiple prompts 

85 results = await asyncio.gather( 

86 builder.render_system_prompt("helpful_assistant"), 

87 builder.render_user_prompt("user_query", params={'question': q}), 

88 builder.render_user_prompt("context_setter", params={'topic': t}) 

89 ) 

90 ``` 

91 

92See Also: 

93 - BasePromptBuilder: Base implementation with shared logic 

94 - PromptBuilder: Synchronous version for non-async applications 

95 - AsyncResourceAdapter: Adapter interface for async data sources 

96 - AbstractPromptLibrary: Prompt library interface 

97""" 

98 

99import asyncio 

100import logging 

101from typing import Any, Dict 

102 

103from ..base import ( 

104 AbstractPromptLibrary, 

105 PromptTemplateDict, 

106 RAGConfig, 

107 ValidationLevel, 

108 RenderResult, 

109) 

110from ..adapters import AsyncResourceAdapter 

111from .base_prompt_builder import BasePromptBuilder 

112 

113logger = logging.getLogger(__name__) 

114 

115 

116class AsyncPromptBuilder(BasePromptBuilder): 

117 """Asynchronous prompt builder for constructing prompts with RAG and validation. 

118 

119 This class provides a high-level async API for building prompts by: 

120 1. Retrieving prompt templates from a library 

121 2. Resolving parameters from adapters and runtime values (concurrently) 

122 3. Executing RAG searches via adapters (in parallel) 

123 4. Injecting RAG content into templates 

124 5. Rendering final prompts with validation 

125 

126 Example: 

127 >>> library = ConfigPromptLibrary(config) 

128 >>> adapters = { 

129 ... 'config': AsyncDictResourceAdapter(config_data), 

130 ... 'docs': AsyncDataknobsBackendAdapter(docs_db) 

131 ... } 

132 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters) 

133 >>> 

134 >>> # Render a system prompt 

135 >>> result = await builder.render_system_prompt( 

136 ... 'analyze_code', 

137 ... params={'code': code_snippet, 'language': 'python'} 

138 ... ) 

139 """ 

140 

141 def __init__( 

142 self, 

143 library: AbstractPromptLibrary, 

144 adapters: Dict[str, AsyncResourceAdapter] | None = None, 

145 default_validation: ValidationLevel = ValidationLevel.WARN, 

146 raise_on_rag_error: bool = False 

147 ): 

148 """Initialize the asynchronous prompt builder. 

149 

150 Args: 

151 library: Prompt library to retrieve templates from 

152 adapters: Dictionary of named async resource adapters for parameter 

153 resolution and RAG searches 

154 default_validation: Default validation level for templates without 

155 explicit validation configuration 

156 raise_on_rag_error: If True, raise exceptions on RAG failures; 

157 if False (default), log warning and continue 

158 

159 Raises: 

160 TypeError: If any adapter is sync (use PromptBuilder instead) 

161 """ 

162 super().__init__(library, adapters, default_validation, raise_on_rag_error) 

163 self._validate_adapters() 

164 

165 def _validate_adapters(self) -> None: 

166 """Validate that all adapters are asynchronous. 

167 

168 Raises: 

169 TypeError: If any adapter is sync 

170 """ 

171 for name, adapter in self.adapters.items(): 

172 if not adapter.is_async(): 

173 raise TypeError( 

174 f"Adapter '{name}' is synchronous. " 

175 "Use PromptBuilder for sync adapters." 

176 ) 

177 

178 async def render_system_prompt( 

179 self, 

180 name: str, 

181 params: Dict[str, Any] | None = None, 

182 include_rag: bool = True, 

183 validation_override: ValidationLevel | None = None, 

184 return_rag_metadata: bool = False, 

185 cached_rag: Dict[str, Any] | None = None, 

186 **kwargs: Any 

187 ) -> RenderResult: 

188 """Render a system prompt with parameters and optional RAG content. 

189 

190 Args: 

191 name: System prompt identifier 

192 params: Runtime parameters to use in rendering 

193 include_rag: Whether to include RAG content (default: True) 

194 validation_override: Override validation level for this render 

195 return_rag_metadata: If True, attach RAG metadata to result 

196 cached_rag: If provided, use these cached RAG results instead 

197 of executing new searches 

198 **kwargs: Additional parameters passed to library 

199 

200 Returns: 

201 RenderResult with rendered content and metadata 

202 

203 Raises: 

204 ValueError: If prompt not found or validation fails 

205 

206 Example: 

207 >>> # Capture RAG metadata 

208 >>> result = await builder.render_system_prompt( 

209 ... 'code_question', 

210 ... params={'language': 'python'}, 

211 ... return_rag_metadata=True 

212 ... ) 

213 >>> print(result.rag_metadata) 

214 >>> 

215 >>> # Reuse cached RAG 

216 >>> result2 = await builder.render_system_prompt( 

217 ... 'code_question', 

218 ... params={'language': 'python'}, 

219 ... cached_rag=result.rag_metadata 

220 ... ) 

221 """ 

222 params = params or {} 

223 

224 # Retrieve template from library 

225 template_dict = self.library.get_system_prompt(name, **kwargs) 

226 if template_dict is None: 

227 raise ValueError(f"System prompt not found: {name}") 

228 

229 # Render the prompt 

230 return await self._render_prompt_impl( 

231 prompt_name=name, 

232 prompt_type="system", 

233 template_dict=template_dict, 

234 runtime_params=params, 

235 include_rag=include_rag, 

236 validation_override=validation_override, 

237 return_rag_metadata=return_rag_metadata, 

238 cached_rag=cached_rag, 

239 **kwargs 

240 ) 

241 

242 async def render_user_prompt( 

243 self, 

244 name: str, 

245 params: Dict[str, Any] | None = None, 

246 include_rag: bool = True, 

247 validation_override: ValidationLevel | None = None, 

248 return_rag_metadata: bool = False, 

249 cached_rag: Dict[str, Any] | None = None, 

250 **kwargs: Any 

251 ) -> RenderResult: 

252 """Render a user prompt with parameters and optional RAG content. 

253 

254 Args: 

255 name: User prompt identifier 

256 params: Runtime parameters to use in rendering 

257 include_rag: Whether to include RAG content (default: True) 

258 validation_override: Override validation level for this render 

259 return_rag_metadata: If True, attach RAG metadata to result 

260 cached_rag: If provided, use these cached RAG results instead 

261 of executing new searches 

262 **kwargs: Additional parameters passed to library 

263 

264 Returns: 

265 RenderResult with rendered content and metadata 

266 

267 Raises: 

268 ValueError: If prompt not found or validation fails 

269 """ 

270 params = params or {} 

271 

272 # Retrieve template from library 

273 template_dict = self.library.get_user_prompt(name, **kwargs) 

274 if template_dict is None: 

275 raise ValueError(f"User prompt not found: {name}") 

276 

277 # Render the prompt 

278 return await self._render_prompt_impl( 

279 prompt_name=name, 

280 prompt_type="user", 

281 template_dict=template_dict, 

282 runtime_params=params, 

283 include_rag=include_rag, 

284 validation_override=validation_override, 

285 return_rag_metadata=return_rag_metadata, 

286 cached_rag=cached_rag, 

287 **kwargs 

288 ) 

289 

290 async def _render_prompt_impl( 

291 self, 

292 prompt_name: str, 

293 prompt_type: str, 

294 template_dict: PromptTemplateDict, 

295 runtime_params: Dict[str, Any], 

296 include_rag: bool, 

297 validation_override: ValidationLevel | None, 

298 return_rag_metadata: bool = False, 

299 cached_rag: Dict[str, Any] | None = None, 

300 **kwargs: Any 

301 ) -> RenderResult: 

302 """Internal method to render a prompt template asynchronously. 

303 

304 Args: 

305 prompt_name: Name of the prompt 

306 prompt_type: Type of prompt ("system" or "user") 

307 template_dict: Template dictionary from library 

308 runtime_params: Runtime parameters 

309 include_rag: Whether to include RAG content 

310 validation_override: Validation level override 

311 return_rag_metadata: If True, capture and return RAG metadata 

312 cached_rag: If provided, use these cached RAG results instead 

313 of executing new searches 

314 **kwargs: Additional parameters 

315 

316 Returns: 

317 RenderResult with rendered content and metadata 

318 """ 

319 # Extract template components 

320 template = template_dict.get("template", "") 

321 template_metadata = template_dict.get("metadata", {}) 

322 

323 # Step 1: Merge defaults with runtime params 

324 all_params = self._merge_params_with_defaults(template_dict, runtime_params) 

325 

326 # Step 2: Execute or reuse RAG searches 

327 rag_metadata = None 

328 if include_rag: 

329 if cached_rag: 

330 # Use cached RAG results 

331 rag_content = self._extract_formatted_content_from_cache(cached_rag) 

332 if return_rag_metadata: 

333 rag_metadata = cached_rag # Pass through cached metadata 

334 else: 

335 # Execute fresh RAG searches 

336 rag_content, rag_metadata = await self._execute_rag_searches_impl( 

337 prompt_name=prompt_name, 

338 prompt_type=prompt_type, 

339 params=all_params, 

340 capture_metadata=return_rag_metadata, 

341 **kwargs 

342 ) 

343 

344 # Merge RAG content into parameters 

345 all_params.update(rag_content) 

346 

347 # Step 3: Prepare validation config with override 

348 validation_config = self._prepare_validation_config(template_dict, validation_override) 

349 

350 # Step 4: Render template with validation (synchronous) 

351 result = self._renderer.render( 

352 template=template, 

353 params=all_params, 

354 validation=validation_config, 

355 template_metadata=template_metadata 

356 ) 

357 

358 # Attach RAG metadata if requested 

359 if return_rag_metadata and rag_metadata: 

360 result.rag_metadata = rag_metadata 

361 

362 # Add builder metadata 

363 result.metadata.update({ 

364 "prompt_name": prompt_name, 

365 "prompt_type": prompt_type, 

366 "include_rag": include_rag, 

367 "used_cached_rag": cached_rag is not None, 

368 }) 

369 

370 return result 

371 

372 async def _execute_rag_searches_impl( 

373 self, 

374 prompt_name: str, 

375 prompt_type: str, 

376 params: Dict[str, Any], 

377 capture_metadata: bool = False, 

378 **kwargs: Any 

379 ) -> tuple[Dict[str, str], Dict[str, Any] | None]: 

380 """Execute RAG searches in parallel and format results for injection. 

381 

382 Args: 

383 prompt_name: Name of the prompt 

384 prompt_type: Type of prompt ("system" or "user") 

385 params: Resolved parameters for query templating 

386 capture_metadata: If True, capture RAG metadata 

387 **kwargs: Additional parameters 

388 

389 Returns: 

390 Tuple of (rag_content, rag_metadata): 

391 - rag_content: Dictionary mapping placeholder names to formatted content 

392 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True) 

393 """ 

394 # Get RAG configurations for this prompt 

395 rag_configs = self.library.get_prompt_rag_configs( 

396 prompt_name=prompt_name, 

397 prompt_type=prompt_type, 

398 **kwargs 

399 ) 

400 

401 if not rag_configs: 

402 return {}, None 

403 

404 # Execute all RAG searches in parallel 

405 rag_content = {} 

406 rag_metadata = {} if capture_metadata else None 

407 

408 if capture_metadata: 

409 tasks_with_metadata = [ 

410 self._execute_single_rag_with_metadata(rag_config, params) 

411 for rag_config in rag_configs 

412 ] 

413 results_with_metadata = await asyncio.gather(*tasks_with_metadata, return_exceptions=True) 

414 

415 for rag_config, result in zip(rag_configs, results_with_metadata, strict=True): 

416 placeholder = rag_config.get("placeholder", "RAG_CONTENT") 

417 

418 if isinstance(result, BaseException): 

419 error_msg = f"RAG search failed for {prompt_name}: {result}" 

420 if self._raise_on_rag_error: 

421 raise RuntimeError(error_msg) from result 

422 else: 

423 logger.warning(error_msg) 

424 rag_content[placeholder] = "" 

425 if rag_metadata is not None: 

426 from datetime import datetime 

427 rag_metadata[placeholder] = { 

428 "error": str(result), 

429 "timestamp": datetime.now().isoformat() 

430 } 

431 else: 

432 formatted_content, metadata = result 

433 rag_content[placeholder] = formatted_content 

434 if metadata and rag_metadata is not None: 

435 rag_metadata[placeholder] = metadata 

436 else: 

437 tasks_no_metadata = [ 

438 self._execute_single_rag_search_safe(rag_config, params) 

439 for rag_config in rag_configs 

440 ] 

441 results_no_metadata = await asyncio.gather(*tasks_no_metadata, return_exceptions=True) 

442 

443 for rag_config, result in zip(rag_configs, results_no_metadata, strict=True): 

444 placeholder = rag_config.get("placeholder", "RAG_CONTENT") 

445 

446 if isinstance(result, BaseException): 

447 error_msg = f"RAG search failed for {prompt_name}: {result}" 

448 if self._raise_on_rag_error: 

449 raise RuntimeError(error_msg) from result 

450 else: 

451 logger.warning(error_msg) 

452 rag_content[placeholder] = "" 

453 else: 

454 rag_content[placeholder] = result 

455 

456 return rag_content, rag_metadata 

457 

458 async def _execute_single_rag_search_safe( 

459 self, 

460 rag_config: RAGConfig, 

461 params: Dict[str, Any] 

462 ) -> str: 

463 """Safely execute a single RAG search (for use with asyncio.gather). 

464 

465 Args: 

466 rag_config: RAG configuration 

467 params: Parameters for query templating 

468 

469 Returns: 

470 Formatted RAG content string 

471 

472 Raises: 

473 Exception: Propagated from _execute_single_rag_search 

474 """ 

475 return await self._execute_single_rag_search(rag_config, params) 

476 

477 async def _execute_single_rag_search( 

478 self, 

479 rag_config: RAGConfig, 

480 params: Dict[str, Any] 

481 ) -> str: 

482 """Execute a single RAG search and format results asynchronously. 

483 

484 Args: 

485 rag_config: RAG configuration 

486 params: Parameters for query templating 

487 

488 Returns: 

489 Formatted RAG content string 

490 

491 Raises: 

492 KeyError: If adapter not found 

493 Exception: If search fails 

494 """ 

495 # Get adapter 

496 adapter_name = rag_config.get("adapter_name") 

497 if not adapter_name: 

498 raise ValueError("RAG config missing 'adapter_name'") 

499 

500 if adapter_name not in self.adapters: 

501 raise KeyError( 

502 f"Adapter '{adapter_name}' not found. " 

503 f"Available adapters: {list(self.adapters.keys())}" 

504 ) 

505 

506 adapter = self.adapters[adapter_name] 

507 

508 # Render query template 

509 query_template = rag_config.get("query", "") 

510 query = self._render_rag_query(query_template, params) 

511 

512 # Execute search (async) 

513 k = rag_config.get("k", 5) 

514 filters = rag_config.get("filters") 

515 search_results = await adapter.search(query=query, k=k, filters=filters) 

516 

517 # Format results 

518 formatted_content = self._format_rag_results( 

519 results=search_results, 

520 rag_config=rag_config, 

521 params=params 

522 ) 

523 

524 return formatted_content 

525 

526 async def _execute_single_rag_with_metadata( 

527 self, 

528 rag_config: RAGConfig, 

529 params: Dict[str, Any] 

530 ) -> tuple[str, Dict[str, Any]]: 

531 """Execute a single RAG search with metadata capture. 

532 

533 This method executes a RAG search and captures detailed metadata 

534 including the query, results, and query hash for caching. 

535 

536 Args: 

537 rag_config: RAG configuration 

538 params: Parameters for query templating 

539 

540 Returns: 

541 Tuple of (formatted_content, metadata): 

542 - formatted_content: Formatted RAG content string 

543 - metadata: Dictionary with RAG metadata including: 

544 - adapter_name: Name of the adapter used 

545 - query: Rendered query string 

546 - query_hash: SHA256 hash for cache matching 

547 - k: Number of results requested 

548 - filters: Filters applied to search 

549 - timestamp: ISO format timestamp 

550 - results: Raw search results 

551 - formatted_content: Formatted output 

552 - item_template: Template used for formatting 

553 - header: Header text used 

554 

555 Raises: 

556 KeyError: If adapter not found 

557 Exception: If search fails 

558 """ 

559 from datetime import datetime 

560 

561 # Get adapter 

562 adapter_name = rag_config.get("adapter_name") 

563 if not adapter_name: 

564 raise ValueError("RAG config missing 'adapter_name'") 

565 

566 if adapter_name not in self.adapters: 

567 raise KeyError( 

568 f"Adapter '{adapter_name}' not found. " 

569 f"Available adapters: {list(self.adapters.keys())}" 

570 ) 

571 

572 adapter = self.adapters[adapter_name] 

573 

574 # Render query template 

575 query_template = rag_config.get("query", "") 

576 query = self._render_rag_query(query_template, params) 

577 

578 # Compute query hash for cache matching 

579 query_hash = self._compute_rag_query_hash(adapter_name, query) 

580 

581 # Execute search (async) 

582 k = rag_config.get("k", 5) 

583 filters = rag_config.get("filters") 

584 search_results = await adapter.search(query=query, k=k, filters=filters) 

585 

586 # Format results 

587 formatted_content = self._format_rag_results( 

588 results=search_results, 

589 rag_config=rag_config, 

590 params=params 

591 ) 

592 

593 # Build metadata 

594 metadata = { 

595 "adapter_name": adapter_name, 

596 "query": query, 

597 "query_hash": query_hash, 

598 "k": k, 

599 "filters": filters, 

600 "timestamp": datetime.now().isoformat(), 

601 "results": search_results, # Store raw results 

602 "formatted_content": formatted_content, 

603 "item_template": rag_config.get("item_template"), 

604 "header": rag_config.get("header"), 

605 } 

606 

607 return formatted_content, metadata