Coverage for src/dataknobs_llm/llm/base.py: 81%

206 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Base LLM abstraction components. 

2 

3This module provides the base abstractions for unified LLM operations across 

4different providers (OpenAI, Anthropic, Ollama, etc.). It defines standard 

5interfaces for completions, streaming, embeddings, and function calling. 

6 

7The architecture follows a provider pattern where all LLM providers implement 

8common interfaces (AsyncLLMProvider or SyncLLMProvider) and use standardized 

9data structures (LLMMessage, LLMResponse, LLMConfig). 

10 

11Key Components: 

12 - LLMProvider: Base provider interface with initialization and lifecycle 

13 - AsyncLLMProvider: Async provider with complete(), stream_complete(), embed() 

14 - SyncLLMProvider: Synchronous version for non-async applications 

15 - LLMMessage: Standard message format for conversations 

16 - LLMResponse: Standard response with content, usage, and cost tracking 

17 - LLMConfig: Comprehensive configuration with 20+ parameters 

18 - LLMAdapter: Format adapters for provider-specific APIs 

19 - LLMMiddleware: Request/response processing pipeline 

20 

21Example: 

22 ```python 

23 from dataknobs_llm import create_llm_provider 

24 from dataknobs_llm.llm.base import LLMConfig, LLMMessage 

25 

26 # Create provider with config 

27 config = LLMConfig( 

28 provider="openai", 

29 model="gpt-4", 

30 temperature=0.7, 

31 max_tokens=500 

32 ) 

33 

34 # Async usage 

35 async with create_llm_provider(config) as llm: 

36 # Simple completion 

37 response = await llm.complete("What is Python?") 

38 print(response.content) 

39 

40 # Streaming 

41 async for chunk in llm.stream_complete("Tell me a story"): 

42 print(chunk.delta, end="", flush=True) 

43 

44 # Multi-turn conversation 

45 messages = [ 

46 LLMMessage(role="system", content="You are helpful"), 

47 LLMMessage(role="user", content="Hello!"), 

48 ] 

49 response = await llm.complete(messages) 

50 ``` 

51 

52See Also: 

53 - dataknobs_llm.llm.providers: Provider implementations 

54 - dataknobs_llm.conversations: Multi-turn conversation management 

55 - dataknobs_llm.prompts: Prompt rendering and RAG integration 

56""" 

57 

58from abc import ABC, abstractmethod 

59from dataclasses import dataclass, field 

60from enum import Enum 

61from typing import ( 

62 Any, Dict, List, Union, AsyncIterator, Iterator, 

63 Callable, Protocol 

64) 

65from datetime import datetime 

66 

67# Import prompt builder types - clean one-way dependency (llm depends on prompts) 

68from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder 

69from dataknobs_config.config import Config 

70 

71 

class CompletionMode(Enum):
    """Operation mode of an LLM request.

    Each member's value is the lowercase string used when a mode is
    serialized, so ``CompletionMode("chat")`` and ``CompletionMode.CHAT``
    refer to the same member.

    Members:
        CHAT: Chat completion driven by a message history.
        TEXT: Plain text completion.
        INSTRUCT: Instruction-following completion.
        EMBEDDING: Embedding generation.
        FUNCTION: Function/tool calling.

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        chat_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            mode=CompletionMode.CHAT,
        )

        embedding_config = LLMConfig(
            provider="openai",
            model="text-embedding-ada-002",
            mode=CompletionMode.EMBEDDING,
        )
        ```
    """

    # Member order is part of the public iteration order; keep it stable.
    CHAT = "chat"
    TEXT = "text"
    INSTRUCT = "instruct"
    EMBEDDING = "embedding"
    FUNCTION = "function"

109 

110 

class ModelCapability(Enum):
    """Feature flags a model may support.

    Providers report these from ``get_capabilities()`` so callers can
    check for a feature before using it.

    Members:
        TEXT_GENERATION: Basic text generation.
        CHAT: Multi-turn conversational interaction.
        EMBEDDINGS: Vector embedding generation.
        FUNCTION_CALLING: Tool/function calling.
        VISION: Image understanding.
        CODE: Code generation and analysis.
        JSON_MODE: Structured JSON output.
        STREAMING: Incremental (streamed) responses.

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import ModelCapability

        llm = create_llm_provider("openai", model="gpt-4")
        supported = llm.get_capabilities()

        if ModelCapability.STREAMING in supported:
            async for chunk in llm.stream_complete("Hello"):
                print(chunk.delta, end="")
        ```
    """

    # Member order is part of the public iteration order; keep it stable.
    TEXT_GENERATION = "text_generation"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"
    FUNCTION_CALLING = "function_calling"
    VISION = "vision"
    CODE = "code"
    JSON_MODE = "json_mode"
    STREAMING = "streaming"

155 

156 

157@dataclass 

158class LLMMessage: 

159 """Represents a message in LLM conversation. 

160 

161 Standard message format used across all providers. Messages are the 

162 fundamental unit of LLM interactions, containing role-based content 

163 for multi-turn conversations. 

164 

165 Attributes: 

166 role: Message role - 'system', 'user', 'assistant', or 'function' 

167 content: Message content text 

168 name: Optional name for function messages or multi-user scenarios 

169 function_call: Function call data for tool-using models 

170 metadata: Additional metadata (timestamps, IDs, etc.) 

171 

172 Example: 

173 ```python 

174 from dataknobs_llm.llm.base import LLMMessage 

175 

176 # System message 

177 system_msg = LLMMessage( 

178 role="system", 

179 content="You are a helpful coding assistant." 

180 ) 

181 

182 # User message 

183 user_msg = LLMMessage( 

184 role="user", 

185 content="How do I reverse a list in Python?" 

186 ) 

187 

188 # Assistant message 

189 assistant_msg = LLMMessage( 

190 role="assistant", 

191 content="Use the reverse() method or [::-1] slicing." 

192 ) 

193 

194 # Function result message 

195 function_msg = LLMMessage( 

196 role="function", 

197 name="search_docs", 

198 content='{"result": "Found 3 examples"}' 

199 ) 

200 

201 # Build conversation 

202 messages = [system_msg, user_msg, assistant_msg] 

203 ``` 

204 """ 

205 role: str # 'system', 'user', 'assistant', 'function' 

206 content: str 

207 name: str | None = None # For function messages 

208 function_call: Dict[str, Any] | None = None # For function calling 

209 metadata: Dict[str, Any] = field(default_factory=dict) 

210 

211 

212@dataclass 

213class LLMResponse: 

214 """Response from LLM. 

215 

216 Standard response format returned by all LLM providers. Contains the 

217 generated content along with metadata about token usage, cost, and 

218 completion status. 

219 

220 Attributes: 

221 content: Generated text content 

222 model: Model identifier that generated the response 

223 finish_reason: Why generation stopped - 'stop', 'length', 'function_call' 

224 usage: Token usage stats (prompt_tokens, completion_tokens, total_tokens) 

225 function_call: Function call data if model requested tool use 

226 metadata: Provider-specific metadata 

227 created_at: Response timestamp 

228 cost_usd: Estimated cost in USD for this request 

229 cumulative_cost_usd: Running total cost for conversation 

230 

231 Example: 

232 ```python 

233 from dataknobs_llm import create_llm_provider 

234 

235 llm = create_llm_provider("openai", model="gpt-4") 

236 response = await llm.complete("What is Python?") 

237 

238 # Access response data 

239 print(response.content) 

240 # => "Python is a high-level programming language..." 

241 

242 # Check token usage 

243 print(f"Tokens used: {response.usage['total_tokens']}") 

244 # => Tokens used: 87 

245 

246 # Monitor costs 

247 if response.cost_usd: 

248 print(f"Cost: ${response.cost_usd:.4f}") 

249 print(f"Total: ${response.cumulative_cost_usd:.4f}") 

250 

251 # Check completion status 

252 if response.finish_reason == "length": 

253 print("Response truncated due to max_tokens limit") 

254 ``` 

255 

256 See Also: 

257 LLMMessage: Request message format 

258 LLMStreamResponse: Streaming response format 

259 """ 

260 content: str 

261 model: str 

262 finish_reason: str | None = None # 'stop', 'length', 'function_call' 

263 usage: Dict[str, int] | None = None # tokens used 

264 function_call: Dict[str, Any] | None = None 

265 metadata: Dict[str, Any] = field(default_factory=dict) 

266 created_at: datetime = field(default_factory=datetime.now) 

267 

268 # Cost tracking (optional enhancement for DynaBot) 

269 cost_usd: float | None = None # Estimated cost in USD 

270 cumulative_cost_usd: float | None = None # Running total for conversation 

271 

272 

273@dataclass 

274class LLMStreamResponse: 

275 r"""Streaming response from LLM. 

276 

277 Represents a single chunk in a streaming LLM response. Streaming 

278 allows displaying generated text incrementally as it's produced, 

279 providing better user experience for long responses. 

280 

281 Attributes: 

282 delta: Incremental content for this chunk (not cumulative) 

283 is_final: True if this is the last chunk in the stream 

284 finish_reason: Why generation stopped (only set on final chunk) 

285 usage: Token usage stats (only set on final chunk) 

286 metadata: Additional chunk metadata 

287 

288 Example: 

289 ```python 

290 from dataknobs_llm import create_llm_provider 

291 

292 llm = create_llm_provider("openai", model="gpt-4") 

293 

294 # Stream and display in real-time 

295 async for chunk in llm.stream_complete("Write a poem"): 

296 print(chunk.delta, end="", flush=True) 

297 

298 if chunk.is_final: 

299 print(f"\n\nFinished: {chunk.finish_reason}") 

300 print(f"Tokens: {chunk.usage['total_tokens']}") 

301 

302 # Accumulate full response 

303 full_text = "" 

304 chunks_received = 0 

305 

306 async for chunk in llm.stream_complete("Explain Python"): 

307 full_text += chunk.delta 

308 chunks_received += 1 

309 

310 # Optional: show progress 

311 if chunks_received % 10 == 0: 

312 print(f"Received {chunks_received} chunks...") 

313 

314 print(f"\nComplete response ({len(full_text)} chars)") 

315 print(full_text) 

316 ``` 

317 

318 See Also: 

319 LLMResponse: Non-streaming response format 

320 AsyncLLMProvider.stream_complete: Streaming method 

321 """ 

322 delta: str # Incremental content 

323 is_final: bool = False 

324 finish_reason: str | None = None 

325 usage: Dict[str, int] | None = None 

326 metadata: Dict[str, Any] = field(default_factory=dict) 

327 

328 

@dataclass
class LLMConfig:
    """Configuration for LLM operations.

    Holds everything a provider needs for a request: which provider and
    model to use, generation parameters, function-calling setup,
    streaming options, retry/timeout settings and a free-form ``options``
    dict for provider-specific values. Instances can be built directly,
    from a plain dictionary (``from_dict``), or derived from an existing
    config (``clone``).

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Basic configuration
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            api_key="sk-...",
            temperature=0.7,
            max_tokens=500
        )

        # Deterministic config for testing
        test_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            temperature=0.0,
            seed=42,  # Reproducible outputs
            max_tokens=100
        )

        # Function calling config
        function_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            functions=[{
                "name": "search_docs",
                "description": "Search documentation",
                "parameters": {"type": "object", "properties": {...}}
            }],
            function_call="auto"
        )

        # Streaming with callback
        def on_chunk(chunk):
            print(chunk.delta, end="")

        streaming_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            stream=True,
            stream_callback=on_chunk
        )

        # From dictionary (Config compatibility)
        config_dict = {
            "provider": "ollama",
            "model": "llama2",
            "type": "llm",  # Config metadata (ignored)
            "temperature": 0.8
        }
        config = LLMConfig.from_dict(config_dict)

        # Clone with overrides
        new_config = config.clone(temperature=1.0, max_tokens=1000)
        ```

    See Also:
        normalize_llm_config: Convert various formats to LLMConfig
        CompletionMode: Available completion modes
    """
    provider: str  # 'openai', 'anthropic', 'ollama', etc.
    model: str  # Model name/identifier
    api_key: str | None = None
    api_base: str | None = None  # Custom API endpoint

    # Generation parameters
    temperature: float = 0.7
    max_tokens: int | None = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: List[str] | None = None

    # Mode settings
    mode: CompletionMode = CompletionMode.CHAT
    system_prompt: str | None = None
    response_format: str | None = None  # 'text' or 'json'

    # Function calling
    functions: List[Dict[str, Any]] | None = None
    function_call: Union[str, Dict[str, str]] | None = None  # 'auto', 'none', or specific function

    # Streaming
    stream: bool = False
    stream_callback: Callable[[LLMStreamResponse], None] | None = None

    # Rate limiting
    rate_limit: int | None = None  # Requests per minute
    retry_count: int = 3
    retry_delay: float = 1.0
    timeout: float = 60.0

    # Advanced settings
    seed: int | None = None  # For reproducibility
    logit_bias: Dict[str, float] | None = None
    user_id: str | None = None

    # Provider-specific options
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
        """Create LLMConfig from a dictionary.

        Accepts dictionaries produced by dataknobs Config objects, which
        may carry 'type', 'name', and 'factory' entries. Those entries,
        and any other key that is not a dataclass field, are dropped.
        The input dictionary itself is not modified.

        Args:
            config_dict: Configuration dictionary. A string ``mode`` is
                converted to the matching ``CompletionMode`` member.

        Returns:
            LLMConfig instance

        Raises:
            ValueError: If ``mode`` is a string that is not a valid
                ``CompletionMode`` value.
            TypeError: If a required field ('provider' or 'model') is missing.
        """
        # Filter out Config-specific attributes
        config_data = {
            k: v for k, v in config_dict.items()
            if k not in ('type', 'name', 'factory')
        }

        # Handle mode conversion if it's a string
        if 'mode' in config_data and isinstance(config_data['mode'], str):
            config_data['mode'] = CompletionMode(config_data['mode'])

        # Get dataclass fields to filter unknown attributes
        valid_fields = {f.name for f in cls.__dataclass_fields__.values()}
        filtered_data = {k: v for k, v in config_data.items() if k in valid_fields}

        return cls(**filtered_data)

    def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
        """Convert LLMConfig to a dictionary.

        Fields whose value is ``None`` are omitted, enum values are
        replaced by their ``.value``, and ``options`` is always present.
        Container values are the same objects held by this config, not
        copies.

        Args:
            include_config_attrs: If True, includes 'type' attribute for Config compatibility

        Returns:
            Configuration dictionary
        """
        result = {}

        for field_info in self.__dataclass_fields__.values():
            value = getattr(self, field_info.name)

            # Handle enum conversion
            if isinstance(value, Enum):
                result[field_info.name] = value.value
            # Skip None values for optional fields
            elif value is not None:
                result[field_info.name] = value
            # 'options' is always emitted, so a None value becomes {}
            elif field_info.name == 'options':
                result[field_info.name] = {}

        # Optionally add Config-compatible type attribute
        if include_config_attrs:
            result['type'] = 'llm'

        return result

    def clone(self, **overrides: Any) -> "LLMConfig":
        """Create a copy of this config with optional overrides.

        Useful for runtime variations of a base config. Any dataclass
        field can be overridden by keyword.

        List and dict fields that are not overridden (for example
        ``options``, ``functions``, ``stop_sequences``, ``logit_bias``)
        are shallow-copied, so adding or removing entries on the clone
        does not affect this config. The copy is one level deep: nested
        objects inside those containers are still shared. Values passed
        in ``overrides`` are stored as given.

        Args:
            **overrides: Field values to override in the cloned config

        Returns:
            New LLMConfig instance with overrides applied

        Raises:
            TypeError: If an override names a field that does not exist.

        Example:
            >>> base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
            >>> creative_config = base_config.clone(temperature=1.2, max_tokens=500)
        """
        from copy import copy
        from dataclasses import fields, replace

        cloned = replace(self, **overrides)

        # dataclasses.replace() copies field references only; without this
        # step the clone and the original would share their containers.
        for field_info in fields(cloned):
            if field_info.name in overrides:
                continue
            value = getattr(cloned, field_info.name)
            if isinstance(value, (list, dict)):
                setattr(cloned, field_info.name, copy(value))

        return cloned

535 

536 

def normalize_llm_config(config: Union["LLMConfig", Config, Dict[str, Any]]) -> "LLMConfig":
    """Coerce any supported configuration form into an LLMConfig.

    Handling by input kind:
        - ``LLMConfig``: returned unchanged (same instance, no copy).
        - ``dict``: converted with ``LLMConfig.from_dict``.
        - Config-like object (anything exposing both ``get`` and
          ``get_types``): the first 'llm' entry is used; if fetching it
          raises, the first entry of the first available type is used.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
        ValueError: If a Config-like object has no 'llm' entry and
            reports no configuration types at all
    """
    # Fast paths: already normalized, or a plain dictionary
    if isinstance(config, LLMConfig):
        return config
    if isinstance(config, dict):
        return LLMConfig.from_dict(config)

    # Config objects are recognized by duck typing rather than isinstance
    looks_like_config = hasattr(config, 'get') and hasattr(config, 'get_types')
    if not looks_like_config:
        raise TypeError(
            f"Unsupported config type: {type(config).__name__}. "
            f"Expected LLMConfig, Config, or dict."
        )

    try:
        raw_config = config.get('llm', 0)
    except Exception as e:
        # No usable 'llm' entry: fall back to the first available type
        available_types = config.get_types()
        if not available_types:
            raise ValueError("Config object has no configurations") from e
        raw_config = config.get(available_types[0], 0)

    return LLMConfig.from_dict(raw_config)

581 

582 

class LLMProvider(ABC):
    """Abstract base for every LLM provider.

    Stores the normalized configuration and an optional prompt builder,
    tracks initialization state, and defines the lifecycle hooks
    (``initialize`` / ``close``) that concrete providers implement.
    Instances work as context managers: entering calls ``initialize()``
    and leaving calls ``close()``.
    """

    def __init__(
        self,
        config: Union[LLMConfig, Config, Dict[str, Any]],
        prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None
    ):
        """Set up provider state from a configuration.

        Args:
            config: Configuration as LLMConfig, dataknobs Config object, or
                dict; it is converted with ``normalize_llm_config``.
            prompt_builder: Optional prompt builder for integrated prompting
        """
        self.config = normalize_llm_config(config)
        self.prompt_builder = prompt_builder

        # Not set up here; both start in their "not initialized" state
        self._is_initialized = False
        self._client = None

    def _validate_prompt_builder(self, expected_type: type) -> None:
        """Ensure a prompt builder of the expected type is configured.

        Args:
            expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder)

        Raises:
            ValueError: If prompt_builder not configured
            TypeError: If prompt_builder is wrong type
        """
        builder = self.prompt_builder

        if not builder:
            raise ValueError(
                "No prompt_builder configured. Pass prompt_builder to __init__() "
                "or use complete() directly with pre-rendered messages."
            )

        if isinstance(builder, expected_type):
            return

        raise TypeError(
            f"{self.__class__.__name__} requires {expected_type.__name__}, "
            f"got {type(self.prompt_builder).__name__}"
        )

    def _validate_render_params(
        self,
        prompt_type: str
    ) -> None:
        """Check that ``prompt_type`` is one of the accepted values.

        Args:
            prompt_type: Type of prompt to render

        Raises:
            ValueError: If prompt_type is invalid
        """
        if prompt_type in ("system", "user", "both"):
            return

        raise ValueError(
            f"Invalid prompt_type: {prompt_type}. "
            f"Must be 'system', 'user', or 'both'"
        )

    @abstractmethod
    def initialize(self) -> None:
        """Initialize the LLM client."""
        ...

    @abstractmethod
    def close(self) -> None:
        """Close the LLM client."""
        ...

    @abstractmethod
    def validate_model(self) -> bool:
        """Validate that the model is available."""
        ...

    @abstractmethod
    def get_capabilities(self) -> List[ModelCapability]:
        """Get model capabilities."""
        ...

    @property
    def is_initialized(self) -> bool:
        """Whether the provider has been initialized."""
        return self._is_initialized

    def __enter__(self):
        """Initialize the provider and return it for use in a ``with`` block."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the provider on leaving the ``with`` block.

        Returns None, so an exception raised inside the block propagates.
        """
        self.close()

675 

676 

677class AsyncLLMProvider(LLMProvider): 

678 """Async LLM provider interface.""" 

679 

680 @abstractmethod 

681 async def complete( 

682 self, 

683 messages: Union[str, List[LLMMessage]], 

684 **kwargs 

685 ) -> LLMResponse: 

686 """Generate completion asynchronously. 

687 

688 Primary method for getting LLM responses. Accepts either a simple 

689 string prompt or a list of LLMMessage objects for multi-turn 

690 conversations. This is the recommended async method for most use cases. 

691 

692 Args: 

693 messages: Either a single string prompt or a list of LLMMessage 

694 objects for multi-turn conversations. 

695 **kwargs: Additional provider-specific parameters. Common options: 

696 - temperature (float): Sampling temperature (0.0-2.0) 

697 - max_tokens (int): Maximum tokens to generate 

698 - top_p (float): Nucleus sampling parameter (0.0-1.0) 

699 - stop (List[str]): Stop sequences 

700 - presence_penalty (float): Presence penalty (-2.0 to 2.0) 

701 - frequency_penalty (float): Frequency penalty (-2.0 to 2.0) 

702 

703 Returns: 

704 LLMResponse containing generated content, usage stats, and metadata 

705 

706 Raises: 

707 ValueError: If messages format is invalid 

708 ConnectionError: If API connection fails 

709 TimeoutError: If request exceeds timeout 

710 

711 Example: 

712 ```python 

713 from dataknobs_llm import create_llm_provider 

714 from dataknobs_llm.llm.base import LLMMessage 

715 

716 llm = create_llm_provider("openai", model="gpt-4") 

717 

718 # Simple string prompt 

719 response = await llm.complete("What is Python?") 

720 print(response.content) 

721 # => "Python is a high-level programming language..." 

722 

723 # With parameters 

724 response = await llm.complete( 

725 "Write a haiku about coding", 

726 temperature=0.9, 

727 max_tokens=100 

728 ) 

729 

730 # Multi-turn conversation 

731 messages = [ 

732 LLMMessage(role="system", content="You are a helpful tutor"), 

733 LLMMessage(role="user", content="Explain recursion"), 

734 LLMMessage(role="assistant", content="Recursion is when..."), 

735 LLMMessage(role="user", content="Can you give an example?") 

736 ] 

737 response = await llm.complete(messages) 

738 

739 # Check token usage 

740 print(f"Tokens: {response.usage['total_tokens']}") 

741 print(f"Cost: ${response.cost_usd:.4f}") 

742 ``` 

743 

744 See Also: 

745 stream_complete: Streaming version 

746 render_and_complete: Complete with prompt rendering 

747 """ 

748 pass 

749 

750 async def render_and_complete( 

751 self, 

752 prompt_name: str, 

753 params: Dict[str, Any] | None = None, 

754 prompt_type: str = "user", 

755 index: int = 0, 

756 include_rag: bool = True, 

757 **llm_kwargs 

758 ) -> LLMResponse: 

759 """Render prompt from library and execute LLM completion. 

760 

761 This is a convenience method for one-off interactions that combines 

762 prompt rendering with LLM execution. For multi-turn conversations, 

763 use ConversationManager instead. 

764 

765 Args: 

766 prompt_name: Name of prompt in library 

767 params: Parameters for template rendering 

768 prompt_type: Type of prompt ("system", "user", or "both") 

769 index: Prompt variant index (for user prompts) 

770 include_rag: Whether to execute RAG searches 

771 **llm_kwargs: Additional arguments passed to complete() 

772 

773 Returns: 

774 LLM response 

775 

776 Raises: 

777 ValueError: If prompt_builder not configured or invalid prompt_type 

778 TypeError: If prompt_builder is not AsyncPromptBuilder 

779 

780 Example: 

781 >>> llm = OpenAIProvider(config, prompt_builder=builder) 

782 >>> result = await llm.render_and_complete( 

783 ... "analyze_code", 

784 ... params={"code": code, "language": "python"} 

785 ... ) 

786 """ 

787 # Validate 

788 from dataknobs_llm.prompts import AsyncPromptBuilder 

789 self._validate_prompt_builder(AsyncPromptBuilder) 

790 self._validate_render_params(prompt_type) 

791 

792 # Render messages 

793 messages = await self._render_messages( 

794 prompt_name, params, prompt_type, index, include_rag 

795 ) 

796 

797 # Execute LLM 

798 return await self.complete(messages, **llm_kwargs) 

799 

800 async def render_and_stream( 

801 self, 

802 prompt_name: str, 

803 params: Dict[str, Any] | None = None, 

804 prompt_type: str = "user", 

805 index: int = 0, 

806 include_rag: bool = True, 

807 **llm_kwargs 

808 ) -> AsyncIterator[LLMStreamResponse]: 

809 """Render prompt and stream LLM response. 

810 

811 Same as render_and_complete() but returns streaming response. 

812 

813 Args: 

814 prompt_name: Name of prompt in library 

815 params: Parameters for template rendering 

816 prompt_type: Type of prompt ("system", "user", or "both") 

817 index: Prompt variant index 

818 include_rag: Whether to execute RAG searches 

819 **llm_kwargs: Additional arguments passed to stream_complete() 

820 

821 Yields: 

822 Streaming response chunks 

823 

824 Raises: 

825 ValueError: If prompt_builder not configured or invalid prompt_type 

826 TypeError: If prompt_builder is not AsyncPromptBuilder 

827 

828 Example: 

829 >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}): 

830 ... print(chunk.delta, end="") 

831 """ 

832 # Validate 

833 from dataknobs_llm.prompts import AsyncPromptBuilder 

834 self._validate_prompt_builder(AsyncPromptBuilder) 

835 self._validate_render_params(prompt_type) 

836 

837 # Render messages 

838 messages = await self._render_messages( 

839 prompt_name, params, prompt_type, index, include_rag 

840 ) 

841 

842 # Stream LLM response 

843 async for chunk in self.stream_complete(messages, **llm_kwargs): 

844 yield chunk 

845 

846 async def _render_messages( 

847 self, 

848 prompt_name: str, 

849 params: Dict[str, Any] | None, 

850 prompt_type: str, 

851 index: int, 

852 include_rag: bool 

853 ) -> List[LLMMessage]: 

854 """Render messages from prompt library (async version). 

855 

856 Args: 

857 prompt_name: Name of prompt in library 

858 params: Parameters for template rendering 

859 prompt_type: Type of prompt ("system", "user", or "both") 

860 index: Prompt variant index 

861 include_rag: Whether to execute RAG searches 

862 

863 Returns: 

864 List of rendered LLM messages 

865 """ 

866 from dataknobs_llm.prompts import AsyncPromptBuilder 

867 builder: AsyncPromptBuilder = self.prompt_builder # type: ignore 

868 

869 messages: List[LLMMessage] = [] 

870 params = params or {} 

871 

872 if prompt_type in ("system", "both"): 

873 result = await builder.render_system_prompt( 

874 prompt_name, params=params, include_rag=include_rag 

875 ) 

876 messages.append(LLMMessage(role="system", content=result.content)) 

877 

878 if prompt_type in ("user", "both"): 

879 result = await builder.render_user_prompt( 

880 prompt_name, index=index, params=params, include_rag=include_rag 

881 ) 

882 messages.append(LLMMessage(role="user", content=result.content)) 

883 

884 return messages 

885 

886 @abstractmethod 

887 async def stream_complete( 

888 self, 

889 messages: Union[str, List[LLMMessage]], 

890 **kwargs 

891 ) -> AsyncIterator[LLMStreamResponse]: 

892 r"""Generate streaming completion asynchronously. 

893 

894 Streams response chunks as they are generated, enabling real-time 

895 display of LLM output. Each chunk contains incremental content 

896 (delta), and the final chunk includes usage statistics. 

897 

898 Args: 

899 messages: Either a single string prompt or list of LLMMessage objects 

900 **kwargs: Provider-specific parameters (same as complete()) 

901 

902 Yields: 

903 LLMStreamResponse chunks containing incremental content. The final 

904 chunk has is_final=True and includes finish_reason and usage stats. 

905 

906 Raises: 

907 ValueError: If messages format is invalid 

908 ConnectionError: If API connection fails 

909 TimeoutError: If request exceeds timeout 

910 

911 Example: 

912 ```python 

913 from dataknobs_llm import create_llm_provider 

914 

915 llm = create_llm_provider("openai", model="gpt-4") 

916 

917 # Stream and display in real-time 

918 async for chunk in llm.stream_complete("Tell me a story"): 

919 print(chunk.delta, end="", flush=True) 

920 

921 if chunk.is_final: 

922 print(f"\n\nFinished: {chunk.finish_reason}") 

923 print(f"Total tokens: {chunk.usage['total_tokens']}") 

924 

925 # Accumulate full response 

926 full_text = "" 

927 chunk_count = 0 

928 

929 async for chunk in llm.stream_complete("Explain quantum computing"): 

930 full_text += chunk.delta 

931 chunk_count += 1 

932 

933 print(f"Received {chunk_count} chunks") 

934 print(f"Total length: {len(full_text)} characters") 

935 

936 # Stream with progress callback 

937 async def stream_with_progress(prompt: str): 

938 chunks = [] 

939 async for chunk in llm.stream_complete(prompt): 

940 chunks.append(chunk) 

941 # Update progress UI 

942 if len(chunks) % 5 == 0: 

943 print(f"Processing... ({len(chunks)} chunks)") 

944 return "".join(c.delta for c in chunks) 

945 

946 result = await stream_with_progress("Write a tutorial") 

947 ``` 

948 

949 See Also: 

950 complete: Non-streaming version 

951 render_and_stream: Stream with prompt rendering 

952 LLMStreamResponse: Chunk data structure 

953 """ 

954 pass 

955 

@abstractmethod
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs,
) -> Union[List[float], List[List[float]]]:
    """Turn text into embedding vectors (async).

    Abstract hook: concrete providers supply the implementation. An
    embedding is a dense numeric vector usable for semantic search,
    clustering, and similarity comparison. Vector size depends on the
    model (typically 768-1536 dimensions).

    Args:
        texts: One string, or a list of strings, to embed.
        **kwargs: Provider-specific options, for example:
            - model (str): Embedding model override
            - dimensions (int): Target dimensions (if supported)

    Returns:
        A single vector (``List[float]``) when ``texts`` is a string,
        or a list of vectors (``List[List[float]]``) when ``texts`` is
        a list.

    Raises:
        ValueError: If ``texts`` is empty or invalid
        ConnectionError: If the API connection fails

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        import numpy as np

        # Provider configured with an embedding model
        llm = create_llm_provider(
            "openai",
            model="text-embedding-ada-002"
        )

        # One text in, one vector out
        embedding = await llm.embed("What is machine learning?")
        print(f"Dimensions: {len(embedding)}")
        # => Dimensions: 1536

        # A list in, a list of vectors out
        texts = [
            "Python is a programming language",
            "JavaScript is used for web development",
            "Machine learning uses statistical methods"
        ]
        embeddings = await llm.embed(texts)
        print(f"Generated {len(embeddings)} embeddings")

        # Rank the texts against a query by cosine similarity
        def cosine_similarity(v1, v2):
            return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

        query_emb = await llm.embed("Tell me about ML")
        similarities = [
            cosine_similarity(query_emb, emb)
            for emb in embeddings
        ]
        most_similar_idx = np.argmax(similarities)
        print(f"Most similar: {texts[most_similar_idx]}")
        # => Most similar: Machine learning uses statistical methods

        # Persist text/vector pairs in a vector database
        from dataknobs_data import database_factory
        db = database_factory.create("vector_db")
        for text, emb in zip(texts, embeddings):
            db.create({"text": text, "embedding": emb})
        ```

    See Also:
        complete: Text generation method
    """

1031 

@abstractmethod
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs,
) -> LLMResponse:
    """Ask the model to pick and parameterize a function (async).

    Abstract hook: concrete providers supply the implementation. The
    model is shown the conversation plus a catalogue of callable
    functions/tools, decides which one fits the context, and reports
    the chosen function name and its arguments in structured form.

    Args:
        messages: Conversation messages leading up to the function call
        functions: Function definitions in JSON Schema format. Each
            dict must provide:
            - name (str): Function name
            - description (str): What the function does
            - parameters (dict): JSON Schema for parameters
        **kwargs: Provider-specific options, for example:
            - function_call (str|dict): 'auto', 'none', or specific function
            - temperature (float): Sampling temperature
            - max_tokens (int): Maximum response tokens

    Returns:
        An LLMResponse whose ``function_call`` field is populated with:
            - name (str): Function to call
            - arguments (str): JSON string of arguments

    Raises:
        ValueError: If the functions format is invalid
        ConnectionError: If the API connection fails

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import LLMMessage
        import json

        llm = create_llm_provider("openai", model="gpt-4")

        # Catalogue of functions the model may choose from
        functions = [
            {
                "name": "search_docs",
                "description": "Search documentation for information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Max results"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "execute_code",
                "description": "Execute Python code",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

        # A request that can only be answered by calling a function
        messages = [
            LLMMessage(
                role="user",
                content="Search for information about async/await in Python"
            )
        ]

        # The model selects the function and its arguments
        response = await llm.function_call(messages, functions)

        if response.function_call:
            func_name = response.function_call["name"]
            func_args = json.loads(response.function_call["arguments"])

            print(f"Function: {func_name}")
            print(f"Arguments: {func_args}")
            # => Function: search_docs
            # => Arguments: {'query': 'async/await Python', 'limit': 5}

            # Run the selected function yourself
            results = search_docs(**func_args)

            # Feed the result back into the conversation
            messages.append(LLMMessage(
                role="function",
                name=func_name,
                content=json.dumps(results)
            ))

            # Let the model produce the final answer
            final = await llm.complete(messages)
            print(final.content)
        ```

    See Also:
        complete: Standard completion without functions
        dataknobs_llm.tools: Tool abstraction framework
    """

1147 

async def initialize(self) -> None:
    """Mark the async LLM client as initialized.

    This base implementation performs no I/O; it only sets the
    ``_is_initialized`` flag to ``True``.
    """
    self._is_initialized = True

1151 

async def close(self) -> None:
    """Mark the async LLM client as closed.

    This base implementation performs no I/O; it only resets the
    ``_is_initialized`` flag to ``False``.
    """
    self._is_initialized = False

1155 

async def __aenter__(self):
    """Enter ``async with``: initialize the provider and hand it back.

    Returns:
        This same provider instance, after ``initialize()`` has been
        awaited.
    """
    await self.initialize()
    return self

1160 

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave ``async with``: close the provider.

    ``close()`` is awaited whether or not the block raised. Nothing is
    returned, so an exception raised inside the block is not suppressed.

    Args:
        exc_type: Exception class raised in the block, if any.
        exc_val: Exception instance raised in the block, if any.
        exc_tb: Traceback of that exception, if any.
    """
    await self.close()

1164 

1165 

class SyncLLMProvider(LLMProvider):
    """Synchronous (blocking) LLM provider interface.

    Subclasses must implement ``complete``, ``stream_complete``,
    ``embed`` and ``function_call``. The prompt-library helpers
    (``render_and_complete``, ``render_and_stream``) and the
    ``initialize``/``close`` flag handling are provided here.
    """

    @abstractmethod
    def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs,
    ) -> LLMResponse:
        """Produce a completion, blocking until it is available.

        Args:
            messages: A plain prompt string or a list of LLMMessage objects
            **kwargs: Additional provider parameters

        Returns:
            The LLM response
        """

    def render_and_complete(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs,
    ) -> LLMResponse:
        """Render a library prompt, then run a completion on it.

        Convenience wrapper for one-off interactions: it validates the
        prompt builder and ``prompt_type``, renders the requested
        message(s), and forwards them to ``complete()``. For multi-turn
        conversations, use ConversationManager instead.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (for user prompts)
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to complete()

        Returns:
            The LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
            >>> result = llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        # Imported locally, as in the rest of this class.
        from dataknobs_llm.prompts import PromptBuilder

        # Check the builder and prompt type before doing any rendering.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )
        return self.complete(rendered, **llm_kwargs)

    def render_and_stream(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs,
    ) -> Iterator[LLMStreamResponse]:
        """Render a library prompt, then stream the LLM response.

        Streaming counterpart of ``render_and_complete()``. This method
        is a generator, so validation and rendering run when iteration
        starts, not when the method is called.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        from dataknobs_llm.prompts import PromptBuilder

        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Pass the provider's chunks straight through to the caller.
        yield from self.stream_complete(rendered, **llm_kwargs)

    def _render_messages(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None,
        prompt_type: str,
        index: int,
        include_rag: bool,
    ) -> List[LLMMessage]:
        """Build the message list for a library prompt (sync version).

        A system message is rendered for ``prompt_type`` "system" or
        "both", a user message for "user" or "both"; when both are
        produced the system message comes first. Any other value gives
        an empty list.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering (``None`` is
                treated as an empty dict)
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (used for the user prompt only)
            include_rag: Whether to execute RAG searches

        Returns:
            List of rendered LLM messages
        """
        from dataknobs_llm.prompts import PromptBuilder

        prompt_builder: PromptBuilder = self.prompt_builder  # type: ignore
        template_params = params or {}
        rendered: List[LLMMessage] = []

        if prompt_type in ("system", "both"):
            system_result = prompt_builder.render_system_prompt(
                prompt_name, params=template_params, include_rag=include_rag
            )
            rendered.append(
                LLMMessage(role="system", content=system_result.content)
            )

        if prompt_type in ("user", "both"):
            user_result = prompt_builder.render_user_prompt(
                prompt_name,
                index=index,
                params=template_params,
                include_rag=include_rag,
            )
            rendered.append(
                LLMMessage(role="user", content=user_result.content)
            )

        return rendered

    @abstractmethod
    def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs,
    ) -> Iterator[LLMStreamResponse]:
        """Produce a completion as a synchronous stream of chunks.

        Args:
            messages: A plain prompt string or a list of LLMMessage objects
            **kwargs: Additional provider parameters

        Yields:
            Streaming response chunks
        """

    @abstractmethod
    def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs,
    ) -> Union[List[float], List[List[float]]]:
        """Generate embeddings, blocking until they are available.

        Args:
            texts: A single text or a list of texts
            **kwargs: Additional provider parameters

        Returns:
            Embedding vector(s)
        """

    @abstractmethod
    def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs,
    ) -> LLMResponse:
        """Run function calling, blocking until the model responds.

        Args:
            messages: Conversation messages
            functions: Available function definitions
            **kwargs: Additional provider parameters

        Returns:
            Response with function call
        """

    def initialize(self) -> None:
        """Mark the sync LLM client as initialized.

        This base implementation only sets ``_is_initialized`` to ``True``.
        """
        self._is_initialized = True

    def close(self) -> None:
        """Mark the sync LLM client as closed.

        This base implementation only resets ``_is_initialized`` to ``False``.
        """
        self._is_initialized = False

1382 

1383 

class LLMAdapter(ABC):
    """Abstract translator between dataknobs and provider formats.

    An adapter converts the standard dataknobs LLM structures
    (LLMMessage, LLMResponse, LLMConfig) to and from a provider's own
    formats (OpenAI, Anthropic, etc.). Each provider implementation
    should have a corresponding adapter, so that calling code can stay
    provider-agnostic and work across LLM APIs without modification.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMAdapter, LLMConfig, LLMMessage, LLMResponse
        )
        from typing import Any, List, Dict

        class MyProviderAdapter(LLMAdapter):
            \"\"\"Adapter for custom LLM provider.\"\"\"

            def adapt_messages(
                self,
                messages: List[LLMMessage]
            ) -> List[Dict[str, str]]:
                \"\"\"Convert to provider format.\"\"\"
                return [
                    {"role": msg.role, "content": msg.content}
                    for msg in messages
                ]

            def adapt_response(
                self,
                response: Any
            ) -> LLMResponse:
                \"\"\"Convert from provider format.\"\"\"
                return LLMResponse(
                    content=response["text"],
                    model=response["model_id"],
                    usage={
                        "total_tokens": response["tokens_used"]
                    }
                )

            def adapt_config(
                self,
                config: LLMConfig
            ) -> Dict[str, Any]:
                \"\"\"Convert config to provider format.\"\"\"
                return {
                    "model_name": config.model,
                    "temp": config.temperature,
                    "max_length": config.max_tokens
                }

        # Use adapter in provider
        adapter = MyProviderAdapter()
        provider_messages = adapter.adapt_messages(messages)
        ```

    See Also:
        LLMProvider: Base provider interface
        dataknobs_llm.llm.providers.OpenAIAdapter: Example implementation
    """

    @abstractmethod
    def adapt_messages(self, messages: List[LLMMessage]) -> Any:
        """Translate standard messages into the provider's format.

        Args:
            messages: Standard LLMMessage list

        Returns:
            Provider-specific message format
        """

    @abstractmethod
    def adapt_response(self, response: Any) -> LLMResponse:
        """Translate a provider response into the standard format.

        Args:
            response: Provider-specific response object

        Returns:
            Standard LLMResponse
        """

    @abstractmethod
    def adapt_config(self, config: LLMConfig) -> Dict[str, Any]:
        """Translate the standard configuration into provider settings.

        Args:
            config: Standard LLMConfig

        Returns:
            Provider-specific config dict
        """

1491 

1492 

class LLMMiddleware(Protocol):
    """Structural interface for LLM request/response middleware.

    A middleware exposes two async hooks: one that can transform the
    messages before they are sent to the LLM, and one that can
    transform the response before it is returned to the caller. Typical
    uses are logging, caching, content filtering, and rate limiting.

    Both hooks receive the active configuration, which may be an
    LLMConfig, a dataknobs Config, or a plain dict.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMMiddleware, LLMMessage, LLMResponse, LLMConfig
        )
        from typing import List, Union, Dict, Any
        import logging
        # NOTE: ``Config`` below is the dataknobs Config class; import
        # it as well before using these annotations.

        class LoggingMiddleware:
            \"\"\"Logs all LLM requests and responses.\"\"\"

            def __init__(self):
                self.logger = logging.getLogger(__name__)

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Log request before sending.\"\"\"
                self.logger.info(f"Request: {len(messages)} messages")
                for msg in messages:
                    self.logger.debug(f"  {msg.role}: {msg.content[:50]}...")
                return messages

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Log response after receiving.\"\"\"
                self.logger.info(f"Response: {len(response.content)} chars")
                self.logger.info(f"Tokens: {response.usage['total_tokens']}")
                if response.cost_usd:
                    self.logger.info(f"Cost: ${response.cost_usd:.4f}")
                return response


        class ContentFilterMiddleware:
            \"\"\"Filters sensitive content.\"\"\"

            def __init__(self, blocked_words: List[str]):
                self.blocked_words = blocked_words

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Filter input messages.\"\"\"
                filtered = []
                for msg in messages:
                    content = msg.content
                    for word in self.blocked_words:
                        content = content.replace(word, "***")
                    filtered.append(LLMMessage(
                        role=msg.role,
                        content=content,
                        name=msg.name,
                        function_call=msg.function_call,
                        metadata=msg.metadata
                    ))
                return filtered

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Filter output.\"\"\"
                content = response.content
                for word in self.blocked_words:
                    content = content.replace(word, "***")

                from dataclasses import replace
                return replace(response, content=content)


        # Use with ConversationManager
        from dataknobs_llm.conversations import ConversationManager

        manager = await ConversationManager.create(
            llm=llm,
            prompt_builder=builder,
            middleware=[
                LoggingMiddleware(),
                ContentFilterMiddleware(["password", "secret"])
            ]
        )
        ```

    See Also:
        ConversationManager: Uses middleware for request/response processing
    """

    async def process_request(
        self,
        messages: List[LLMMessage],
        config: Union[LLMConfig, Config, Dict[str, Any]],
    ) -> List[LLMMessage]:
        """Hook invoked before the messages are sent to the LLM.

        Implementations may transform, log, validate, or filter the
        outgoing messages.

        Args:
            messages: Input messages to be sent to LLM
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed messages (can be modified, added to, or filtered)

        Raises:
            ValueError: If messages are invalid
        """
        ...

    async def process_response(
        self,
        response: LLMResponse,
        config: Union[LLMConfig, Config, Dict[str, Any]],
    ) -> LLMResponse:
        """Hook invoked on the LLM response before it reaches the caller.

        Implementations may transform, log, validate, or filter the
        response.

        Args:
            response: LLM response to process
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed response (can be modified)

        Raises:
            ValueError: If response is invalid
        """
        ...