Coverage for src/dataknobs_llm/llm/base.py: 50%

200 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-31 16:04 -0600

1"""Base LLM abstraction components. 

2 

3This module provides the base abstractions for unified LLM operations. 

4""" 

5 

6from abc import ABC, abstractmethod 

7from dataclasses import dataclass, field 

8from enum import Enum 

9from typing import ( 

10 Any, Dict, List, Union, AsyncIterator, Iterator, 

11 Callable, Protocol, Optional 

12) 

13from datetime import datetime 

14 

15# Import prompt builder types - clean one-way dependency (llm depends on prompts) 

16from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder 

17 

18 

class CompletionMode(Enum):
    """Kinds of completion an LLM can be asked to perform."""

    # Chat completion driven by a message history.
    CHAT = "chat"
    # Plain text completion.
    TEXT = "text"
    # Instruction following.
    INSTRUCT = "instruct"
    # Embedding generation.
    EMBEDDING = "embedding"
    # Function calling.
    FUNCTION = "function"

26 

27 

class ModelCapability(Enum):
    """Features a model may advertise through a provider."""

    # Generation-oriented capabilities.
    TEXT_GENERATION = "text_generation"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"
    FUNCTION_CALLING = "function_calling"

    # Input/output specialisations.
    VISION = "vision"
    CODE = "code"
    JSON_MODE = "json_mode"
    STREAMING = "streaming"

38 

39 

40@dataclass 

41class LLMMessage: 

42 """Represents a message in LLM conversation.""" 

43 role: str # 'system', 'user', 'assistant', 'function' 

44 content: str 

45 name: str | None = None # For function messages 

46 function_call: Dict[str, Any] | None = None # For function calling 

47 metadata: Dict[str, Any] = field(default_factory=dict) 

48 

49 

50@dataclass 

51class LLMResponse: 

52 """Response from LLM.""" 

53 content: str 

54 model: str 

55 finish_reason: str | None = None # 'stop', 'length', 'function_call' 

56 usage: Dict[str, int] | None = None # tokens used 

57 function_call: Dict[str, Any] | None = None 

58 metadata: Dict[str, Any] = field(default_factory=dict) 

59 created_at: datetime = field(default_factory=datetime.now) 

60 

61 

62@dataclass 

63class LLMStreamResponse: 

64 """Streaming response from LLM.""" 

65 delta: str # Incremental content 

66 is_final: bool = False 

67 finish_reason: str | None = None 

68 usage: Dict[str, int] | None = None 

69 metadata: Dict[str, Any] = field(default_factory=dict) 

70 

71 

@dataclass
class LLMConfig:
    """Settings that control how an LLM provider is called.

    Instances can be created directly or through :meth:`from_dict` from a
    dictionary such as those held by dataknobs Config objects, which may
    carry extra 'type', 'name' and 'factory' entries.
    """

    # --- Provider selection ---
    provider: str  # e.g. 'openai', 'anthropic', 'ollama'
    model: str  # model name / identifier
    api_key: str | None = None
    api_base: str | None = None  # custom API endpoint

    # --- Generation parameters ---
    temperature: float = 0.7
    max_tokens: int | None = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: List[str] | None = None

    # --- Mode settings ---
    mode: CompletionMode = CompletionMode.CHAT
    system_prompt: str | None = None
    response_format: str | None = None  # 'text' or 'json'

    # --- Function calling ---
    functions: List[Dict[str, Any]] | None = None
    # 'auto', 'none', or a specific function
    function_call: Union[str, Dict[str, str]] | None = None

    # --- Streaming ---
    stream: bool = False
    stream_callback: Callable[[LLMStreamResponse], None] | None = None

    # --- Rate limiting ---
    rate_limit: int | None = None  # requests per minute
    retry_count: int = 3
    retry_delay: float = 1.0
    timeout: float = 60.0

    # --- Advanced settings ---
    seed: int | None = None  # for reproducibility
    logit_bias: Dict[str, float] | None = None
    user_id: str | None = None

    # --- Provider-specific options ---
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
        """Build an LLMConfig from a plain dictionary.

        The Config bookkeeping keys 'type', 'name' and 'factory' are dropped,
        as is any key that is not a field of this dataclass. A string 'mode'
        is converted to the matching CompletionMode member.

        Args:
            config_dict: Configuration dictionary

        Returns:
            LLMConfig instance
        """
        known_fields = {spec.name for spec in cls.__dataclass_fields__.values()}
        config_only_keys = ('type', 'name', 'factory')

        kwargs = {}
        for key, value in config_dict.items():
            # Drop Config bookkeeping keys and anything that is not a field.
            if key in config_only_keys or key not in known_fields:
                continue
            kwargs[key] = value

        # Accept the mode as its string value as well as the enum member.
        requested_mode = kwargs.get('mode')
        if isinstance(requested_mode, str):
            kwargs['mode'] = CompletionMode(requested_mode)

        return cls(**kwargs)

    def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
        """Export this configuration as a dictionary.

        Fields whose value is None are left out (except 'options', which is
        then exported as an empty dict), and enum values are exported as
        their underlying value.

        Args:
            include_config_attrs: If True, adds a 'type' entry set to 'llm'
                for Config compatibility

        Returns:
            Configuration dictionary
        """
        exported = {}

        for attr in self.__dataclass_fields__:
            current = getattr(self, attr)

            if current is None:
                # 'options' is always present in the output, even when unset.
                if attr == 'options':
                    exported[attr] = {}
                continue

            exported[attr] = current.value if isinstance(current, Enum) else current

        if include_config_attrs:
            exported['type'] = 'llm'

        return exported

179 

180 

def normalize_llm_config(config: Union["LLMConfig", "Config", Dict[str, Any]]) -> "LLMConfig":
    """Normalize various config formats to LLMConfig.

    Accepts an LLMConfig instance (returned unchanged), a plain dictionary
    (converted with ``LLMConfig.from_dict``), or an object that looks like a
    dataknobs Config, i.e. one exposing both ``get`` and ``get_types``.

    For Config-like objects the first 'llm' entry is used; when that lookup
    fails, the first entry of the first type reported by ``get_types()`` is
    used instead.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
        ValueError: If a Config object has no 'llm' entry and reports no
            types at all
    """
    # Already an LLMConfig instance
    if isinstance(config, LLMConfig):
        return config

    # Dictionary (possibly from Config.get())
    if isinstance(config, dict):
        return LLMConfig.from_dict(config)

    # Config objects are recognised structurally (duck typing) rather than by
    # class, so this module does not have to import the Config type.
    if hasattr(config, 'get') and hasattr(config, 'get_types'):
        try:
            config_dict = config.get('llm', 0)
        except Exception as lookup_error:
            # Deliberately broad: the exception type raised by Config.get for
            # a missing entry is not visible from this module, and falling
            # back to another type is a best-effort convenience.
            types = config.get_types()
            if not types:
                # Chain explicitly so the original lookup failure is reported
                # as the cause of this error.
                raise ValueError(
                    "Config object has no configurations"
                ) from lookup_error
            config_dict = config.get(types[0], 0)

        return LLMConfig.from_dict(config_dict)

    raise TypeError(
        f"Unsupported config type: {type(config).__name__}. "
        f"Expected LLMConfig, Config, or dict."
    )

225 

226 

class LLMProvider(ABC):
    """Base LLM provider interface.

    Holds the normalized configuration and an optional prompt builder, and
    declares the lifecycle hooks (``initialize``/``close``) and introspection
    hooks (``validate_model``/``get_capabilities``) that concrete providers
    implement. Instances can be used as a context manager: entering calls
    ``initialize()`` and exiting calls ``close()``.
    """

    def __init__(
        self,
        config: Union[LLMConfig, "Config", Dict[str, Any]],
        prompt_builder: Optional[Union[PromptBuilder, AsyncPromptBuilder]] = None
    ):
        """Initialize provider with configuration.

        Args:
            config: Configuration as LLMConfig, dataknobs Config object, or dict
            prompt_builder: Optional prompt builder for integrated prompting
        """
        self.config = normalize_llm_config(config)
        self.prompt_builder = prompt_builder
        self._client = None
        self._is_initialized = False

    def _validate_prompt_builder(self, expected_type: type) -> None:
        """Validate that prompt builder is configured and of correct type.

        Args:
            expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder)

        Raises:
            ValueError: If prompt_builder not configured
            TypeError: If prompt_builder is wrong type
        """
        # Compare against None explicitly: a builder object that happens to be
        # falsy (for example one defining __len__ or __bool__) is still a
        # configured builder and must not be reported as missing.
        if self.prompt_builder is None:
            raise ValueError(
                "No prompt_builder configured. Pass prompt_builder to __init__() "
                "or use complete() directly with pre-rendered messages."
            )

        if not isinstance(self.prompt_builder, expected_type):
            raise TypeError(
                f"{self.__class__.__name__} requires {expected_type.__name__}, "
                f"got {type(self.prompt_builder).__name__}"
            )

    def _validate_render_params(
        self,
        prompt_type: str
    ) -> None:
        """Validate render parameters.

        Args:
            prompt_type: Type of prompt to render

        Raises:
            ValueError: If prompt_type is not 'system', 'user', or 'both'
        """
        if prompt_type not in ("system", "user", "both"):
            raise ValueError(
                f"Invalid prompt_type: {prompt_type}. "
                f"Must be 'system', 'user', or 'both'"
            )

    @abstractmethod
    def initialize(self) -> None:
        """Initialize the LLM client."""
        pass

    @abstractmethod
    def close(self) -> None:
        """Close the LLM client."""
        pass

    @abstractmethod
    def validate_model(self) -> bool:
        """Validate that the model is available."""
        pass

    @abstractmethod
    def get_capabilities(self) -> List[ModelCapability]:
        """Get model capabilities."""
        pass

    @property
    def is_initialized(self) -> bool:
        """Check if provider is initialized."""
        return self._is_initialized

    def __enter__(self):
        """Context manager entry: initialize the provider and return it."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the provider.

        Returns None, so an exception raised inside the ``with`` body is
        not suppressed.
        """
        self.close()

319 

320 

class AsyncLLMProvider(LLMProvider):
    """Async LLM provider interface.

    ``initialize()`` and ``close()`` are coroutines here, so instances must be
    used with ``async with``. Entering a plain ``with`` block raises TypeError
    instead of silently creating a coroutine that is never awaited.
    """

    @abstractmethod
    async def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> LLMResponse:
        """Generate completion asynchronously.

        Args:
            messages: Input messages or prompt
            **kwargs: Additional parameters

        Returns:
            LLM response
        """
        pass

    async def render_and_complete(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]] = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render prompt from library and execute LLM completion.

        This is a convenience method for one-off interactions that combines
        prompt rendering with LLM execution. For multi-turn conversations,
        use ConversationManager instead.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (for user prompts)
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to complete()

        Returns:
            LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not AsyncPromptBuilder

        Example:
            >>> llm = OpenAIProvider(config, prompt_builder=builder)
            >>> result = await llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        # Validate (AsyncPromptBuilder comes from the module-level import)
        self._validate_prompt_builder(AsyncPromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = await self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Execute LLM
        return await self.complete(messages, **llm_kwargs)

    async def render_and_stream(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]] = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> AsyncIterator[LLMStreamResponse]:
        """Render prompt and stream LLM response.

        Same as render_and_complete() but returns streaming response.
        Because this is an async generator, validation and rendering run
        when iteration starts, not when the method is called.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not AsyncPromptBuilder

        Example:
            >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        # Validate (AsyncPromptBuilder comes from the module-level import)
        self._validate_prompt_builder(AsyncPromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = await self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Stream LLM response
        async for chunk in self.stream_complete(messages, **llm_kwargs):
            yield chunk

    async def _render_messages(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]],
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Render messages from prompt library (async version).

        Callers are expected to have validated the prompt builder and
        prompt_type first; no validation happens here.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches

        Returns:
            List of rendered LLM messages, system message first when both
            are requested
        """
        builder: AsyncPromptBuilder = self.prompt_builder  # type: ignore

        messages: List[LLMMessage] = []
        params = params or {}

        if prompt_type in ("system", "both"):
            result = await builder.render_system_prompt(
                prompt_name, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="system", content=result.content))

        if prompt_type in ("user", "both"):
            result = await builder.render_user_prompt(
                prompt_name, index=index, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="user", content=result.content))

        return messages

    @abstractmethod
    async def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> AsyncIterator[LLMStreamResponse]:
        """Generate streaming completion asynchronously.

        Args:
            messages: Input messages or prompt
            **kwargs: Additional parameters

        Yields:
            Streaming response chunks
        """
        pass

    @abstractmethod
    async def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Generate embeddings asynchronously.

        Args:
            texts: Input text(s)
            **kwargs: Additional parameters

        Returns:
            Embedding vector(s)
        """
        pass

    @abstractmethod
    async def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling asynchronously.

        Args:
            messages: Conversation messages
            functions: Available functions
            **kwargs: Additional parameters

        Returns:
            Response with function call
        """
        pass

    async def initialize(self) -> None:
        """Initialize the async LLM client."""
        self._is_initialized = True

    async def close(self) -> None:
        """Close the async LLM client."""
        self._is_initialized = False

    def __enter__(self):
        """Reject synchronous ``with``.

        The inherited implementation would call the coroutine function
        ``initialize()`` without awaiting it, leaving the provider
        uninitialized and an un-awaited coroutine behind.

        Raises:
            TypeError: Always; use ``async with`` instead.
        """
        raise TypeError(
            f"{self.__class__.__name__} is asynchronous; "
            f"use 'async with' instead of 'with'"
        )

    async def __aenter__(self):
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

546 

547 

class SyncLLMProvider(LLMProvider):
    """Synchronous LLM provider interface."""

    @abstractmethod
    def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> LLMResponse:
        """Generate completion synchronously.

        Args:
            messages: Input messages or prompt
            **kwargs: Additional parameters

        Returns:
            LLM response
        """
        pass

    def render_and_complete(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]] = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render prompt from library and execute LLM completion.

        This is a convenience method for one-off interactions that combines
        prompt rendering with LLM execution. For multi-turn conversations,
        use ConversationManager instead.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (for user prompts)
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to complete()

        Returns:
            LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
            >>> result = llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        # Validate (PromptBuilder comes from the module-level import)
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Execute LLM
        return self.complete(messages, **llm_kwargs)

    def render_and_stream(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]] = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Render prompt and stream LLM response.

        Same as render_and_complete() but returns streaming response.
        Because this is a generator, validation and rendering run when
        iteration starts, not when the method is called.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        # Validate (PromptBuilder comes from the module-level import)
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Delegate with `yield from` so that close()/throw() issued by the
        # consumer (e.g. breaking out of the loop early) are forwarded to the
        # underlying provider stream instead of leaving it to be collected.
        yield from self.stream_complete(messages, **llm_kwargs)

    def _render_messages(
        self,
        prompt_name: str,
        params: Optional[Dict[str, Any]],
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Render messages from prompt library (sync version).

        Callers are expected to have validated the prompt builder and
        prompt_type first; no validation happens here.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches

        Returns:
            List of rendered LLM messages, system message first when both
            are requested
        """
        builder: PromptBuilder = self.prompt_builder  # type: ignore

        messages: List[LLMMessage] = []
        params = params or {}

        if prompt_type in ("system", "both"):
            result = builder.render_system_prompt(
                prompt_name, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="system", content=result.content))

        if prompt_type in ("user", "both"):
            result = builder.render_user_prompt(
                prompt_name, index=index, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="user", content=result.content))

        return messages

    @abstractmethod
    def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Generate streaming completion synchronously.

        Args:
            messages: Input messages or prompt
            **kwargs: Additional parameters

        Yields:
            Streaming response chunks
        """
        pass

    @abstractmethod
    def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Generate embeddings synchronously.

        Args:
            texts: Input text(s)
            **kwargs: Additional parameters

        Returns:
            Embedding vector(s)
        """
        pass

    @abstractmethod
    def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling synchronously.

        Args:
            messages: Conversation messages
            functions: Available functions
            **kwargs: Additional parameters

        Returns:
            Response with function call
        """
        pass

    def initialize(self) -> None:
        """Initialize the sync LLM client."""
        self._is_initialized = True

    def close(self) -> None:
        """Close the sync LLM client."""
        self._is_initialized = False

764 

765 

class LLMAdapter(ABC):
    """Abstract translator between the standard types and a provider's own formats."""

    @abstractmethod
    def adapt_messages(self, messages: List[LLMMessage]) -> Any:
        """Convert standard messages into the provider's message format."""

    @abstractmethod
    def adapt_response(self, response: Any) -> LLMResponse:
        """Convert a provider response into the standard LLMResponse."""

    @abstractmethod
    def adapt_config(self, config: LLMConfig) -> Dict[str, Any]:
        """Convert an LLMConfig into the provider's configuration dictionary."""

792 

793 

class LLMMiddleware(Protocol):
    """Structural interface for hooks that run around an LLM call.

    The configuration handed to each hook may be an LLMConfig, a dataknobs
    Config, or a plain dict.
    """

    async def process_request(
        self,
        messages: List[LLMMessage],
        config: Union[LLMConfig, "Config", Dict[str, Any]],
    ) -> List[LLMMessage]:
        """Transform the outgoing messages before they reach the LLM.

        Args:
            messages: Input messages
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed messages
        """

    async def process_response(
        self,
        response: LLMResponse,
        config: Union[LLMConfig, "Config", Dict[str, Any]],
    ) -> LLMResponse:
        """Transform the response received from the LLM.

        Args:
            response: LLM response
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed response
        """