Coverage for src / dataknobs_llm / llm / base.py: 66%

270 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 11:16 -0700

1"""Base LLM abstraction components. 

2 

3This module provides the base abstractions for unified LLM operations across 

4different providers (OpenAI, Anthropic, Ollama, etc.). It defines standard 

5interfaces for completions, streaming, embeddings, and function calling. 

6 

7The architecture follows a provider pattern where all LLM providers implement 

8common interfaces (AsyncLLMProvider or SyncLLMProvider) and use standardized 

9data structures (LLMMessage, LLMResponse, LLMConfig). 

10 

11Key Components: 

12 - LLMProvider: Base provider interface with initialization and lifecycle 

13 - AsyncLLMProvider: Async provider with complete(), stream_complete(), embed() 

14 - SyncLLMProvider: Synchronous version for non-async applications 

15 - LLMMessage: Standard message format for conversations 

16 - LLMResponse: Standard response with content, usage, and cost tracking 

17 - LLMConfig: Comprehensive configuration with 20+ parameters 

18 - LLMAdapter: Format adapters for provider-specific APIs 

19 - LLMMiddleware: Request/response processing pipeline 

20 

21Example: 

22 ```python 

23 from dataknobs_llm import create_llm_provider 

24 from dataknobs_llm.llm.base import LLMConfig, LLMMessage 

25 

26 # Create provider with config 

27 config = LLMConfig( 

28 provider="openai", 

29 model="gpt-4", 

30 temperature=0.7, 

31 max_tokens=500 

32 ) 

33 

34 # Async usage 

35 async with create_llm_provider(config) as llm: 

36 # Simple completion 

37 response = await llm.complete("What is Python?") 

38 print(response.content) 

39 

40 # Streaming 

41 async for chunk in llm.stream_complete("Tell me a story"): 

42 print(chunk.delta, end="", flush=True) 

43 

44 # Multi-turn conversation 

45 messages = [ 

46 LLMMessage(role="system", content="You are helpful"), 

47 LLMMessage(role="user", content="Hello!"), 

48 ] 

49 response = await llm.complete(messages) 

50 ``` 

51 

52See Also: 

53 - dataknobs_llm.llm.providers: Provider implementations 

54 - dataknobs_llm.conversations: Multi-turn conversation management 

55 - dataknobs_llm.prompts: Prompt rendering and RAG integration 

56""" 

57 

58from abc import ABC, abstractmethod 

59from dataclasses import dataclass, field 

60from enum import Enum 

61from typing import ( 

62 Any, Dict, List, Union, AsyncIterator, Iterator, 

63 Callable, Protocol 

64) 

65from datetime import datetime 

66 

67# Import prompt builder types - clean one-way dependency (llm depends on prompts) 

68from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder 

69from dataknobs_config.config import Config 

70 

71 

class CompletionMode(Enum):
    """Operation modes for an LLM request.

    The mode selects what kind of request is made. Different modes use
    different APIs and formatting requirements.

    Attributes:
        CHAT: Chat completion with conversational message history
        TEXT: Raw text completion (legacy models)
        INSTRUCT: Instruction-following mode
        EMBEDDING: Generate vector embeddings for semantic search
        FUNCTION: Function/tool calling mode

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Chat mode (default for modern models)
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            mode=CompletionMode.CHAT
        )

        # Embedding mode for vector search
        embedding_config = LLMConfig(
            provider="openai",
            model="text-embedding-ada-002",
            mode=CompletionMode.EMBEDDING
        )
        ```
    """

    # The values are the plain strings accepted by ``CompletionMode(value)``.
    CHAT = "chat"
    TEXT = "text"
    INSTRUCT = "instruct"
    EMBEDDING = "embedding"
    FUNCTION = "function"

109 

110 

class ModelCapability(Enum):
    """Features that an LLM model may support.

    Providers return these from ``get_capabilities()`` to advertise which
    features are available for a specific model.

    Attributes:
        TEXT_GENERATION: Basic text generation
        CHAT: Multi-turn conversational interactions
        EMBEDDINGS: Vector embedding generation
        FUNCTION_CALLING: Tool/function calling support
        VISION: Image understanding capabilities
        CODE: Code generation and analysis
        JSON_MODE: Structured JSON output
        STREAMING: Incremental response streaming

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import ModelCapability

        # Check model capabilities
        llm = create_llm_provider("openai", model="gpt-4")
        capabilities = llm.get_capabilities()

        if ModelCapability.STREAMING in capabilities:
            # Use streaming
            async for chunk in llm.stream_complete("Hello"):
                print(chunk.delta, end="")

        if ModelCapability.FUNCTION_CALLING in capabilities:
            # Use function calling
            response = await llm.function_call(messages, functions)
        ```
    """

    # Generation and conversation
    TEXT_GENERATION = "text_generation"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"
    # Tooling and modalities
    FUNCTION_CALLING = "function_calling"
    VISION = "vision"
    CODE = "code"
    # Output handling
    JSON_MODE = "json_mode"
    STREAMING = "streaming"

155 

156 

157@dataclass 

158class ToolCall: 

159 """Represents a tool call from the LLM. 

160 

161 Used when the LLM wants to invoke a tool/function during reasoning. 

162 

163 Attributes: 

164 name: Name of the tool to call 

165 parameters: Arguments to pass to the tool 

166 id: Optional unique identifier for the tool call 

167 """ 

168 name: str 

169 parameters: Dict[str, Any] 

170 id: str | None = None 

171 

172 

173@dataclass 

174class LLMMessage: 

175 """Represents a message in LLM conversation. 

176 

177 Standard message format used across all providers. Messages are the 

178 fundamental unit of LLM interactions, containing role-based content 

179 for multi-turn conversations. 

180 

181 Attributes: 

182 role: Message role - 'system', 'user', 'assistant', or 'function' 

183 content: Message content text 

184 name: Optional name for function messages or multi-user scenarios 

185 function_call: Function call data for tool-using models 

186 metadata: Additional metadata (timestamps, IDs, etc.) 

187 

188 Example: 

189 ```python 

190 from dataknobs_llm.llm.base import LLMMessage 

191 

192 # System message 

193 system_msg = LLMMessage( 

194 role="system", 

195 content="You are a helpful coding assistant." 

196 ) 

197 

198 # User message 

199 user_msg = LLMMessage( 

200 role="user", 

201 content="How do I reverse a list in Python?" 

202 ) 

203 

204 # Assistant message 

205 assistant_msg = LLMMessage( 

206 role="assistant", 

207 content="Use the reverse() method or [::-1] slicing." 

208 ) 

209 

210 # Function result message 

211 function_msg = LLMMessage( 

212 role="function", 

213 name="search_docs", 

214 content='{"result": "Found 3 examples"}' 

215 ) 

216 

217 # Build conversation 

218 messages = [system_msg, user_msg, assistant_msg] 

219 ``` 

220 """ 

221 role: str # 'system', 'user', 'assistant', 'function' 

222 content: str 

223 name: str | None = None # For function messages 

224 function_call: Dict[str, Any] | None = None # For function calling 

225 metadata: Dict[str, Any] = field(default_factory=dict) 

226 

227 

228@dataclass 

229class LLMResponse: 

230 """Response from LLM. 

231 

232 Standard response format returned by all LLM providers. Contains the 

233 generated content along with metadata about token usage, cost, and 

234 completion status. 

235 

236 Attributes: 

237 content: Generated text content 

238 model: Model identifier that generated the response 

239 finish_reason: Why generation stopped - 'stop', 'length', 'function_call' 

240 usage: Token usage stats (prompt_tokens, completion_tokens, total_tokens) 

241 function_call: Function call data if model requested tool use 

242 metadata: Provider-specific metadata 

243 created_at: Response timestamp 

244 cost_usd: Estimated cost in USD for this request 

245 cumulative_cost_usd: Running total cost for conversation 

246 

247 Example: 

248 ```python 

249 from dataknobs_llm import create_llm_provider 

250 

251 llm = create_llm_provider("openai", model="gpt-4") 

252 response = await llm.complete("What is Python?") 

253 

254 # Access response data 

255 print(response.content) 

256 # => "Python is a high-level programming language..." 

257 

258 # Check token usage 

259 print(f"Tokens used: {response.usage['total_tokens']}") 

260 # => Tokens used: 87 

261 

262 # Monitor costs 

263 if response.cost_usd: 

264 print(f"Cost: ${response.cost_usd:.4f}") 

265 print(f"Total: ${response.cumulative_cost_usd:.4f}") 

266 

267 # Check completion status 

268 if response.finish_reason == "length": 

269 print("Response truncated due to max_tokens limit") 

270 ``` 

271 

272 See Also: 

273 LLMMessage: Request message format 

274 LLMStreamResponse: Streaming response format 

275 """ 

276 content: str 

277 model: str 

278 finish_reason: str | None = None # 'stop', 'length', 'function_call', 'tool_calls' 

279 usage: Dict[str, int] | None = None # tokens used 

280 function_call: Dict[str, Any] | None = None # Legacy single function call 

281 tool_calls: list["ToolCall"] | None = None # List of tool calls (preferred) 

282 metadata: Dict[str, Any] = field(default_factory=dict) 

283 created_at: datetime = field(default_factory=datetime.now) 

284 

285 # Cost tracking (optional enhancement for DynaBot) 

286 cost_usd: float | None = None # Estimated cost in USD 

287 cumulative_cost_usd: float | None = None # Running total for conversation 

288 

289 

290@dataclass 

291class LLMStreamResponse: 

292 r"""Streaming response from LLM. 

293 

294 Represents a single chunk in a streaming LLM response. Streaming 

295 allows displaying generated text incrementally as it's produced, 

296 providing better user experience for long responses. 

297 

298 Attributes: 

299 delta: Incremental content for this chunk (not cumulative) 

300 is_final: True if this is the last chunk in the stream 

301 finish_reason: Why generation stopped (only set on final chunk) 

302 usage: Token usage stats (only set on final chunk) 

303 metadata: Additional chunk metadata 

304 

305 Example: 

306 ```python 

307 from dataknobs_llm import create_llm_provider 

308 

309 llm = create_llm_provider("openai", model="gpt-4") 

310 

311 # Stream and display in real-time 

312 async for chunk in llm.stream_complete("Write a poem"): 

313 print(chunk.delta, end="", flush=True) 

314 

315 if chunk.is_final: 

316 print(f"\n\nFinished: {chunk.finish_reason}") 

317 print(f"Tokens: {chunk.usage['total_tokens']}") 

318 

319 # Accumulate full response 

320 full_text = "" 

321 chunks_received = 0 

322 

323 async for chunk in llm.stream_complete("Explain Python"): 

324 full_text += chunk.delta 

325 chunks_received += 1 

326 

327 # Optional: show progress 

328 if chunks_received % 10 == 0: 

329 print(f"Received {chunks_received} chunks...") 

330 

331 print(f"\nComplete response ({len(full_text)} chars)") 

332 print(full_text) 

333 ``` 

334 

335 See Also: 

336 LLMResponse: Non-streaming response format 

337 AsyncLLMProvider.stream_complete: Streaming method 

338 """ 

339 delta: str # Incremental content 

340 is_final: bool = False 

341 finish_reason: str | None = None 

342 usage: Dict[str, int] | None = None 

343 metadata: Dict[str, Any] = field(default_factory=dict) 

344 

345 

@dataclass
class LLMConfig:
    """Configuration for LLM operations.

    Comprehensive configuration for LLM providers with 20+ parameters
    controlling generation, rate limiting, function calling, and more.
    Works with both direct instantiation and dataknobs Config objects.

    This class supports:
    - All major LLM providers (OpenAI, Anthropic, Ollama, HuggingFace)
    - Generation parameters (temperature, max_tokens, top_p, etc.)
    - Function/tool calling configuration
    - Streaming with callbacks
    - Rate limiting and retry logic
    - Provider-specific options via options dict

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Basic configuration
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            api_key="sk-...",
            temperature=0.7,
            max_tokens=500
        )

        # Creative writing config
        creative_config = LLMConfig(
            provider="anthropic",
            model="claude-3-sonnet",
            temperature=1.2,
            top_p=0.95,
            max_tokens=2000
        )

        # Deterministic config for testing
        test_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            temperature=0.0,
            seed=42,  # Reproducible outputs
            max_tokens=100
        )

        # Function calling config
        function_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            functions=[{
                "name": "search_docs",
                "description": "Search documentation",
                "parameters": {"type": "object", "properties": {...}}
            }],
            function_call="auto"
        )

        # Streaming with callback
        def on_chunk(chunk):
            print(chunk.delta, end="")

        streaming_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            stream=True,
            stream_callback=on_chunk
        )

        # From dictionary (Config compatibility)
        config_dict = {
            "provider": "ollama",
            "model": "llama2",
            "type": "llm",  # Config metadata (ignored)
            "temperature": 0.8
        }
        config = LLMConfig.from_dict(config_dict)

        # Clone with overrides
        new_config = config.clone(temperature=1.0, max_tokens=1000)
        ```

    See Also:
        normalize_llm_config: Convert various formats to LLMConfig
        CompletionMode: Available completion modes
    """
    provider: str  # 'openai', 'anthropic', 'ollama', etc.
    model: str  # Model name/identifier
    api_key: str | None = None
    api_base: str | None = None  # Custom API endpoint

    # Generation parameters
    temperature: float = 0.7
    max_tokens: int | None = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: List[str] | None = None

    # Mode settings
    mode: CompletionMode = CompletionMode.CHAT
    system_prompt: str | None = None
    response_format: str | None = None  # 'text' or 'json'

    # Function calling
    functions: List[Dict[str, Any]] | None = None
    function_call: Union[str, Dict[str, str]] | None = None  # 'auto', 'none', or specific function

    # Streaming
    stream: bool = False
    stream_callback: Callable[[LLMStreamResponse], None] | None = None

    # Rate limiting
    rate_limit: int | None = None  # Requests per minute
    retry_count: int = 3
    retry_delay: float = 1.0
    timeout: float = 60.0

    # Advanced settings
    seed: int | None = None  # For reproducibility
    logit_bias: Dict[str, float] | None = None
    user_id: str | None = None

    # Provider-specific options
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
        """Create LLMConfig from a dictionary.

        This method handles dictionaries from dataknobs Config objects,
        which may include 'type', 'name', and 'factory' attributes.
        Those attributes, and any other key that is not a dataclass field,
        are ignored during LLMConfig construction.

        Args:
            config_dict: Configuration dictionary

        Returns:
            LLMConfig instance

        Raises:
            ValueError: If 'mode' is a string that is not a CompletionMode value
            TypeError: If a required field ('provider' or 'model') is missing
        """
        # Filter out Config-specific attributes
        config_data = {
            k: v for k, v in config_dict.items()
            if k not in ('type', 'name', 'factory')
        }

        # Handle mode conversion if it's a string
        if 'mode' in config_data and isinstance(config_data['mode'], str):
            config_data['mode'] = CompletionMode(config_data['mode'])

        # Keep only keys that are dataclass fields so unknown attributes
        # do not make the constructor fail
        valid_fields = {f.name for f in cls.__dataclass_fields__.values()}
        filtered_data = {k: v for k, v in config_data.items() if k in valid_fields}

        return cls(**filtered_data)

    def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
        """Convert LLMConfig to a dictionary.

        Fields whose value is None are omitted, except 'options', which is
        always present. Enum values are emitted as their underlying value.
        Container values are not copied: the returned dict references the
        same objects as this config.

        Args:
            include_config_attrs: If True, includes 'type' attribute for Config compatibility

        Returns:
            Configuration dictionary
        """
        result: Dict[str, Any] = {}

        for field_info in self.__dataclass_fields__.values():
            value = getattr(self, field_info.name)

            # Handle enum conversion
            if isinstance(value, Enum):
                result[field_info.name] = value.value
            # Skip None values for optional fields
            elif value is not None:
                result[field_info.name] = value
            # Only reached when options was explicitly set to None:
            # normalize it to an empty dict so the key is always present
            elif field_info.name == 'options':
                result[field_info.name] = {}

        # Optionally add Config-compatible type attribute
        if include_config_attrs:
            result['type'] = 'llm'

        return result

    def clone(self, **overrides: Any) -> "LLMConfig":
        """Create a copy of this config with optional overrides.

        This method is useful for creating runtime configuration variations
        without mutating the original config. All dataclass fields can be
        overridden via keyword arguments.

        Dict and list fields that are not overridden (for example 'options',
        'functions', 'stop_sequences', 'logit_bias') are shallow-copied so
        that adding or removing entries on the clone does not change the
        original. Values passed in ``overrides`` are stored as given.

        Args:
            **overrides: Field values to override in the cloned config

        Returns:
            New LLMConfig instance with overrides applied

        Raises:
            TypeError: If an override names a field that does not exist

        Example:
            >>> base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
            >>> creative_config = base_config.clone(temperature=1.2, max_tokens=500)
        """
        from dataclasses import replace

        cloned = replace(self, **overrides)

        # dataclasses.replace() copies field references only. Detach the
        # mutable containers inherited from this instance so the clone and
        # the original do not share them.
        for field_name in cloned.__dataclass_fields__:
            if field_name in overrides:
                continue
            value = getattr(cloned, field_name)
            if isinstance(value, (dict, list)):
                setattr(cloned, field_name, value.copy())

        return cloned

552 

553 

def normalize_llm_config(config: Union["LLMConfig", Config, Dict[str, Any]]) -> "LLMConfig":
    """Convert any supported config representation into an LLMConfig.

    Accepts LLMConfig instances (returned unchanged), plain dictionaries
    (converted with ``LLMConfig.from_dict``), and dataknobs Config objects.
    Config objects are recognized by having both ``get`` and ``get_types``
    attributes.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
        ValueError: If a Config object has no 'llm' entry and reports no
            configuration types at all
    """
    # Nothing to do for an LLMConfig
    if isinstance(config, LLMConfig):
        return config

    # Plain dictionary (possibly from Config.get())
    if isinstance(config, dict):
        return LLMConfig.from_dict(config)

    # Anything else must look like a dataknobs Config object
    looks_like_config = hasattr(config, 'get') and hasattr(config, 'get_types')
    if not looks_like_config:
        raise TypeError(
            f"Unsupported config type: {type(config).__name__}. "
            f"Expected LLMConfig, Config, or dict."
        )

    # Prefer the first 'llm' entry; if that lookup fails for any reason,
    # fall back to the first entry of the first available type
    try:
        llm_settings = config.get('llm', 0)
    except Exception as e:
        available_types = config.get_types()
        if not available_types:
            raise ValueError("Config object has no configurations") from e
        llm_settings = config.get(available_types[0], 0)

    return LLMConfig.from_dict(llm_settings)

598 

599 

600class LLMProvider(ABC): 

601 """Base LLM provider interface.""" 

602 

603 def __init__( 

604 self, 

605 config: Union[LLMConfig, Config, Dict[str, Any]], 

606 prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None 

607 ): 

608 """Initialize provider with configuration. 

609 

610 Args: 

611 config: Configuration as LLMConfig, dataknobs Config object, or dict 

612 prompt_builder: Optional prompt builder for integrated prompting 

613 """ 

614 self.config = normalize_llm_config(config) 

615 self.prompt_builder = prompt_builder 

616 self._client = None 

617 self._is_initialized = False 

618 

619 def _validate_prompt_builder(self, expected_type: type) -> None: 

620 """Validate that prompt builder is configured and of correct type. 

621 

622 Args: 

623 expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder) 

624 

625 Raises: 

626 ValueError: If prompt_builder not configured 

627 TypeError: If prompt_builder is wrong type 

628 """ 

629 if not self.prompt_builder: 

630 raise ValueError( 

631 "No prompt_builder configured. Pass prompt_builder to __init__() " 

632 "or use complete() directly with pre-rendered messages." 

633 ) 

634 

635 if not isinstance(self.prompt_builder, expected_type): 

636 raise TypeError( 

637 f"{self.__class__.__name__} requires {expected_type.__name__}, " 

638 f"got {type(self.prompt_builder).__name__}" 

639 ) 

640 

641 def _validate_render_params( 

642 self, 

643 prompt_type: str 

644 ) -> None: 

645 """Validate render parameters. 

646 

647 Args: 

648 prompt_type: Type of prompt to render 

649 

650 Raises: 

651 ValueError: If prompt_type is invalid 

652 """ 

653 if prompt_type not in ("system", "user", "both"): 

654 raise ValueError( 

655 f"Invalid prompt_type: {prompt_type}. " 

656 f"Must be 'system', 'user', or 'both'" 

657 ) 

658 

659 @abstractmethod 

660 def initialize(self) -> None: 

661 """Initialize the LLM client.""" 

662 pass 

663 

664 @abstractmethod 

665 def close(self) -> None: 

666 """Close the LLM client.""" 

667 pass 

668 

669 @abstractmethod 

670 def validate_model(self) -> bool: 

671 """Validate that the model is available.""" 

672 pass 

673 

674 @abstractmethod 

675 def get_capabilities(self) -> List[ModelCapability]: 

676 """Get model capabilities.""" 

677 pass 

678 

679 @property 

680 def is_initialized(self) -> bool: 

681 """Check if provider is initialized.""" 

682 return self._is_initialized 

683 

684 def __enter__(self): 

685 """Context manager entry.""" 

686 self.initialize() 

687 return self 

688 

689 def __exit__(self, exc_type, exc_val, exc_tb): 

690 """Context manager exit.""" 

691 self.close() 

692 

693 

694class ConfigOverrideMixin: 

695 """Mixin providing config override functionality for LLM providers. 

696 

697 This mixin provides shared functionality for handling per-request config 

698 overrides, presets, and callbacks. Both AsyncLLMProvider and SyncLLMProvider 

699 inherit from this mixin. 

700 

701 Features: 

702 - Per-request config overrides (model, temperature, etc.) 

703 - Named presets for common override combinations 

704 - Callback hooks for logging/metrics 

705 - Options dict merging 

706 """ 

707 

708 # Supported fields for config overrides (base set) 

709 ALLOWED_CONFIG_OVERRIDES = { 

710 # Core generation parameters 

711 "model", "temperature", "max_tokens", "top_p", "stop_sequences", "seed", 

712 # Provider-specific parameters 

713 "presence_penalty", "frequency_penalty", "logit_bias", "response_format", 

714 # Function calling (dynamic) 

715 "functions", "function_call", 

716 # Provider-specific options dict 

717 "options", 

718 } 

719 

720 # Override presets registry (class-level, shared across all providers) 

721 _override_presets: Dict[str, Dict[str, Any]] = {} 

722 

723 # Override event callbacks (class-level) 

724 _override_callbacks: List[Callable[[Any, Dict[str, Any], LLMConfig], None]] = [] 

725 

726 @classmethod 

727 def register_preset(cls, name: str, overrides: Dict[str, Any]) -> None: 

728 """Register a named override preset. 

729 

730 Presets allow you to define common override combinations that can be 

731 referenced by name instead of repeating the same overrides. 

732 

733 Args: 

734 name: Preset name (e.g., "creative", "precise", "fast") 

735 overrides: Dictionary of override values 

736 

737 Example: 

738 >>> AsyncLLMProvider.register_preset("creative", { 

739 ... "temperature": 1.2, 

740 ... "top_p": 0.95, 

741 ... "presence_penalty": 0.5 

742 ... }) 

743 >>> response = await provider.complete( 

744 ... "Write a poem", 

745 ... config_overrides={"preset": "creative"} 

746 ... ) 

747 """ 

748 cls._override_presets[name] = overrides.copy() 

749 

750 @classmethod 

751 def on_override_applied( 

752 cls, 

753 callback: Callable[[Any, Dict[str, Any], LLMConfig], None] 

754 ) -> None: 

755 """Register a callback for when overrides are applied. 

756 

757 Use this for logging, metrics collection, or auditing override usage. 

758 Callbacks receive the provider instance, the applied overrides dict, 

759 and the resulting runtime config. 

760 

761 Args: 

762 callback: Function(provider, overrides, runtime_config) -> None 

763 

764 Example: 

765 >>> def log_overrides(provider, overrides, runtime_config): 

766 ... print(f"Overrides applied: {overrides}") 

767 ... print(f"Runtime model: {runtime_config.model}") 

768 ... 

769 >>> AsyncLLMProvider.on_override_applied(log_overrides) 

770 """ 

771 cls._override_callbacks.append(callback) 

772 

773 @classmethod 

774 def clear_override_callbacks(cls) -> None: 

775 """Clear all registered override callbacks.""" 

776 cls._override_callbacks.clear() 

777 

778 @classmethod 

779 def get_preset(cls, name: str) -> Dict[str, Any] | None: 

780 """Get a registered override preset by name. 

781 

782 Args: 

783 name: Preset name 

784 

785 Returns: 

786 Preset overrides dict, or None if not found 

787 """ 

788 return cls._override_presets.get(name) 

789 

790 @classmethod 

791 def list_presets(cls) -> List[str]: 

792 """List all registered preset names. 

793 

794 Returns: 

795 List of preset names 

796 """ 

797 return list(cls._override_presets.keys()) 

798 

799 def _validate_config_overrides( 

800 self, 

801 overrides: Dict[str, Any] | None 

802 ) -> None: 

803 """Validate that config override fields are supported. 

804 

805 Args: 

806 overrides: Dictionary of config overrides to validate 

807 

808 Raises: 

809 ValueError: If overrides contains unsupported fields 

810 """ 

811 if not overrides: 

812 return 

813 

814 # Allow "preset" as a special key for named presets 

815 allowed = self.ALLOWED_CONFIG_OVERRIDES | {"preset"} 

816 invalid = set(overrides.keys()) - allowed 

817 if invalid: 

818 raise ValueError( 

819 f"Unsupported config overrides: {invalid}. " 

820 f"Allowed fields: {self.ALLOWED_CONFIG_OVERRIDES}" 

821 ) 

822 

823 def _expand_preset( 

824 self, 

825 overrides: Dict[str, Any] 

826 ) -> Dict[str, Any]: 

827 """Expand preset reference to actual override values. 

828 

829 If overrides contains a 'preset' key, replaces it with the 

830 registered preset values. Explicit overrides take precedence 

831 over preset values. 

832 

833 Args: 

834 overrides: Override dict that may contain a preset reference 

835 

836 Returns: 

837 Expanded overrides dict 

838 

839 Raises: 

840 ValueError: If preset is not registered 

841 """ 

842 if "preset" not in overrides: 

843 return overrides 

844 

845 preset_name = overrides["preset"] 

846 preset_values = self.get_preset(preset_name) 

847 if preset_values is None: 

848 raise ValueError( 

849 f"Unknown preset: '{preset_name}'. " 

850 f"Available presets: {self.list_presets()}" 

851 ) 

852 

853 # Preset values as base, explicit overrides take precedence 

854 expanded = preset_values.copy() 

855 for key, value in overrides.items(): 

856 if key != "preset": 

857 expanded[key] = value 

858 

859 return expanded 

860 

861 def _merge_options( 

862 self, 

863 base_options: Dict[str, Any], 

864 override_options: Dict[str, Any] 

865 ) -> Dict[str, Any]: 

866 """Deep merge options dicts. 

867 

868 Args: 

869 base_options: Base options from config 

870 override_options: Override options to merge 

871 

872 Returns: 

873 Merged options dict 

874 """ 

875 merged = base_options.copy() 

876 merged.update(override_options) 

877 return merged 

878 

879 def _notify_override_callbacks( 

880 self, 

881 overrides: Dict[str, Any], 

882 runtime_config: LLMConfig 

883 ) -> None: 

884 """Notify registered callbacks about applied overrides. 

885 

886 Args: 

887 overrides: The overrides that were applied 

888 runtime_config: The resulting runtime config 

889 """ 

890 for callback in self._override_callbacks: 

891 try: 

892 callback(self, overrides, runtime_config) 

893 except Exception: 

894 # Don't let callback errors break the main flow 

895 pass 

896 

897 def _get_runtime_config( 

898 self, 

899 config_overrides: Dict[str, Any] | None = None 

900 ) -> LLMConfig: 

901 """Get runtime config, applying overrides if provided. 

902 

903 Supports: 

904 - Direct field overrides (model, temperature, etc.) 

905 - Named presets via 'preset' key 

906 - Deep merging of 'options' dict 

907 - Override callback notifications for logging/metrics 

908 

909 Args: 

910 config_overrides: Optional overrides to apply 

911 

912 Returns: 

913 LLMConfig to use for this request (original or cloned with overrides) 

914 """ 

915 if not config_overrides: 

916 return self.config # type: ignore[attr-defined] 

917 

918 self._validate_config_overrides(config_overrides) 

919 

920 # Expand preset if present 

921 expanded = self._expand_preset(config_overrides) 

922 

923 # Handle options merging specially 

924 if "options" in expanded and self.config.options: # type: ignore[attr-defined] 

925 expanded["options"] = self._merge_options( 

926 self.config.options, # type: ignore[attr-defined] 

927 expanded["options"] 

928 ) 

929 

930 runtime_config = self.config.clone(**expanded) # type: ignore[attr-defined] 

931 

932 # Notify callbacks 

933 self._notify_override_callbacks(config_overrides, runtime_config) 

934 

935 return runtime_config 

936 

937 

class AsyncLLMProvider(LLMProvider, ConfigOverrideMixin):
    """Async LLM provider interface.

    Subclasses implement the abstract methods ``complete``,
    ``stream_complete``, ``embed`` and ``function_call``. This class supplies
    the prompt-library helpers (``render_and_complete`` and
    ``render_and_stream``) and ``async with`` support, which awaits
    ``initialize()`` on entry and ``close()`` on exit.
    """

    @abstractmethod
    async def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> LLMResponse:
        """Generate completion asynchronously.

        Primary method for getting LLM responses. Accepts either a simple
        string prompt or a list of LLMMessage objects for multi-turn
        conversations. This is the recommended async method for most use cases.

        Args:
            messages: Either a single string prompt or a list of LLMMessage
                objects for multi-turn conversations.
            config_overrides: Optional dict to override config fields for this
                request only. Supported fields: model, temperature, max_tokens,
                top_p, stop_sequences, seed. The original config is not modified.
            **kwargs: Additional provider-specific parameters. Common options:
                - temperature (float): Sampling temperature (0.0-2.0)
                - max_tokens (int): Maximum tokens to generate
                - top_p (float): Nucleus sampling parameter (0.0-1.0)
                - stop (List[str]): Stop sequences
                - presence_penalty (float): Presence penalty (-2.0 to 2.0)
                - frequency_penalty (float): Frequency penalty (-2.0 to 2.0)

        Returns:
            LLMResponse containing generated content, usage stats, and metadata

        Raises:
            ValueError: If messages format is invalid or config_overrides contains
                unsupported fields
            ConnectionError: If API connection fails
            TimeoutError: If request exceeds timeout

        Example:
            ```python
            from dataknobs_llm import create_llm_provider
            from dataknobs_llm.llm.base import LLMMessage

            llm = create_llm_provider("openai", model="gpt-4")

            # Simple string prompt
            response = await llm.complete("What is Python?")
            print(response.content)
            # => "Python is a high-level programming language..."

            # With config overrides (switch model per-request)
            response = await llm.complete(
                "Write a haiku about coding",
                config_overrides={"model": "gpt-4-turbo", "temperature": 0.9}
            )

            # Multi-turn conversation
            messages = [
                LLMMessage(role="system", content="You are a helpful tutor"),
                LLMMessage(role="user", content="Explain recursion"),
                LLMMessage(role="assistant", content="Recursion is when..."),
                LLMMessage(role="user", content="Can you give an example?")
            ]
            response = await llm.complete(messages)

            # Check token usage
            print(f"Tokens: {response.usage['total_tokens']}")
            print(f"Cost: ${response.cost_usd:.4f}")
            ```

        See Also:
            stream_complete: Streaming version
            render_and_complete: Complete with prompt rendering
        """
        pass

    async def render_and_complete(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render prompt from library and execute LLM completion.

        This is a convenience method for one-off interactions that combines
        prompt rendering with LLM execution. For multi-turn conversations,
        use ConversationManager instead.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (for user prompts)
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to complete()

        Returns:
            LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not AsyncPromptBuilder

        Example:
            >>> llm = OpenAIProvider(config, prompt_builder=builder)
            >>> result = await llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        # Validate
        from dataknobs_llm.prompts import AsyncPromptBuilder
        self._validate_prompt_builder(AsyncPromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = await self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Execute LLM
        return await self.complete(messages, **llm_kwargs)

    async def render_and_stream(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> AsyncIterator[LLMStreamResponse]:
        """Render prompt and stream LLM response.

        Same as render_and_complete() but returns streaming response.

        This method is an async generator, so validation and rendering run
        when iteration starts, not when the method is called.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not AsyncPromptBuilder

        Example:
            >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        # Validate
        from dataknobs_llm.prompts import AsyncPromptBuilder
        self._validate_prompt_builder(AsyncPromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = await self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Stream LLM response. stream_complete() is iterated directly (not
        # awaited), so implementations must be async generators.
        async for chunk in self.stream_complete(messages, **llm_kwargs):
            yield chunk

    async def _render_messages(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None,
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Render messages from prompt library (async version).

        A system message is rendered first when prompt_type is "system" or
        "both"; a user message follows when it is "user" or "both".

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering (None means no params)
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (used for the user prompt only)
            include_rag: Whether to execute RAG searches

        Returns:
            List of rendered LLM messages
        """
        from dataknobs_llm.prompts import AsyncPromptBuilder
        builder: AsyncPromptBuilder = self.prompt_builder  # type: ignore

        messages: List[LLMMessage] = []
        params = params or {}

        if prompt_type in ("system", "both"):
            result = await builder.render_system_prompt(
                prompt_name, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="system", content=result.content))

        if prompt_type in ("user", "both"):
            result = await builder.render_user_prompt(
                prompt_name, index=index, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="user", content=result.content))

        return messages

    @abstractmethod
    async def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> AsyncIterator[LLMStreamResponse]:
        r"""Generate streaming completion asynchronously.

        Streams response chunks as they are generated, enabling real-time
        display of LLM output. Each chunk contains incremental content
        (delta), and the final chunk includes usage statistics.

        Implementations must be async generators (``async def`` containing
        ``yield``): callers such as render_and_stream() consume the result
        with ``async for`` without awaiting it first.

        Args:
            messages: Either a single string prompt or list of LLMMessage objects
            config_overrides: Optional dict to override config fields for this
                request only. Supported fields: model, temperature, max_tokens,
                top_p, stop_sequences, seed. The original config is not modified.
            **kwargs: Provider-specific parameters (same as complete())

        Yields:
            LLMStreamResponse chunks containing incremental content. The final
            chunk has is_final=True and includes finish_reason and usage stats.

        Raises:
            ValueError: If messages format is invalid or config_overrides contains
                unsupported fields
            ConnectionError: If API connection fails
            TimeoutError: If request exceeds timeout

        Example:
            ```python
            from dataknobs_llm import create_llm_provider

            llm = create_llm_provider("openai", model="gpt-4")

            # Stream and display in real-time
            async for chunk in llm.stream_complete("Tell me a story"):
                print(chunk.delta, end="", flush=True)

                if chunk.is_final:
                    print(f"\n\nFinished: {chunk.finish_reason}")
                    print(f"Total tokens: {chunk.usage['total_tokens']}")

            # Stream with config overrides
            async for chunk in llm.stream_complete(
                "Write a poem",
                config_overrides={"model": "gpt-4-turbo", "temperature": 1.0}
            ):
                print(chunk.delta, end="", flush=True)

            # Accumulate full response
            full_text = ""
            chunk_count = 0

            async for chunk in llm.stream_complete("Explain quantum computing"):
                full_text += chunk.delta
                chunk_count += 1

            print(f"Received {chunk_count} chunks")
            print(f"Total length: {len(full_text)} characters")
            ```

        See Also:
            complete: Non-streaming version
            render_and_stream: Stream with prompt rendering
            LLMStreamResponse: Chunk data structure
        """
        # The never-executed ``yield`` makes this an async generator function,
        # so the abstract signature matches what implementations provide and
        # what ``async for`` expects. Without it this would be a coroutine
        # that merely returns an iterator. A super() call still yields nothing.
        if False:  # pragma: no cover
            yield

    @abstractmethod
    async def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Generate embeddings asynchronously.

        Converts text into dense vector representations for semantic search,
        clustering, and similarity comparison. Returns high-dimensional
        vectors (typically 768-1536 dimensions depending on model).

        Args:
            texts: Single text string or list of texts to embed
            **kwargs: Provider-specific parameters:
                - model (str): Embedding model override
                - dimensions (int): Target dimensions (if supported)

        Returns:
            Single embedding vector (List[float]) if input is a string,
            or list of vectors (List[List[float]]) if input is a list

        Raises:
            ValueError: If texts is empty or invalid
            ConnectionError: If API connection fails

        Example:
            ```python
            from dataknobs_llm import create_llm_provider
            import numpy as np

            # Create embedding provider
            llm = create_llm_provider(
                "openai",
                model="text-embedding-ada-002"
            )

            # Single text embedding
            embedding = await llm.embed("What is machine learning?")
            print(f"Dimensions: {len(embedding)}")
            # => Dimensions: 1536

            # Batch embedding
            texts = [
                "Python is a programming language",
                "JavaScript is used for web development",
                "Machine learning uses statistical methods"
            ]
            embeddings = await llm.embed(texts)
            print(f"Generated {len(embeddings)} embeddings")

            # Compute similarity
            def cosine_similarity(v1, v2):
                return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

            query_emb = await llm.embed("Tell me about ML")
            similarities = [
                cosine_similarity(query_emb, emb)
                for emb in embeddings
            ]
            most_similar_idx = np.argmax(similarities)
            print(f"Most similar: {texts[most_similar_idx]}")
            # => Most similar: Machine learning uses statistical methods

            # Store in vector database
            from dataknobs_data import database_factory
            db = database_factory.create("vector_db")
            for text, emb in zip(texts, embeddings):
                db.create({"text": text, "embedding": emb})
            ```

        See Also:
            complete: Text generation method
        """
        pass

    @abstractmethod
    async def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling asynchronously.

        Enables the LLM to call external functions/tools. The model decides
        which function to call based on the conversation context, and returns
        the function name and arguments in a structured format.

        Args:
            messages: Conversation messages leading up to the function call
            functions: List of function definitions in JSON Schema format.
                Each function dict must have:
                - name (str): Function name
                - description (str): What the function does
                - parameters (dict): JSON Schema for parameters
            **kwargs: Provider-specific parameters:
                - function_call (str|dict): 'auto', 'none', or specific function
                - temperature (float): Sampling temperature
                - max_tokens (int): Maximum response tokens

        Returns:
            LLMResponse with function_call field populated containing:
                - name (str): Function to call
                - arguments (str): JSON string of arguments

        Raises:
            ValueError: If functions format is invalid
            ConnectionError: If API connection fails

        Example:
            ```python
            from dataknobs_llm import create_llm_provider
            from dataknobs_llm.llm.base import LLMMessage
            import json

            llm = create_llm_provider("openai", model="gpt-4")

            # Define available functions
            functions = [
                {
                    "name": "search_docs",
                    "description": "Search documentation for information",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Search query"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Max results"
                            }
                        },
                        "required": ["query"]
                    }
                },
                {
                    "name": "execute_code",
                    "description": "Execute Python code",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "code": {"type": "string"}
                        },
                        "required": ["code"]
                    }
                }
            ]

            # Ask question that requires function
            messages = [
                LLMMessage(
                    role="user",
                    content="Search for information about async/await in Python"
                )
            ]

            # Model decides to call function
            response = await llm.function_call(messages, functions)

            if response.function_call:
                func_name = response.function_call["name"]
                func_args = json.loads(response.function_call["arguments"])

                print(f"Function: {func_name}")
                print(f"Arguments: {func_args}")
                # => Function: search_docs
                # => Arguments: {'query': 'async/await Python', 'limit': 5}

                # Execute function
                results = search_docs(**func_args)

                # Add function result to conversation
                messages.append(LLMMessage(
                    role="function",
                    name=func_name,
                    content=json.dumps(results)
                ))

                # Get final response
                final = await llm.complete(messages)
                print(final.content)
            ```

        See Also:
            complete: Standard completion without functions
            dataknobs_llm.tools: Tool abstraction framework
        """
        pass

    async def initialize(self) -> None:
        """Initialize the async LLM client.

        The base implementation only sets ``_is_initialized`` to True.
        """
        self._is_initialized = True

    async def close(self) -> None:
        """Close the async LLM client.

        The base implementation only sets ``_is_initialized`` to False.
        """
        self._is_initialized = False

    async def __aenter__(self):
        """Async context manager entry: await initialize() and return self."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: await close().

        Returns None, so an exception raised inside the ``async with`` body
        is not suppressed.
        """
        await self.close()

1429 

1430 

class SyncLLMProvider(LLMProvider, ConfigOverrideMixin):
    """Blocking (non-async) LLM provider interface.

    Concrete subclasses supply ``complete``, ``stream_complete``, ``embed``
    and ``function_call``. The prompt-library helpers and the
    ``initialize``/``close`` lifecycle hooks are implemented here.
    """

    @abstractmethod
    def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> LLMResponse:
        """Produce a completion, blocking until it is available.

        Args:
            messages: A prompt string or a list of LLMMessage objects.
            config_overrides: Per-request replacements for config fields
                (model, temperature, max_tokens, top_p, stop_sequences,
                seed). The provider's own config is left unmodified.
            **kwargs: Extra parameters for the implementation.

        Returns:
            The LLM response.
        """

    def render_and_complete(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render a library prompt and run a completion on the result.

        Intended for one-off calls that need rendering plus execution in a
        single step; multi-turn conversations should go through
        ConversationManager.

        Args:
            prompt_name: Library name of the prompt to render.
            params: Values substituted into the template.
            prompt_type: One of "system", "user" or "both".
            index: Which user-prompt variant to render.
            include_rag: Whether RAG searches are executed while rendering.
            **llm_kwargs: Forwarded unchanged to complete().

        Returns:
            The LLM response.

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
            >>> result = llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        from dataknobs_llm.prompts import PromptBuilder

        # Check the builder and prompt_type before any rendering work.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )
        return self.complete(rendered, **llm_kwargs)

    def render_and_stream(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Render a library prompt and stream the LLM's answer.

        Streaming twin of render_and_complete(). This is a generator, so
        nothing (including validation) runs until iteration begins.

        Args:
            prompt_name: Library name of the prompt to render.
            params: Values substituted into the template.
            prompt_type: One of "system", "user" or "both".
            index: Which prompt variant to render.
            include_rag: Whether RAG searches are executed while rendering.
            **llm_kwargs: Forwarded unchanged to stream_complete().

        Yields:
            Response chunks as produced by stream_complete().

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        from dataknobs_llm.prompts import PromptBuilder

        # Check the builder and prompt_type before any rendering work.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Relay every chunk to the caller as it arrives.
        for piece in self.stream_complete(rendered, **llm_kwargs):
            yield piece

    def _render_messages(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None,
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Build the message list for a library prompt (sync version).

        The system message, when requested, always precedes the user
        message.

        Args:
            prompt_name: Library name of the prompt to render.
            params: Values substituted into the template; None means none.
            prompt_type: One of "system", "user" or "both".
            index: Which user-prompt variant to render.
            include_rag: Whether RAG searches are executed while rendering.

        Returns:
            The rendered messages, in send order.
        """
        from dataknobs_llm.prompts import PromptBuilder

        builder: PromptBuilder = self.prompt_builder  # type: ignore
        template_params = params or {}
        wants_system = prompt_type in ("system", "both")
        wants_user = prompt_type in ("user", "both")

        rendered: List[LLMMessage] = []

        if wants_system:
            system_result = builder.render_system_prompt(
                prompt_name, params=template_params, include_rag=include_rag
            )
            rendered.append(
                LLMMessage(role="system", content=system_result.content)
            )

        if wants_user:
            user_result = builder.render_user_prompt(
                prompt_name,
                index=index,
                params=template_params,
                include_rag=include_rag,
            )
            rendered.append(
                LLMMessage(role="user", content=user_result.content)
            )

        return rendered

    @abstractmethod
    def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Produce a completion as a stream of chunks (blocking iteration).

        Args:
            messages: A prompt string or a list of LLMMessage objects.
            config_overrides: Per-request replacements for config fields
                (model, temperature, max_tokens, top_p, stop_sequences,
                seed). The provider's own config is left unmodified.
            **kwargs: Extra parameters for the implementation.

        Yields:
            Streaming response chunks.
        """

    @abstractmethod
    def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Compute embeddings for one text or a batch of texts.

        Args:
            texts: A single string or a list of strings.
            **kwargs: Extra parameters for the implementation.

        Returns:
            Embedding vector(s).
        """

    @abstractmethod
    def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Run a function-calling request, blocking until it completes.

        Args:
            messages: The conversation so far.
            functions: Definitions of the functions the model may call.
            **kwargs: Extra parameters for the implementation.

        Returns:
            Response with function call.
        """

    def initialize(self) -> None:
        """Flag the sync LLM client as initialized."""
        self._is_initialized = True

    def close(self) -> None:
        """Flag the sync LLM client as closed (no longer initialized)."""
        self._is_initialized = False

1655 

1656 

class LLMAdapter(ABC):
    """Abstract translator between dataknobs and provider-native formats.

    An adapter maps the standard dataknobs structures (LLMMessage,
    LLMResponse, LLMConfig) to and from whatever a specific provider
    (OpenAI, Anthropic, etc.) expects. Each provider implementation should
    ship a matching adapter, so calling code stays provider-agnostic and
    runs against different LLM APIs without modification.

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMAdapter, LLMMessage, LLMResponse
        from typing import Any, List, Dict

        class MyProviderAdapter(LLMAdapter):
            # Adapter for a custom LLM provider.

            def adapt_messages(
                self,
                messages: List[LLMMessage]
            ) -> List[Dict[str, str]]:
                # Standard messages -> provider format
                return [
                    {"role": msg.role, "content": msg.content}
                    for msg in messages
                ]

            def adapt_response(self, response: Any) -> LLMResponse:
                # Provider payload -> standard response
                return LLMResponse(
                    content=response["text"],
                    model=response["model_id"],
                    usage={"total_tokens": response["tokens_used"]}
                )

            def adapt_config(self, config: LLMConfig) -> Dict[str, Any]:
                # Standard config -> provider settings
                return {
                    "model_name": config.model,
                    "temp": config.temperature,
                    "max_length": config.max_tokens
                }

        # Use adapter in provider
        adapter = MyProviderAdapter()
        provider_messages = adapter.adapt_messages(messages)
        ```

    See Also:
        LLMProvider: Base provider interface
        dataknobs_llm.llm.providers.OpenAIAdapter: Example implementation
    """

    @abstractmethod
    def adapt_messages(
        self,
        messages: List[LLMMessage]
    ) -> Any:
        """Convert standard messages into the provider's message format.

        Args:
            messages: List of standard LLMMessage objects.

        Returns:
            The messages in provider-specific form.
        """

    @abstractmethod
    def adapt_response(
        self,
        response: Any
    ) -> LLMResponse:
        """Convert a provider response into the standard response type.

        Args:
            response: The provider-specific response object.

        Returns:
            The equivalent standard LLMResponse.
        """

    @abstractmethod
    def adapt_config(
        self,
        config: LLMConfig
    ) -> Dict[str, Any]:
        """Convert the standard configuration into provider settings.

        Args:
            config: The standard LLMConfig.

        Returns:
            A dict of provider-specific configuration values.
        """

1764 

1765 

class LLMMiddleware(Protocol):
    """Structural interface for LLM request/response middleware.

    A middleware exposes two hooks: one that can transform the outgoing
    messages before they reach the LLM, and one that can transform the
    response before the caller sees it. Typical uses are logging, caching,
    content filtering and rate limiting.

    The configuration handed to each hook may be an LLMConfig, a dataknobs
    Config, or a plain dict.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMMiddleware, LLMMessage, LLMResponse, LLMConfig
        )
        from typing import List, Union, Dict, Any
        import logging

        class LoggingMiddleware:
            # Logs all LLM requests and responses.

            def __init__(self):
                self.logger = logging.getLogger(__name__)

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                # Log request before sending
                self.logger.info(f"Request: {len(messages)} messages")
                for msg in messages:
                    self.logger.debug(f"  {msg.role}: {msg.content[:50]}...")
                return messages

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                # Log response after receiving
                self.logger.info(f"Response: {len(response.content)} chars")
                self.logger.info(f"Tokens: {response.usage['total_tokens']}")
                if response.cost_usd:
                    self.logger.info(f"Cost: ${response.cost_usd:.4f}")
                return response


        class ContentFilterMiddleware:
            # Filters sensitive content.

            def __init__(self, blocked_words: List[str]):
                self.blocked_words = blocked_words

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                # Filter input messages
                filtered = []
                for msg in messages:
                    content = msg.content
                    for word in self.blocked_words:
                        content = content.replace(word, "***")
                    filtered.append(LLMMessage(
                        role=msg.role,
                        content=content,
                        name=msg.name,
                        function_call=msg.function_call,
                        metadata=msg.metadata
                    ))
                return filtered

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                # Filter output
                content = response.content
                for word in self.blocked_words:
                    content = content.replace(word, "***")

                from dataclasses import replace
                return replace(response, content=content)


        # Use with ConversationManager
        from dataknobs_llm.conversations import ConversationManager

        manager = await ConversationManager.create(
            llm=llm,
            prompt_builder=builder,
            middleware=[
                LoggingMiddleware(),
                ContentFilterMiddleware(["password", "secret"])
            ]
        )
        ```

    See Also:
        ConversationManager: Uses middleware for request/response processing
    """

    async def process_request(
        self,
        messages: List[LLMMessage],
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> List[LLMMessage]:
        """Hook invoked on the messages before they go to the LLM.

        Implementations may transform, log, validate or filter the
        outgoing messages.

        Args:
            messages: The messages about to be sent to the LLM.
            config: The active configuration (LLMConfig, Config, or dict).

        Returns:
            The messages to send (may be modified, extended, or filtered).

        Raises:
            ValueError: If messages are invalid
        """
        ...

    async def process_response(
        self,
        response: LLMResponse,
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> LLMResponse:
        """Hook invoked on the LLM response before the caller receives it.

        Implementations may transform, log, validate or filter the
        response.

        Args:
            response: The response produced by the LLM.
            config: The active configuration (LLMConfig, Config, or dict).

        Returns:
            The response to hand back (may be modified).

        Raises:
            ValueError: If response is invalid
        """
        ...