Coverage for src / dataknobs_llm / llm / base.py: 66%
270 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 11:16 -0700
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 11:16 -0700
"""Base LLM abstraction components.

This module provides the base abstractions for unified LLM operations across
different providers (OpenAI, Anthropic, Ollama, etc.). It defines standard
interfaces for completions, streaming, embeddings, and function calling.

The architecture follows a provider pattern where all LLM providers implement
common interfaces (AsyncLLMProvider or SyncLLMProvider) and use standardized
data structures (LLMMessage, LLMResponse, LLMConfig).

Key Components:
    - LLMProvider: Base provider interface with initialization and lifecycle
    - AsyncLLMProvider: Async provider with complete(), stream_complete(), embed()
    - SyncLLMProvider: Synchronous version for non-async applications
    - LLMMessage: Standard message format for conversations
    - LLMResponse: Standard response with content, usage, and cost tracking
    - LLMConfig: Comprehensive configuration with 20+ parameters
    - LLMAdapter: Format adapters for provider-specific APIs
    - LLMMiddleware: Request/response processing pipeline

Example:
    ```python
    from dataknobs_llm import create_llm_provider
    from dataknobs_llm.llm.base import LLMConfig, LLMMessage

    # Create provider with config
    config = LLMConfig(
        provider="openai",
        model="gpt-4",
        temperature=0.7,
        max_tokens=500
    )

    # Async usage
    async with create_llm_provider(config) as llm:
        # Simple completion
        response = await llm.complete("What is Python?")
        print(response.content)

        # Streaming
        async for chunk in llm.stream_complete("Tell me a story"):
            print(chunk.delta, end="", flush=True)

        # Multi-turn conversation
        messages = [
            LLMMessage(role="system", content="You are helpful"),
            LLMMessage(role="user", content="Hello!"),
        ]
        response = await llm.complete(messages)
    ```

See Also:
    - dataknobs_llm.llm.providers: Provider implementations
    - dataknobs_llm.conversations: Multi-turn conversation management
    - dataknobs_llm.prompts: Prompt rendering and RAG integration
"""
58from abc import ABC, abstractmethod
59from dataclasses import dataclass, field
60from enum import Enum
61from typing import (
62 Any, Dict, List, Union, AsyncIterator, Iterator,
63 Callable, Protocol
64)
65from datetime import datetime
67# Import prompt builder types - clean one-way dependency (llm depends on prompts)
68from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder
69from dataknobs_config.config import Config
class CompletionMode(Enum):
    """Operation mode requested from an LLM.

    The mode selects the kind of request a provider issues; different modes
    use different APIs and formatting requirements. Each member's value is
    its lowercase string name, so ``CompletionMode("chat")`` resolves to
    ``CompletionMode.CHAT``.

    Members:
        CHAT: Chat completion driven by a conversational message history.
        TEXT: Raw text completion (legacy models).
        INSTRUCT: Instruction-following mode.
        EMBEDDING: Vector embedding generation for semantic search.
        FUNCTION: Function/tool calling mode.

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Chat mode (default for modern models)
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            mode=CompletionMode.CHAT
        )

        # Embedding mode for vector search
        embedding_config = LLMConfig(
            provider="openai",
            model="text-embedding-ada-002",
            mode=CompletionMode.EMBEDDING
        )
        ```
    """

    # Conversational request carrying the message history.
    CHAT = "chat"
    # Plain text completion.
    TEXT = "text"
    # Instruction-following request.
    INSTRUCT = "instruct"
    # Embedding generation.
    EMBEDDING = "embedding"
    # Function/tool calling.
    FUNCTION = "function"
class ModelCapability(Enum):
    """Feature flags a model can advertise.

    Providers report a collection of these members (see
    ``LLMProvider.get_capabilities``) so callers can check which features
    are available for a specific model before using them.

    Members:
        TEXT_GENERATION: Basic text generation.
        CHAT: Multi-turn conversational interactions.
        EMBEDDINGS: Vector embedding generation.
        FUNCTION_CALLING: Tool/function calling support.
        VISION: Image understanding capabilities.
        CODE: Code generation and analysis.
        JSON_MODE: Structured JSON output.
        STREAMING: Incremental response streaming.

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import ModelCapability

        # Check model capabilities
        llm = create_llm_provider("openai", model="gpt-4")
        capabilities = llm.get_capabilities()

        if ModelCapability.STREAMING in capabilities:
            # Use streaming
            async for chunk in llm.stream_complete("Hello"):
                print(chunk.delta, end="")

        if ModelCapability.FUNCTION_CALLING in capabilities:
            # Use function calling
            response = await llm.function_call(messages, functions)
        ```
    """

    # Generation-style capabilities.
    TEXT_GENERATION = "text_generation"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"
    FUNCTION_CALLING = "function_calling"

    # Input/output modality and format capabilities.
    VISION = "vision"
    CODE = "code"
    JSON_MODE = "json_mode"
    STREAMING = "streaming"
157@dataclass
158class ToolCall:
159 """Represents a tool call from the LLM.
161 Used when the LLM wants to invoke a tool/function during reasoning.
163 Attributes:
164 name: Name of the tool to call
165 parameters: Arguments to pass to the tool
166 id: Optional unique identifier for the tool call
167 """
168 name: str
169 parameters: Dict[str, Any]
170 id: str | None = None
173@dataclass
174class LLMMessage:
175 """Represents a message in LLM conversation.
177 Standard message format used across all providers. Messages are the
178 fundamental unit of LLM interactions, containing role-based content
179 for multi-turn conversations.
181 Attributes:
182 role: Message role - 'system', 'user', 'assistant', or 'function'
183 content: Message content text
184 name: Optional name for function messages or multi-user scenarios
185 function_call: Function call data for tool-using models
186 metadata: Additional metadata (timestamps, IDs, etc.)
188 Example:
189 ```python
190 from dataknobs_llm.llm.base import LLMMessage
192 # System message
193 system_msg = LLMMessage(
194 role="system",
195 content="You are a helpful coding assistant."
196 )
198 # User message
199 user_msg = LLMMessage(
200 role="user",
201 content="How do I reverse a list in Python?"
202 )
204 # Assistant message
205 assistant_msg = LLMMessage(
206 role="assistant",
207 content="Use the reverse() method or [::-1] slicing."
208 )
210 # Function result message
211 function_msg = LLMMessage(
212 role="function",
213 name="search_docs",
214 content='{"result": "Found 3 examples"}'
215 )
217 # Build conversation
218 messages = [system_msg, user_msg, assistant_msg]
219 ```
220 """
221 role: str # 'system', 'user', 'assistant', 'function'
222 content: str
223 name: str | None = None # For function messages
224 function_call: Dict[str, Any] | None = None # For function calling
225 metadata: Dict[str, Any] = field(default_factory=dict)
228@dataclass
229class LLMResponse:
230 """Response from LLM.
232 Standard response format returned by all LLM providers. Contains the
233 generated content along with metadata about token usage, cost, and
234 completion status.
236 Attributes:
237 content: Generated text content
238 model: Model identifier that generated the response
239 finish_reason: Why generation stopped - 'stop', 'length', 'function_call'
240 usage: Token usage stats (prompt_tokens, completion_tokens, total_tokens)
241 function_call: Function call data if model requested tool use
242 metadata: Provider-specific metadata
243 created_at: Response timestamp
244 cost_usd: Estimated cost in USD for this request
245 cumulative_cost_usd: Running total cost for conversation
247 Example:
248 ```python
249 from dataknobs_llm import create_llm_provider
251 llm = create_llm_provider("openai", model="gpt-4")
252 response = await llm.complete("What is Python?")
254 # Access response data
255 print(response.content)
256 # => "Python is a high-level programming language..."
258 # Check token usage
259 print(f"Tokens used: {response.usage['total_tokens']}")
260 # => Tokens used: 87
262 # Monitor costs
263 if response.cost_usd:
264 print(f"Cost: ${response.cost_usd:.4f}")
265 print(f"Total: ${response.cumulative_cost_usd:.4f}")
267 # Check completion status
268 if response.finish_reason == "length":
269 print("Response truncated due to max_tokens limit")
270 ```
272 See Also:
273 LLMMessage: Request message format
274 LLMStreamResponse: Streaming response format
275 """
276 content: str
277 model: str
278 finish_reason: str | None = None # 'stop', 'length', 'function_call', 'tool_calls'
279 usage: Dict[str, int] | None = None # tokens used
280 function_call: Dict[str, Any] | None = None # Legacy single function call
281 tool_calls: list["ToolCall"] | None = None # List of tool calls (preferred)
282 metadata: Dict[str, Any] = field(default_factory=dict)
283 created_at: datetime = field(default_factory=datetime.now)
285 # Cost tracking (optional enhancement for DynaBot)
286 cost_usd: float | None = None # Estimated cost in USD
287 cumulative_cost_usd: float | None = None # Running total for conversation
290@dataclass
291class LLMStreamResponse:
292 r"""Streaming response from LLM.
294 Represents a single chunk in a streaming LLM response. Streaming
295 allows displaying generated text incrementally as it's produced,
296 providing better user experience for long responses.
298 Attributes:
299 delta: Incremental content for this chunk (not cumulative)
300 is_final: True if this is the last chunk in the stream
301 finish_reason: Why generation stopped (only set on final chunk)
302 usage: Token usage stats (only set on final chunk)
303 metadata: Additional chunk metadata
305 Example:
306 ```python
307 from dataknobs_llm import create_llm_provider
309 llm = create_llm_provider("openai", model="gpt-4")
311 # Stream and display in real-time
312 async for chunk in llm.stream_complete("Write a poem"):
313 print(chunk.delta, end="", flush=True)
315 if chunk.is_final:
316 print(f"\n\nFinished: {chunk.finish_reason}")
317 print(f"Tokens: {chunk.usage['total_tokens']}")
319 # Accumulate full response
320 full_text = ""
321 chunks_received = 0
323 async for chunk in llm.stream_complete("Explain Python"):
324 full_text += chunk.delta
325 chunks_received += 1
327 # Optional: show progress
328 if chunks_received % 10 == 0:
329 print(f"Received {chunks_received} chunks...")
331 print(f"\nComplete response ({len(full_text)} chars)")
332 print(full_text)
333 ```
335 See Also:
336 LLMResponse: Non-streaming response format
337 AsyncLLMProvider.stream_complete: Streaming method
338 """
339 delta: str # Incremental content
340 is_final: bool = False
341 finish_reason: str | None = None
342 usage: Dict[str, int] | None = None
343 metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LLMConfig:
    """Configuration for LLM operations.

    Comprehensive configuration for LLM providers with 20+ parameters
    controlling generation, rate limiting, function calling, and more.
    Works with both direct instantiation and dataknobs Config objects.

    This class supports:
        - All major LLM providers (OpenAI, Anthropic, Ollama, HuggingFace)
        - Generation parameters (temperature, max_tokens, top_p, etc.)
        - Function/tool calling configuration
        - Streaming with callbacks
        - Rate limiting and retry logic
        - Provider-specific options via options dict

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Basic configuration
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            api_key="sk-...",
            temperature=0.7,
            max_tokens=500
        )

        # Creative writing config
        creative_config = LLMConfig(
            provider="anthropic",
            model="claude-3-sonnet",
            temperature=1.2,
            top_p=0.95,
            max_tokens=2000
        )

        # Deterministic config for testing
        test_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            temperature=0.0,
            seed=42,  # Reproducible outputs
            max_tokens=100
        )

        # Function calling config
        function_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            functions=[{
                "name": "search_docs",
                "description": "Search documentation",
                "parameters": {"type": "object", "properties": {...}}
            }],
            function_call="auto"
        )

        # Streaming with callback
        def on_chunk(chunk):
            print(chunk.delta, end="")

        streaming_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            stream=True,
            stream_callback=on_chunk
        )

        # From dictionary (Config compatibility)
        config_dict = {
            "provider": "ollama",
            "model": "llama2",
            "type": "llm",  # Config metadata (ignored)
            "temperature": 0.8
        }
        config = LLMConfig.from_dict(config_dict)

        # Clone with overrides
        new_config = config.clone(temperature=1.0, max_tokens=1000)
        ```

    See Also:
        normalize_llm_config: Convert various formats to LLMConfig
        CompletionMode: Available completion modes
    """

    provider: str  # 'openai', 'anthropic', 'ollama', etc.
    model: str  # Model name/identifier
    api_key: str | None = None
    api_base: str | None = None  # Custom API endpoint

    # Generation parameters
    temperature: float = 0.7
    max_tokens: int | None = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: List[str] | None = None

    # Mode settings
    mode: CompletionMode = CompletionMode.CHAT
    system_prompt: str | None = None
    response_format: str | None = None  # 'text' or 'json'

    # Function calling
    functions: List[Dict[str, Any]] | None = None
    function_call: Union[str, Dict[str, str]] | None = None  # 'auto', 'none', or specific function

    # Streaming
    stream: bool = False
    stream_callback: Callable[[LLMStreamResponse], None] | None = None

    # Rate limiting
    rate_limit: int | None = None  # Requests per minute
    retry_count: int = 3
    retry_delay: float = 1.0
    timeout: float = 60.0

    # Advanced settings
    seed: int | None = None  # For reproducibility
    logit_bias: Dict[str, float] | None = None
    user_id: str | None = None

    # Provider-specific options
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
        """Create LLMConfig from a dictionary.

        This method handles dictionaries from dataknobs Config objects,
        which may include 'type', 'name', and 'factory' attributes. Those
        keys, and any other key that is not a dataclass field, are ignored.

        Args:
            config_dict: Configuration dictionary. It is not modified.

        Returns:
            LLMConfig instance

        Raises:
            ValueError: If 'mode' is a string that is not a CompletionMode
                value.
            TypeError: If a required field ('provider', 'model') is missing.
        """
        # Filter out Config-specific attributes
        config_data = {
            k: v for k, v in config_dict.items()
            if k not in ('type', 'name', 'factory')
        }

        # Handle mode conversion if it's a string
        if 'mode' in config_data and isinstance(config_data['mode'], str):
            config_data['mode'] = CompletionMode(config_data['mode'])

        # Keep only keys that are actual dataclass fields
        valid_fields = set(cls.__dataclass_fields__)
        filtered_data = {k: v for k, v in config_data.items() if k in valid_fields}

        return cls(**filtered_data)

    def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
        """Convert LLMConfig to a dictionary.

        Enum values are replaced by their ``.value``; fields whose value is
        None are omitted, except 'options', which is emitted as an empty
        dict in that case. Container values are the same objects held by
        this config, not copies.

        Args:
            include_config_attrs: If True, includes 'type' attribute for
                Config compatibility

        Returns:
            Configuration dictionary
        """
        result: Dict[str, Any] = {}

        for field_info in self.__dataclass_fields__.values():
            value = getattr(self, field_info.name)

            # Handle enum conversion
            if isinstance(value, Enum):
                result[field_info.name] = value.value
            # Skip None values for optional fields
            elif value is not None:
                result[field_info.name] = value
            # 'options' is always present, even when explicitly set to None
            elif field_info.name == 'options':
                result[field_info.name] = {}

        # Optionally add Config-compatible type attribute
        if include_config_attrs:
            result['type'] = 'llm'

        return result

    def clone(self, **overrides: Any) -> "LLMConfig":
        """Create a copy of this config with optional overrides.

        This method is useful for creating runtime configuration variations
        without mutating the original config. All dataclass fields can be
        overridden via keyword arguments.

        Dict and list fields that are not overridden (for example
        ``options``, ``functions``, ``stop_sequences``, ``logit_bias``) are
        copied at the top level, so adding or removing entries on the clone
        does not affect this config. Objects nested inside those containers
        are still shared.

        Args:
            **overrides: Field values to override in the cloned config

        Returns:
            New LLMConfig instance with overrides applied

        Raises:
            TypeError: If an override names something that is not a field.

        Example:
            >>> base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
            >>> creative_config = base_config.clone(temperature=1.2, max_tokens=500)
        """
        from dataclasses import replace

        # dataclasses.replace() reuses the existing field objects, which
        # would leave the clone sharing mutable containers with the
        # original; hand it fresh top-level copies instead.
        detached: Dict[str, Any] = {}
        for field_name in self.__dataclass_fields__:
            if field_name in overrides:
                continue
            value = getattr(self, field_name)
            if isinstance(value, (dict, list)):
                detached[field_name] = value.copy()

        return replace(self, **detached, **overrides)
def normalize_llm_config(config: Union["LLMConfig", Config, Dict[str, Any]]) -> "LLMConfig":
    """Coerce any supported configuration form into an LLMConfig.

    Accepted inputs, checked in this order:
        1. An LLMConfig instance, returned unchanged.
        2. A plain dict, converted with ``LLMConfig.from_dict``.
        3. An object exposing both ``get`` and ``get_types`` (treated as a
           dataknobs Config). Its first 'llm' entry is used; if fetching
           that raises, the first entry of the first available type is used
           instead.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
        ValueError: If a Config-like object reports no types at all
    """
    if isinstance(config, LLMConfig):
        return config

    if isinstance(config, dict):
        return LLMConfig.from_dict(config)

    # Config objects are recognised structurally, by the two methods used below.
    is_config_like = hasattr(config, 'get') and hasattr(config, 'get_types')
    if not is_config_like:
        raise TypeError(
            f"Unsupported config type: {type(config).__name__}. "
            f"Expected LLMConfig, Config, or dict."
        )

    try:
        raw_settings = config.get('llm', 0)
    except Exception as e:
        # No usable 'llm' entry: fall back to whatever type is listed first.
        available_types = config.get_types()
        if not available_types:
            raise ValueError("Config object has no configurations") from e
        raw_settings = config.get(available_types[0], 0)

    return LLMConfig.from_dict(raw_settings)
600class LLMProvider(ABC):
601 """Base LLM provider interface."""
603 def __init__(
604 self,
605 config: Union[LLMConfig, Config, Dict[str, Any]],
606 prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None
607 ):
608 """Initialize provider with configuration.
610 Args:
611 config: Configuration as LLMConfig, dataknobs Config object, or dict
612 prompt_builder: Optional prompt builder for integrated prompting
613 """
614 self.config = normalize_llm_config(config)
615 self.prompt_builder = prompt_builder
616 self._client = None
617 self._is_initialized = False
619 def _validate_prompt_builder(self, expected_type: type) -> None:
620 """Validate that prompt builder is configured and of correct type.
622 Args:
623 expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder)
625 Raises:
626 ValueError: If prompt_builder not configured
627 TypeError: If prompt_builder is wrong type
628 """
629 if not self.prompt_builder:
630 raise ValueError(
631 "No prompt_builder configured. Pass prompt_builder to __init__() "
632 "or use complete() directly with pre-rendered messages."
633 )
635 if not isinstance(self.prompt_builder, expected_type):
636 raise TypeError(
637 f"{self.__class__.__name__} requires {expected_type.__name__}, "
638 f"got {type(self.prompt_builder).__name__}"
639 )
641 def _validate_render_params(
642 self,
643 prompt_type: str
644 ) -> None:
645 """Validate render parameters.
647 Args:
648 prompt_type: Type of prompt to render
650 Raises:
651 ValueError: If prompt_type is invalid
652 """
653 if prompt_type not in ("system", "user", "both"):
654 raise ValueError(
655 f"Invalid prompt_type: {prompt_type}. "
656 f"Must be 'system', 'user', or 'both'"
657 )
659 @abstractmethod
660 def initialize(self) -> None:
661 """Initialize the LLM client."""
662 pass
664 @abstractmethod
665 def close(self) -> None:
666 """Close the LLM client."""
667 pass
669 @abstractmethod
670 def validate_model(self) -> bool:
671 """Validate that the model is available."""
672 pass
674 @abstractmethod
675 def get_capabilities(self) -> List[ModelCapability]:
676 """Get model capabilities."""
677 pass
679 @property
680 def is_initialized(self) -> bool:
681 """Check if provider is initialized."""
682 return self._is_initialized
684 def __enter__(self):
685 """Context manager entry."""
686 self.initialize()
687 return self
689 def __exit__(self, exc_type, exc_val, exc_tb):
690 """Context manager exit."""
691 self.close()
694class ConfigOverrideMixin:
695 """Mixin providing config override functionality for LLM providers.
697 This mixin provides shared functionality for handling per-request config
698 overrides, presets, and callbacks. Both AsyncLLMProvider and SyncLLMProvider
699 inherit from this mixin.
701 Features:
702 - Per-request config overrides (model, temperature, etc.)
703 - Named presets for common override combinations
704 - Callback hooks for logging/metrics
705 - Options dict merging
706 """
708 # Supported fields for config overrides (base set)
709 ALLOWED_CONFIG_OVERRIDES = {
710 # Core generation parameters
711 "model", "temperature", "max_tokens", "top_p", "stop_sequences", "seed",
712 # Provider-specific parameters
713 "presence_penalty", "frequency_penalty", "logit_bias", "response_format",
714 # Function calling (dynamic)
715 "functions", "function_call",
716 # Provider-specific options dict
717 "options",
718 }
720 # Override presets registry (class-level, shared across all providers)
721 _override_presets: Dict[str, Dict[str, Any]] = {}
723 # Override event callbacks (class-level)
724 _override_callbacks: List[Callable[[Any, Dict[str, Any], LLMConfig], None]] = []
726 @classmethod
727 def register_preset(cls, name: str, overrides: Dict[str, Any]) -> None:
728 """Register a named override preset.
730 Presets allow you to define common override combinations that can be
731 referenced by name instead of repeating the same overrides.
733 Args:
734 name: Preset name (e.g., "creative", "precise", "fast")
735 overrides: Dictionary of override values
737 Example:
738 >>> AsyncLLMProvider.register_preset("creative", {
739 ... "temperature": 1.2,
740 ... "top_p": 0.95,
741 ... "presence_penalty": 0.5
742 ... })
743 >>> response = await provider.complete(
744 ... "Write a poem",
745 ... config_overrides={"preset": "creative"}
746 ... )
747 """
748 cls._override_presets[name] = overrides.copy()
750 @classmethod
751 def on_override_applied(
752 cls,
753 callback: Callable[[Any, Dict[str, Any], LLMConfig], None]
754 ) -> None:
755 """Register a callback for when overrides are applied.
757 Use this for logging, metrics collection, or auditing override usage.
758 Callbacks receive the provider instance, the applied overrides dict,
759 and the resulting runtime config.
761 Args:
762 callback: Function(provider, overrides, runtime_config) -> None
764 Example:
765 >>> def log_overrides(provider, overrides, runtime_config):
766 ... print(f"Overrides applied: {overrides}")
767 ... print(f"Runtime model: {runtime_config.model}")
768 ...
769 >>> AsyncLLMProvider.on_override_applied(log_overrides)
770 """
771 cls._override_callbacks.append(callback)
773 @classmethod
774 def clear_override_callbacks(cls) -> None:
775 """Clear all registered override callbacks."""
776 cls._override_callbacks.clear()
778 @classmethod
779 def get_preset(cls, name: str) -> Dict[str, Any] | None:
780 """Get a registered override preset by name.
782 Args:
783 name: Preset name
785 Returns:
786 Preset overrides dict, or None if not found
787 """
788 return cls._override_presets.get(name)
790 @classmethod
791 def list_presets(cls) -> List[str]:
792 """List all registered preset names.
794 Returns:
795 List of preset names
796 """
797 return list(cls._override_presets.keys())
799 def _validate_config_overrides(
800 self,
801 overrides: Dict[str, Any] | None
802 ) -> None:
803 """Validate that config override fields are supported.
805 Args:
806 overrides: Dictionary of config overrides to validate
808 Raises:
809 ValueError: If overrides contains unsupported fields
810 """
811 if not overrides:
812 return
814 # Allow "preset" as a special key for named presets
815 allowed = self.ALLOWED_CONFIG_OVERRIDES | {"preset"}
816 invalid = set(overrides.keys()) - allowed
817 if invalid:
818 raise ValueError(
819 f"Unsupported config overrides: {invalid}. "
820 f"Allowed fields: {self.ALLOWED_CONFIG_OVERRIDES}"
821 )
823 def _expand_preset(
824 self,
825 overrides: Dict[str, Any]
826 ) -> Dict[str, Any]:
827 """Expand preset reference to actual override values.
829 If overrides contains a 'preset' key, replaces it with the
830 registered preset values. Explicit overrides take precedence
831 over preset values.
833 Args:
834 overrides: Override dict that may contain a preset reference
836 Returns:
837 Expanded overrides dict
839 Raises:
840 ValueError: If preset is not registered
841 """
842 if "preset" not in overrides:
843 return overrides
845 preset_name = overrides["preset"]
846 preset_values = self.get_preset(preset_name)
847 if preset_values is None:
848 raise ValueError(
849 f"Unknown preset: '{preset_name}'. "
850 f"Available presets: {self.list_presets()}"
851 )
853 # Preset values as base, explicit overrides take precedence
854 expanded = preset_values.copy()
855 for key, value in overrides.items():
856 if key != "preset":
857 expanded[key] = value
859 return expanded
861 def _merge_options(
862 self,
863 base_options: Dict[str, Any],
864 override_options: Dict[str, Any]
865 ) -> Dict[str, Any]:
866 """Deep merge options dicts.
868 Args:
869 base_options: Base options from config
870 override_options: Override options to merge
872 Returns:
873 Merged options dict
874 """
875 merged = base_options.copy()
876 merged.update(override_options)
877 return merged
879 def _notify_override_callbacks(
880 self,
881 overrides: Dict[str, Any],
882 runtime_config: LLMConfig
883 ) -> None:
884 """Notify registered callbacks about applied overrides.
886 Args:
887 overrides: The overrides that were applied
888 runtime_config: The resulting runtime config
889 """
890 for callback in self._override_callbacks:
891 try:
892 callback(self, overrides, runtime_config)
893 except Exception:
894 # Don't let callback errors break the main flow
895 pass
897 def _get_runtime_config(
898 self,
899 config_overrides: Dict[str, Any] | None = None
900 ) -> LLMConfig:
901 """Get runtime config, applying overrides if provided.
903 Supports:
904 - Direct field overrides (model, temperature, etc.)
905 - Named presets via 'preset' key
906 - Deep merging of 'options' dict
907 - Override callback notifications for logging/metrics
909 Args:
910 config_overrides: Optional overrides to apply
912 Returns:
913 LLMConfig to use for this request (original or cloned with overrides)
914 """
915 if not config_overrides:
916 return self.config # type: ignore[attr-defined]
918 self._validate_config_overrides(config_overrides)
920 # Expand preset if present
921 expanded = self._expand_preset(config_overrides)
923 # Handle options merging specially
924 if "options" in expanded and self.config.options: # type: ignore[attr-defined]
925 expanded["options"] = self._merge_options(
926 self.config.options, # type: ignore[attr-defined]
927 expanded["options"]
928 )
930 runtime_config = self.config.clone(**expanded) # type: ignore[attr-defined]
932 # Notify callbacks
933 self._notify_override_callbacks(config_overrides, runtime_config)
935 return runtime_config
938class AsyncLLMProvider(LLMProvider, ConfigOverrideMixin):
939 """Async LLM provider interface."""
941 @abstractmethod
942 async def complete(
943 self,
944 messages: Union[str, List[LLMMessage]],
945 config_overrides: Dict[str, Any] | None = None,
946 **kwargs
947 ) -> LLMResponse:
948 """Generate completion asynchronously.
950 Primary method for getting LLM responses. Accepts either a simple
951 string prompt or a list of LLMMessage objects for multi-turn
952 conversations. This is the recommended async method for most use cases.
954 Args:
955 messages: Either a single string prompt or a list of LLMMessage
956 objects for multi-turn conversations.
957 config_overrides: Optional dict to override config fields for this
958 request only. Supported fields: model, temperature, max_tokens,
959 top_p, stop_sequences, seed. The original config is not modified.
960 **kwargs: Additional provider-specific parameters. Common options:
961 - temperature (float): Sampling temperature (0.0-2.0)
962 - max_tokens (int): Maximum tokens to generate
963 - top_p (float): Nucleus sampling parameter (0.0-1.0)
964 - stop (List[str]): Stop sequences
965 - presence_penalty (float): Presence penalty (-2.0 to 2.0)
966 - frequency_penalty (float): Frequency penalty (-2.0 to 2.0)
968 Returns:
969 LLMResponse containing generated content, usage stats, and metadata
971 Raises:
972 ValueError: If messages format is invalid or config_overrides contains
973 unsupported fields
974 ConnectionError: If API connection fails
975 TimeoutError: If request exceeds timeout
977 Example:
978 ```python
979 from dataknobs_llm import create_llm_provider
980 from dataknobs_llm.llm.base import LLMMessage
982 llm = create_llm_provider("openai", model="gpt-4")
984 # Simple string prompt
985 response = await llm.complete("What is Python?")
986 print(response.content)
987 # => "Python is a high-level programming language..."
989 # With config overrides (switch model per-request)
990 response = await llm.complete(
991 "Write a haiku about coding",
992 config_overrides={"model": "gpt-4-turbo", "temperature": 0.9}
993 )
995 # Multi-turn conversation
996 messages = [
997 LLMMessage(role="system", content="You are a helpful tutor"),
998 LLMMessage(role="user", content="Explain recursion"),
999 LLMMessage(role="assistant", content="Recursion is when..."),
1000 LLMMessage(role="user", content="Can you give an example?")
1001 ]
1002 response = await llm.complete(messages)
1004 # Check token usage
1005 print(f"Tokens: {response.usage['total_tokens']}")
1006 print(f"Cost: ${response.cost_usd:.4f}")
1007 ```
1009 See Also:
1010 stream_complete: Streaming version
1011 render_and_complete: Complete with prompt rendering
1012 """
1013 pass
async def render_and_complete(
    self,
    prompt_name: str,
    params: Dict[str, Any] | None = None,
    prompt_type: str = "user",
    index: int = 0,
    include_rag: bool = True,
    **llm_kwargs
) -> LLMResponse:
    """Build messages from a library prompt and run one completion.

    One-shot helper: the named prompt is rendered through the configured
    prompt builder and the resulting messages are handed to ``complete()``.
    For multi-turn conversations use ConversationManager instead.

    Args:
        prompt_name: Name of the prompt in the library.
        params: Template parameters used while rendering.
        prompt_type: Which prompt(s) to render ("system", "user", or "both").
        index: Prompt variant index (applies to user prompts).
        include_rag: Whether RAG searches are executed during rendering.
        **llm_kwargs: Forwarded unchanged to ``complete()``.

    Returns:
        The LLM response returned by ``complete()``.

    Raises:
        ValueError: If prompt_builder not configured or invalid prompt_type
        TypeError: If prompt_builder is not AsyncPromptBuilder

    Example:
        >>> llm = OpenAIProvider(config, prompt_builder=builder)
        >>> result = await llm.render_and_complete(
        ...     "analyze_code",
        ...     params={"code": code, "language": "python"}
        ... )
    """
    # The builder class is imported at call time, inside the method.
    from dataknobs_llm.prompts import AsyncPromptBuilder

    # Check the builder first, then the prompt_type, before any rendering.
    self._validate_prompt_builder(AsyncPromptBuilder)
    self._validate_render_params(prompt_type)

    # Step 1: turn the library prompt into LLMMessage objects.
    rendered_messages = await self._render_messages(
        prompt_name, params, prompt_type, index, include_rag
    )

    # Step 2: run the completion on the rendered conversation.
    return await self.complete(rendered_messages, **llm_kwargs)
async def render_and_stream(
    self,
    prompt_name: str,
    params: Dict[str, Any] | None = None,
    prompt_type: str = "user",
    index: int = 0,
    include_rag: bool = True,
    **llm_kwargs
) -> AsyncIterator[LLMStreamResponse]:
    """Render a library prompt and stream the LLM response.

    Streaming counterpart of ``render_and_complete()``: the prompt is
    rendered the same way, but the messages are passed to
    ``stream_complete()`` and each chunk is re-yielded to the caller.

    Because this is an async generator, nothing in the body (validation
    included) runs until the first chunk is requested.

    Args:
        prompt_name: Name of the prompt in the library.
        params: Template parameters used while rendering.
        prompt_type: Which prompt(s) to render ("system", "user", or "both").
        index: Prompt variant index.
        include_rag: Whether RAG searches are executed during rendering.
        **llm_kwargs: Forwarded unchanged to ``stream_complete()``.

    Yields:
        Streaming response chunks, in the order the provider produces them.

    Raises:
        ValueError: If prompt_builder not configured or invalid prompt_type
        TypeError: If prompt_builder is not AsyncPromptBuilder

    Example:
        >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
        ...     print(chunk.delta, end="")
    """
    # The builder class is imported at call time, inside the method.
    from dataknobs_llm.prompts import AsyncPromptBuilder

    # Check the builder first, then the prompt_type, before any rendering.
    self._validate_prompt_builder(AsyncPromptBuilder)
    self._validate_render_params(prompt_type)

    # Turn the library prompt into LLMMessage objects.
    rendered_messages = await self._render_messages(
        prompt_name, params, prompt_type, index, include_rag
    )

    # Relay every chunk from the provider stream unchanged.
    async for piece in self.stream_complete(rendered_messages, **llm_kwargs):
        yield piece
async def _render_messages(
    self,
    prompt_name: str,
    params: Dict[str, Any] | None,
    prompt_type: str,
    index: int,
    include_rag: bool
) -> List[LLMMessage]:
    """Render the requested prompt(s) into LLM messages (async version).

    The system prompt, when requested, is always placed before the user
    prompt in the returned list.

    Args:
        prompt_name: Name of the prompt in the library.
        params: Template parameters; ``None`` (or any falsy value) is
            replaced by an empty dict.
        prompt_type: Which prompt(s) to render ("system", "user", or "both").
        index: Prompt variant index, passed to the user-prompt renderer only.
        include_rag: Whether RAG searches are executed during rendering.

    Returns:
        List of rendered LLM messages. It is empty when ``prompt_type``
        matches none of the three recognised values.
    """
    from dataknobs_llm.prompts import AsyncPromptBuilder
    prompt_builder: AsyncPromptBuilder = self.prompt_builder  # type: ignore

    template_params = params or {}
    wants_system = prompt_type in ("system", "both")
    wants_user = prompt_type in ("user", "both")

    rendered: List[LLMMessage] = []

    if wants_system:
        system_result = await prompt_builder.render_system_prompt(
            prompt_name, params=template_params, include_rag=include_rag
        )
        rendered.append(LLMMessage(role="system", content=system_result.content))

    if wants_user:
        user_result = await prompt_builder.render_user_prompt(
            prompt_name, index=index, params=template_params, include_rag=include_rag
        )
        rendered.append(LLMMessage(role="user", content=user_result.content))

    return rendered
@abstractmethod
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    config_overrides: Dict[str, Any] | None = None,
    **kwargs
) -> AsyncIterator[LLMStreamResponse]:
    r"""Generate streaming completion asynchronously.

    Streams response chunks as they are generated, enabling real-time
    display of LLM output. Each chunk contains incremental content
    (delta), and the final chunk includes usage statistics.

    Note:
        ``render_and_stream`` consumes this method with ``async for`` on
        the un-awaited call result. Concrete implementations therefore
        need to be async generators (i.e. contain ``yield``) or otherwise
        return an async iterator directly. This abstract stub has no
        ``yield`` and only defines the contract.

    Args:
        messages: Either a single string prompt or list of LLMMessage objects
        config_overrides: Optional dict to override config fields for this
            request only. Supported fields: model, temperature, max_tokens,
            top_p, stop_sequences, seed. The original config is not modified.
        **kwargs: Provider-specific parameters (same as complete())

    Yields:
        LLMStreamResponse chunks containing incremental content. The final
        chunk has is_final=True and includes finish_reason and usage stats.

    Raises:
        ValueError: If messages format is invalid or config_overrides contains
            unsupported fields
        ConnectionError: If API connection fails
        TimeoutError: If request exceeds timeout

    Example:
        ```python
        from dataknobs_llm import create_llm_provider

        llm = create_llm_provider("openai", model="gpt-4")

        # Stream and display in real-time
        async for chunk in llm.stream_complete("Tell me a story"):
            print(chunk.delta, end="", flush=True)

            # Only the final chunk carries finish_reason and usage
            if chunk.is_final:
                print(f"\n\nFinished: {chunk.finish_reason}")
                print(f"Total tokens: {chunk.usage['total_tokens']}")

        # Stream with config overrides
        async for chunk in llm.stream_complete(
            "Write a poem",
            config_overrides={"model": "gpt-4-turbo", "temperature": 1.0}
        ):
            print(chunk.delta, end="", flush=True)

        # Accumulate full response
        full_text = ""
        chunk_count = 0

        async for chunk in llm.stream_complete("Explain quantum computing"):
            full_text += chunk.delta
            chunk_count += 1

        print(f"Received {chunk_count} chunks")
        print(f"Total length: {len(full_text)} characters")
        ```

    See Also:
        complete: Non-streaming version
        render_and_stream: Stream with prompt rendering
        LLMStreamResponse: Chunk data structure
    """
    pass
@abstractmethod
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Generate embeddings asynchronously.

    Converts text into dense vector representations for semantic search,
    clustering, and similarity comparison. Returns high-dimensional
    vectors (typically 768-1536 dimensions depending on model).

    The shape of the result mirrors the shape of the input: one string in,
    one vector out; a list of strings in, a list of vectors out.

    Args:
        texts: Single text string or list of texts to embed
        **kwargs: Provider-specific parameters:
            - model (str): Embedding model override
            - dimensions (int): Target dimensions (if supported)

    Returns:
        Single embedding vector (List[float]) if input is a string,
        or list of vectors (List[List[float]]) if input is a list

    Raises:
        ValueError: If texts is empty or invalid
        ConnectionError: If API connection fails

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        import numpy as np

        # Create embedding provider
        llm = create_llm_provider(
            "openai",
            model="text-embedding-ada-002"
        )

        # Single text embedding (string in -> one vector out)
        embedding = await llm.embed("What is machine learning?")
        print(f"Dimensions: {len(embedding)}")
        # => Dimensions: 1536

        # Batch embedding (list in -> list of vectors out)
        texts = [
            "Python is a programming language",
            "JavaScript is used for web development",
            "Machine learning uses statistical methods"
        ]
        embeddings = await llm.embed(texts)
        print(f"Generated {len(embeddings)} embeddings")

        # Compute similarity
        def cosine_similarity(v1, v2):
            return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

        query_emb = await llm.embed("Tell me about ML")
        similarities = [
            cosine_similarity(query_emb, emb)
            for emb in embeddings
        ]
        most_similar_idx = np.argmax(similarities)
        print(f"Most similar: {texts[most_similar_idx]}")
        # => Most similar: Machine learning uses statistical methods

        # Store in vector database
        from dataknobs_data import database_factory
        db = database_factory.create("vector_db")
        for text, emb in zip(texts, embeddings):
            db.create({"text": text, "embedding": emb})
        ```

    See Also:
        complete: Text generation method
    """
    pass
@abstractmethod
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs
) -> LLMResponse:
    """Execute function calling asynchronously.

    Enables the LLM to call external functions/tools. The model decides
    which function to call based on the conversation context, and returns
    the function name and arguments in a structured format.

    This method only reports which function the model wants to call; as
    the example below shows, running that function and feeding its result
    back into the conversation is left to the caller.

    Args:
        messages: Conversation messages leading up to the function call
        functions: List of function definitions in JSON Schema format.
            Each function dict must have:
            - name (str): Function name
            - description (str): What the function does
            - parameters (dict): JSON Schema for parameters
        **kwargs: Provider-specific parameters:
            - function_call (str|dict): 'auto', 'none', or specific function
            - temperature (float): Sampling temperature
            - max_tokens (int): Maximum response tokens

    Returns:
        LLMResponse with function_call field populated containing:
        - name (str): Function to call
        - arguments (str): JSON string of arguments

    Raises:
        ValueError: If functions format is invalid
        ConnectionError: If API connection fails

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import LLMMessage
        import json

        llm = create_llm_provider("openai", model="gpt-4")

        # Define available functions
        functions = [
            {
                "name": "search_docs",
                "description": "Search documentation for information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Max results"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "execute_code",
                "description": "Execute Python code",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

        # Ask question that requires function
        messages = [
            LLMMessage(
                role="user",
                content="Search for information about async/await in Python"
            )
        ]

        # Model decides to call function
        response = await llm.function_call(messages, functions)

        if response.function_call:
            func_name = response.function_call["name"]
            # arguments arrive as a JSON string and must be decoded
            func_args = json.loads(response.function_call["arguments"])

            print(f"Function: {func_name}")
            print(f"Arguments: {func_args}")
            # => Function: search_docs
            # => Arguments: {'query': 'async/await Python', 'limit': 5}

            # Execute function
            # (search_docs is your own implementation of the tool declared
            # above; it is not provided by this library)
            results = search_docs(**func_args)

            # Add function result to conversation
            messages.append(LLMMessage(
                role="function",
                name=func_name,
                content=json.dumps(results)
            ))

            # Get final response
            final = await llm.complete(messages)
            print(final.content)
        ```

    See Also:
        complete: Standard completion without functions
        dataknobs_llm.tools: Tool abstraction framework
    """
    pass
async def initialize(self) -> None:
    """Mark the async LLM client as initialized.

    The base implementation only sets the ``_is_initialized`` flag to True.
    """
    self._is_initialized = True
async def close(self) -> None:
    """Mark the async LLM client as closed.

    The base implementation only resets the ``_is_initialized`` flag to False.
    """
    self._is_initialized = False
async def __aenter__(self):
    """Enter ``async with``: initialize the provider and return it."""
    await self.initialize()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave ``async with``: close the provider.

    The exception arguments are ignored and nothing truthy is returned, so
    an exception raised inside the ``async with`` body keeps propagating.
    """
    await self.close()
class SyncLLMProvider(LLMProvider, ConfigOverrideMixin):
    """Synchronous LLM provider interface.

    Concrete providers implement ``complete()``, ``stream_complete()``,
    ``embed()`` and ``function_call()``. This base class adds the
    prompt-rendering helpers (``render_and_complete()``,
    ``render_and_stream()``) and a minimal initialize/close lifecycle that
    toggles the ``_is_initialized`` flag.
    """

    @abstractmethod
    def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> LLMResponse:
        """Generate completion synchronously.

        Args:
            messages: Input messages or prompt
            config_overrides: Optional dict to override config fields for this
                request only. Supported fields: model, temperature, max_tokens,
                top_p, stop_sequences, seed. The original config is not modified.
            **kwargs: Additional parameters

        Returns:
            LLM response
        """
        pass

    def render_and_complete(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render prompt from library and execute LLM completion.

        This is a convenience method for one-off interactions that combines
        prompt rendering with LLM execution. For multi-turn conversations,
        use ConversationManager instead.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (for user prompts)
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to complete()

        Returns:
            LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
            >>> result = llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        # The builder class is imported at call time, inside the method.
        from dataknobs_llm.prompts import PromptBuilder

        # Check the builder first, then the prompt_type, before rendering.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Execute LLM
        return self.complete(messages, **llm_kwargs)

    def render_and_stream(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Render prompt and stream LLM response.

        Same as render_and_complete() but returns streaming response.

        Because this is a generator, nothing in the body (validation
        included) runs until the first chunk is requested.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index
            include_rag: Whether to execute RAG searches
            **llm_kwargs: Additional arguments passed to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        # The builder class is imported at call time, inside the method.
        from dataknobs_llm.prompts import PromptBuilder

        # Check the builder first, then the prompt_type, before rendering.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        # Render messages
        messages = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Delegate to the provider stream. Unlike a manual re-yield loop,
        # ``yield from`` also forwards close()/throw() to the underlying
        # stream when it is a generator, so a consumer that stops early
        # shuts the provider stream down promptly.
        yield from self.stream_complete(messages, **llm_kwargs)

    def _render_messages(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None,
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Render messages from prompt library (sync version).

        The system prompt, when requested, is always placed before the
        user prompt in the returned list.

        Args:
            prompt_name: Name of prompt in library
            params: Parameters for template rendering; ``None`` (or any
                falsy value) is replaced by an empty dict
            prompt_type: Type of prompt ("system", "user", or "both")
            index: Prompt variant index (user prompt only)
            include_rag: Whether to execute RAG searches

        Returns:
            List of rendered LLM messages
        """
        from dataknobs_llm.prompts import PromptBuilder
        builder: PromptBuilder = self.prompt_builder  # type: ignore

        messages: List[LLMMessage] = []
        params = params or {}

        if prompt_type in ("system", "both"):
            result = builder.render_system_prompt(
                prompt_name, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="system", content=result.content))

        if prompt_type in ("user", "both"):
            result = builder.render_user_prompt(
                prompt_name, index=index, params=params, include_rag=include_rag
            )
            messages.append(LLMMessage(role="user", content=result.content))

        return messages

    @abstractmethod
    def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        config_overrides: Dict[str, Any] | None = None,
        **kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Generate streaming completion synchronously.

        ``render_and_stream`` iterates the value returned by this method
        directly, so implementations must return an iterable of chunks
        (typically by being a generator).

        Args:
            messages: Input messages or prompt
            config_overrides: Optional dict to override config fields for this
                request only. Supported fields: model, temperature, max_tokens,
                top_p, stop_sequences, seed. The original config is not modified.
            **kwargs: Additional parameters

        Yields:
            Streaming response chunks
        """
        pass

    @abstractmethod
    def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Generate embeddings synchronously.

        Args:
            texts: Input text(s)
            **kwargs: Additional parameters

        Returns:
            Embedding vector(s)
        """
        pass

    @abstractmethod
    def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling synchronously.

        Args:
            messages: Conversation messages
            functions: Available functions
            **kwargs: Additional parameters

        Returns:
            Response with function call
        """
        pass

    def initialize(self) -> None:
        """Initialize the sync LLM client (sets ``_is_initialized`` to True)."""
        self._is_initialized = True

    def close(self) -> None:
        """Close the sync LLM client (sets ``_is_initialized`` to False)."""
        self._is_initialized = False
class LLMAdapter(ABC):
    """Base adapter for converting between different LLM formats.

    Adapters translate between the standard dataknobs LLM format
    (LLMMessage, LLMResponse, LLMConfig) and provider-specific formats
    (OpenAI, Anthropic, etc.). Each provider implementation should
    have a corresponding adapter.

    This enables provider-agnostic code that works across different
    LLM APIs without modification.

    Subclasses must implement all three abstract methods
    (``adapt_messages``, ``adapt_response``, ``adapt_config``).

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMAdapter, LLMConfig, LLMMessage, LLMResponse
        )
        from typing import Any, List, Dict

        class MyProviderAdapter(LLMAdapter):
            \"\"\"Adapter for custom LLM provider.\"\"\"

            def adapt_messages(
                self,
                messages: List[LLMMessage]
            ) -> List[Dict[str, str]]:
                \"\"\"Convert to provider format.\"\"\"
                return [
                    {"role": msg.role, "content": msg.content}
                    for msg in messages
                ]

            def adapt_response(
                self,
                response: Any
            ) -> LLMResponse:
                \"\"\"Convert from provider format.\"\"\"
                return LLMResponse(
                    content=response["text"],
                    model=response["model_id"],
                    usage={
                        "total_tokens": response["tokens_used"]
                    }
                )

            def adapt_config(
                self,
                config: LLMConfig
            ) -> Dict[str, Any]:
                \"\"\"Convert config to provider format.\"\"\"
                return {
                    "model_name": config.model,
                    "temp": config.temperature,
                    "max_length": config.max_tokens
                }

        # Use adapter in provider
        # (`messages` is an existing List[LLMMessage])
        adapter = MyProviderAdapter()
        provider_messages = adapter.adapt_messages(messages)
        ```

    See Also:
        LLMProvider: Base provider interface
        dataknobs_llm.llm.providers.OpenAIAdapter: Example implementation
    """

    @abstractmethod
    def adapt_messages(
        self,
        messages: List[LLMMessage]
    ) -> Any:
        """Adapt messages to provider format.

        Args:
            messages: Standard LLMMessage list

        Returns:
            Provider-specific message format
        """
        pass

    @abstractmethod
    def adapt_response(
        self,
        response: Any
    ) -> LLMResponse:
        """Adapt provider response to standard format.

        Args:
            response: Provider-specific response object

        Returns:
            Standard LLMResponse
        """
        pass

    @abstractmethod
    def adapt_config(
        self,
        config: LLMConfig
    ) -> Dict[str, Any]:
        """Adapt configuration to provider format.

        Args:
            config: Standard LLMConfig

        Returns:
            Provider-specific config dict
        """
        pass
class LLMMiddleware(Protocol):
    """Protocol for LLM middleware.

    Middleware provides hooks to transform requests before they're sent
    to the LLM and responses before they're returned to the caller.
    Useful for logging, caching, content filtering, rate limiting, etc.

    Middleware can accept configuration as LLMConfig, dataknobs Config, or dict.

    Both hooks are declared ``async def``, so implementations should be
    coroutines. Because this is a ``Protocol``, the example classes below
    satisfy it structurally without inheriting from it.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMMiddleware, LLMMessage, LLMResponse, LLMConfig
        )
        from typing import List, Union, Dict, Any
        import logging
        # `Config` in the annotations below is the dataknobs Config class;
        # import it from its defining package as well.

        class LoggingMiddleware:
            \"\"\"Logs all LLM requests and responses.\"\"\"

            def __init__(self):
                self.logger = logging.getLogger(__name__)

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Log request before sending.\"\"\"
                self.logger.info(f"Request: {len(messages)} messages")
                for msg in messages:
                    self.logger.debug(f"  {msg.role}: {msg.content[:50]}...")
                return messages

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Log response after receiving.\"\"\"
                self.logger.info(f"Response: {len(response.content)} chars")
                self.logger.info(f"Tokens: {response.usage['total_tokens']}")
                if response.cost_usd:
                    self.logger.info(f"Cost: ${response.cost_usd:.4f}")
                return response


        class ContentFilterMiddleware:
            \"\"\"Filters sensitive content.\"\"\"

            def __init__(self, blocked_words: List[str]):
                self.blocked_words = blocked_words

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Filter input messages.\"\"\"
                filtered = []
                for msg in messages:
                    content = msg.content
                    for word in self.blocked_words:
                        content = content.replace(word, "***")
                    # Build a new message rather than mutating the input
                    filtered.append(LLMMessage(
                        role=msg.role,
                        content=content,
                        name=msg.name,
                        function_call=msg.function_call,
                        metadata=msg.metadata
                    ))
                return filtered

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Filter output.\"\"\"
                content = response.content
                for word in self.blocked_words:
                    content = content.replace(word, "***")

                from dataclasses import replace
                return replace(response, content=content)


        # Use with ConversationManager
        # (`llm` and `builder` are an existing provider and prompt builder)
        from dataknobs_llm.conversations import ConversationManager

        manager = await ConversationManager.create(
            llm=llm,
            prompt_builder=builder,
            middleware=[
                LoggingMiddleware(),
                ContentFilterMiddleware(["password", "secret"])
            ]
        )
        ```

    See Also:
        ConversationManager: Uses middleware for request/response processing
    """

    async def process_request(
        self,
        messages: List[LLMMessage],
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> List[LLMMessage]:
        """Process request before sending to LLM.

        Transform, log, validate, or filter messages before they are
        sent to the LLM provider.

        Args:
            messages: Input messages to be sent to LLM
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed messages (can be modified, added to, or filtered)

        Raises:
            ValueError: If messages are invalid
        """
        ...

    async def process_response(
        self,
        response: LLMResponse,
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> LLMResponse:
        """Process response from LLM.

        Transform, log, validate, or filter the LLM response before
        returning to the caller.

        Args:
            response: LLM response to process
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed response (can be modified)

        Raises:
            ValueError: If response is invalid
        """
        ...