Coverage for src/dataknobs_llm/prompts/builders/async_prompt_builder.py: 16%

106 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-31 16:04 -0600

1"""Asynchronous prompt builder for constructing prompts with parameter resolution and RAG. 

2 

3This module provides the AsyncPromptBuilder class which coordinates between: 

4- Prompt libraries (template sources) 

5- Async resource adapters (data sources) 

6- Template renderer (rendering engine) 

7 

8The async builder handles: 

9- Concurrent parameter resolution from multiple sources 

10- Parallel RAG content retrieval and injection 

11- Validation enforcement 

12- Template defaults merging 

13 

14All async operations use asyncio.gather() for maximum parallelism. 

15""" 

16 

17import asyncio 

18import logging 

19from typing import Any, Dict, List, Optional 

20 

21from ..base import ( 

22 PromptTemplate, 

23 RAGConfig, 

24 ValidationLevel, 

25 RenderResult, 

26) 

27from ..adapters import AsyncResourceAdapter 

28from .base_prompt_builder import BasePromptBuilder 

29 

30logger = logging.getLogger(__name__) 

31 

32 

33class AsyncPromptBuilder(BasePromptBuilder): 

34 """Asynchronous prompt builder for constructing prompts with RAG and validation. 

35 

36 This class provides a high-level async API for building prompts by: 

37 1. Retrieving prompt templates from a library 

38 2. Resolving parameters from adapters and runtime values (concurrently) 

39 3. Executing RAG searches via adapters (in parallel) 

40 4. Injecting RAG content into templates 

41 5. Rendering final prompts with validation 

42 

43 Example: 

44 >>> library = ConfigPromptLibrary(config) 

45 >>> adapters = { 

46 ... 'config': AsyncDictResourceAdapter(config_data), 

47 ... 'docs': AsyncDataknobsBackendAdapter(docs_db) 

48 ... } 

49 >>> builder = AsyncPromptBuilder(library=library, adapters=adapters) 

50 >>> 

51 >>> # Render a system prompt 

52 >>> result = await builder.render_system_prompt( 

53 ... 'analyze_code', 

54 ... params={'code': code_snippet, 'language': 'python'} 

55 ... ) 

56 """ 

57 

58 def __init__( 

59 self, 

60 library, 

61 adapters: Optional[Dict[str, AsyncResourceAdapter]] = None, 

62 default_validation: ValidationLevel = ValidationLevel.WARN, 

63 raise_on_rag_error: bool = False 

64 ): 

65 """Initialize the asynchronous prompt builder. 

66 

67 Args: 

68 library: Prompt library to retrieve templates from 

69 adapters: Dictionary of named async resource adapters for parameter 

70 resolution and RAG searches 

71 default_validation: Default validation level for templates without 

72 explicit validation configuration 

73 raise_on_rag_error: If True, raise exceptions on RAG failures; 

74 if False (default), log warning and continue 

75 

76 Raises: 

77 TypeError: If any adapter is sync (use PromptBuilder instead) 

78 """ 

79 super().__init__(library, adapters, default_validation, raise_on_rag_error) 

80 self._validate_adapters() 

81 

82 def _validate_adapters(self) -> None: 

83 """Validate that all adapters are asynchronous. 

84 

85 Raises: 

86 TypeError: If any adapter is sync 

87 """ 

88 for name, adapter in self.adapters.items(): 

89 if not adapter.is_async(): 

90 raise TypeError( 

91 f"Adapter '{name}' is synchronous. " 

92 "Use PromptBuilder for sync adapters." 

93 ) 

94 

95 async def render_system_prompt( 

96 self, 

97 name: str, 

98 params: Optional[Dict[str, Any]] = None, 

99 include_rag: bool = True, 

100 validation_override: Optional[ValidationLevel] = None, 

101 return_rag_metadata: bool = False, 

102 cached_rag: Optional[Dict[str, Any]] = None, 

103 **kwargs: Any 

104 ) -> RenderResult: 

105 """Render a system prompt with parameters and optional RAG content. 

106 

107 Args: 

108 name: System prompt identifier 

109 params: Runtime parameters to use in rendering 

110 include_rag: Whether to include RAG content (default: True) 

111 validation_override: Override validation level for this render 

112 return_rag_metadata: If True, attach RAG metadata to result 

113 cached_rag: If provided, use these cached RAG results instead 

114 of executing new searches 

115 **kwargs: Additional parameters passed to library 

116 

117 Returns: 

118 RenderResult with rendered content and metadata 

119 

120 Raises: 

121 ValueError: If prompt not found or validation fails 

122 

123 Example: 

124 >>> # Capture RAG metadata 

125 >>> result = await builder.render_system_prompt( 

126 ... 'code_question', 

127 ... params={'language': 'python'}, 

128 ... return_rag_metadata=True 

129 ... ) 

130 >>> print(result.rag_metadata) 

131 >>> 

132 >>> # Reuse cached RAG 

133 >>> result2 = await builder.render_system_prompt( 

134 ... 'code_question', 

135 ... params={'language': 'python'}, 

136 ... cached_rag=result.rag_metadata 

137 ... ) 

138 """ 

139 params = params or {} 

140 

141 # Retrieve template from library 

142 template_dict = self.library.get_system_prompt(name, **kwargs) 

143 if template_dict is None: 

144 raise ValueError(f"System prompt not found: {name}") 

145 

146 # Render the prompt 

147 return await self._render_prompt_impl( 

148 prompt_name=name, 

149 prompt_type="system", 

150 template_dict=template_dict, 

151 runtime_params=params, 

152 include_rag=include_rag, 

153 validation_override=validation_override, 

154 return_rag_metadata=return_rag_metadata, 

155 cached_rag=cached_rag, 

156 **kwargs 

157 ) 

158 

159 async def render_user_prompt( 

160 self, 

161 name: str, 

162 params: Optional[Dict[str, Any]] = None, 

163 include_rag: bool = True, 

164 validation_override: Optional[ValidationLevel] = None, 

165 return_rag_metadata: bool = False, 

166 cached_rag: Optional[Dict[str, Any]] = None, 

167 **kwargs: Any 

168 ) -> RenderResult: 

169 """Render a user prompt with parameters and optional RAG content. 

170 

171 Args: 

172 name: User prompt identifier 

173 params: Runtime parameters to use in rendering 

174 include_rag: Whether to include RAG content (default: True) 

175 validation_override: Override validation level for this render 

176 return_rag_metadata: If True, attach RAG metadata to result 

177 cached_rag: If provided, use these cached RAG results instead 

178 of executing new searches 

179 **kwargs: Additional parameters passed to library 

180 

181 Returns: 

182 RenderResult with rendered content and metadata 

183 

184 Raises: 

185 ValueError: If prompt not found or validation fails 

186 """ 

187 params = params or {} 

188 

189 # Retrieve template from library 

190 template_dict = self.library.get_user_prompt(name, **kwargs) 

191 if template_dict is None: 

192 raise ValueError(f"User prompt not found: {name}") 

193 

194 # Render the prompt 

195 return await self._render_prompt_impl( 

196 prompt_name=name, 

197 prompt_type="user", 

198 template_dict=template_dict, 

199 runtime_params=params, 

200 include_rag=include_rag, 

201 validation_override=validation_override, 

202 return_rag_metadata=return_rag_metadata, 

203 cached_rag=cached_rag, 

204 **kwargs 

205 ) 

206 

207 async def _render_prompt_impl( 

208 self, 

209 prompt_name: str, 

210 prompt_type: str, 

211 template_dict: PromptTemplate, 

212 runtime_params: Dict[str, Any], 

213 include_rag: bool, 

214 validation_override: Optional[ValidationLevel], 

215 return_rag_metadata: bool = False, 

216 cached_rag: Optional[Dict[str, Any]] = None, 

217 **kwargs: Any 

218 ) -> RenderResult: 

219 """Internal method to render a prompt template asynchronously. 

220 

221 Args: 

222 prompt_name: Name of the prompt 

223 prompt_type: Type of prompt ("system" or "user") 

224 template_dict: Template dictionary from library 

225 runtime_params: Runtime parameters 

226 include_rag: Whether to include RAG content 

227 validation_override: Validation level override 

228 return_rag_metadata: If True, capture and return RAG metadata 

229 cached_rag: If provided, use these cached RAG results instead 

230 of executing new searches 

231 **kwargs: Additional parameters 

232 

233 Returns: 

234 RenderResult with rendered content and metadata 

235 """ 

236 # Extract template components 

237 template = template_dict.get("template", "") 

238 template_metadata = template_dict.get("metadata", {}) 

239 

240 # Step 1: Merge defaults with runtime params 

241 all_params = self._merge_params_with_defaults(template_dict, runtime_params) 

242 

243 # Step 2: Execute or reuse RAG searches 

244 rag_metadata = None 

245 if include_rag: 

246 if cached_rag: 

247 # Use cached RAG results 

248 rag_content = self._extract_formatted_content_from_cache(cached_rag) 

249 if return_rag_metadata: 

250 rag_metadata = cached_rag # Pass through cached metadata 

251 else: 

252 # Execute fresh RAG searches 

253 rag_content, rag_metadata = await self._execute_rag_searches_impl( 

254 prompt_name=prompt_name, 

255 prompt_type=prompt_type, 

256 params=all_params, 

257 capture_metadata=return_rag_metadata, 

258 **kwargs 

259 ) 

260 

261 # Merge RAG content into parameters 

262 all_params.update(rag_content) 

263 

264 # Step 3: Prepare validation config with override 

265 validation_config = self._prepare_validation_config(template_dict, validation_override) 

266 

267 # Step 4: Render template with validation (synchronous) 

268 result = self._renderer.render( 

269 template=template, 

270 params=all_params, 

271 validation=validation_config, 

272 template_metadata=template_metadata 

273 ) 

274 

275 # Attach RAG metadata if requested 

276 if return_rag_metadata and rag_metadata: 

277 result.rag_metadata = rag_metadata 

278 

279 # Add builder metadata 

280 result.metadata.update({ 

281 "prompt_name": prompt_name, 

282 "prompt_type": prompt_type, 

283 "include_rag": include_rag, 

284 "used_cached_rag": cached_rag is not None, 

285 }) 

286 

287 return result 

288 

289 async def _execute_rag_searches_impl( 

290 self, 

291 prompt_name: str, 

292 prompt_type: str, 

293 params: Dict[str, Any], 

294 capture_metadata: bool = False, 

295 **kwargs: Any 

296 ) -> tuple[Dict[str, str], Optional[Dict[str, Any]]]: 

297 """Execute RAG searches in parallel and format results for injection. 

298 

299 Args: 

300 prompt_name: Name of the prompt 

301 prompt_type: Type of prompt ("system" or "user") 

302 params: Resolved parameters for query templating 

303 capture_metadata: If True, capture RAG metadata 

304 **kwargs: Additional parameters 

305 

306 Returns: 

307 Tuple of (rag_content, rag_metadata): 

308 - rag_content: Dictionary mapping placeholder names to formatted content 

309 - rag_metadata: Optional dict with full RAG details (if capture_metadata=True) 

310 """ 

311 # Get RAG configurations for this prompt 

312 rag_configs = self.library.get_prompt_rag_configs( 

313 prompt_name=prompt_name, 

314 prompt_type=prompt_type, 

315 **kwargs 

316 ) 

317 

318 if not rag_configs: 

319 return {}, None 

320 

321 # Execute all RAG searches in parallel 

322 if capture_metadata: 

323 tasks = [ 

324 self._execute_single_rag_with_metadata(rag_config, params) 

325 for rag_config in rag_configs 

326 ] 

327 else: 

328 tasks = [ 

329 self._execute_single_rag_search_safe(rag_config, params) 

330 for rag_config in rag_configs 

331 ] 

332 

333 results = await asyncio.gather(*tasks, return_exceptions=True) 

334 

335 # Collect results 

336 rag_content = {} 

337 rag_metadata = {} if capture_metadata else None 

338 

339 for rag_config, result in zip(rag_configs, results): 

340 placeholder = rag_config.get("placeholder", "RAG_CONTENT") 

341 

342 if isinstance(result, Exception): 

343 error_msg = f"RAG search failed for {prompt_name}: {result}" 

344 if self._raise_on_rag_error: 

345 raise RuntimeError(error_msg) from result 

346 else: 

347 logger.warning(error_msg) 

348 rag_content[placeholder] = "" 

349 if capture_metadata: 

350 from datetime import datetime 

351 rag_metadata[placeholder] = { 

352 "error": str(result), 

353 "timestamp": datetime.now().isoformat() 

354 } 

355 else: 

356 if capture_metadata: 

357 formatted_content, metadata = result 

358 rag_content[placeholder] = formatted_content 

359 if metadata: 

360 rag_metadata[placeholder] = metadata 

361 else: 

362 rag_content[placeholder] = result 

363 

364 return rag_content, rag_metadata 

365 

366 async def _execute_single_rag_search_safe( 

367 self, 

368 rag_config: RAGConfig, 

369 params: Dict[str, Any] 

370 ) -> str: 

371 """Safely execute a single RAG search (for use with asyncio.gather). 

372 

373 Args: 

374 rag_config: RAG configuration 

375 params: Parameters for query templating 

376 

377 Returns: 

378 Formatted RAG content string 

379 

380 Raises: 

381 Exception: Propagated from _execute_single_rag_search 

382 """ 

383 return await self._execute_single_rag_search(rag_config, params) 

384 

385 async def _execute_single_rag_search( 

386 self, 

387 rag_config: RAGConfig, 

388 params: Dict[str, Any] 

389 ) -> str: 

390 """Execute a single RAG search and format results asynchronously. 

391 

392 Args: 

393 rag_config: RAG configuration 

394 params: Parameters for query templating 

395 

396 Returns: 

397 Formatted RAG content string 

398 

399 Raises: 

400 KeyError: If adapter not found 

401 Exception: If search fails 

402 """ 

403 # Get adapter 

404 adapter_name = rag_config.get("adapter_name") 

405 if not adapter_name: 

406 raise ValueError("RAG config missing 'adapter_name'") 

407 

408 if adapter_name not in self.adapters: 

409 raise KeyError( 

410 f"Adapter '{adapter_name}' not found. " 

411 f"Available adapters: {list(self.adapters.keys())}" 

412 ) 

413 

414 adapter = self.adapters[adapter_name] 

415 

416 # Render query template 

417 query_template = rag_config.get("query", "") 

418 query = self._render_rag_query(query_template, params) 

419 

420 # Execute search (async) 

421 k = rag_config.get("k", 5) 

422 filters = rag_config.get("filters") 

423 search_results = await adapter.search(query=query, k=k, filters=filters) 

424 

425 # Format results 

426 formatted_content = self._format_rag_results( 

427 results=search_results, 

428 rag_config=rag_config, 

429 params=params 

430 ) 

431 

432 return formatted_content 

433 

434 async def _execute_single_rag_with_metadata( 

435 self, 

436 rag_config: RAGConfig, 

437 params: Dict[str, Any] 

438 ) -> tuple[str, Dict[str, Any]]: 

439 """Execute a single RAG search with metadata capture. 

440 

441 This method executes a RAG search and captures detailed metadata 

442 including the query, results, and query hash for caching. 

443 

444 Args: 

445 rag_config: RAG configuration 

446 params: Parameters for query templating 

447 

448 Returns: 

449 Tuple of (formatted_content, metadata): 

450 - formatted_content: Formatted RAG content string 

451 - metadata: Dictionary with RAG metadata including: 

452 - adapter_name: Name of the adapter used 

453 - query: Rendered query string 

454 - query_hash: SHA256 hash for cache matching 

455 - k: Number of results requested 

456 - filters: Filters applied to search 

457 - timestamp: ISO format timestamp 

458 - results: Raw search results 

459 - formatted_content: Formatted output 

460 - item_template: Template used for formatting 

461 - header: Header text used 

462 

463 Raises: 

464 KeyError: If adapter not found 

465 Exception: If search fails 

466 """ 

467 from datetime import datetime 

468 

469 # Get adapter 

470 adapter_name = rag_config.get("adapter_name") 

471 if not adapter_name: 

472 raise ValueError("RAG config missing 'adapter_name'") 

473 

474 if adapter_name not in self.adapters: 

475 raise KeyError( 

476 f"Adapter '{adapter_name}' not found. " 

477 f"Available adapters: {list(self.adapters.keys())}" 

478 ) 

479 

480 adapter = self.adapters[adapter_name] 

481 

482 # Render query template 

483 query_template = rag_config.get("query", "") 

484 query = self._render_rag_query(query_template, params) 

485 

486 # Compute query hash for cache matching 

487 query_hash = self._compute_rag_query_hash(adapter_name, query) 

488 

489 # Execute search (async) 

490 k = rag_config.get("k", 5) 

491 filters = rag_config.get("filters") 

492 search_results = await adapter.search(query=query, k=k, filters=filters) 

493 

494 # Format results 

495 formatted_content = self._format_rag_results( 

496 results=search_results, 

497 rag_config=rag_config, 

498 params=params 

499 ) 

500 

501 # Build metadata 

502 metadata = { 

503 "adapter_name": adapter_name, 

504 "query": query, 

505 "query_hash": query_hash, 

506 "k": k, 

507 "filters": filters, 

508 "timestamp": datetime.now().isoformat(), 

509 "results": search_results, # Store raw results 

510 "formatted_content": formatted_content, 

511 "item_template": rag_config.get("item_template"), 

512 "header": rag_config.get("header"), 

513 } 

514 

515 return formatted_content, metadata