Coverage for src/dataknobs_llm/llm/base.py: 81%
206 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Base LLM abstraction components.
3This module provides the base abstractions for unified LLM operations across
4different providers (OpenAI, Anthropic, Ollama, etc.). It defines standard
5interfaces for completions, streaming, embeddings, and function calling.
7The architecture follows a provider pattern where all LLM providers implement
8common interfaces (AsyncLLMProvider or SyncLLMProvider) and use standardized
9data structures (LLMMessage, LLMResponse, LLMConfig).
11Key Components:
12 - LLMProvider: Base provider interface with initialization and lifecycle
13 - AsyncLLMProvider: Async provider with complete(), stream_complete(), embed()
14 - SyncLLMProvider: Synchronous version for non-async applications
15 - LLMMessage: Standard message format for conversations
16 - LLMResponse: Standard response with content, usage, and cost tracking
17 - LLMConfig: Comprehensive configuration with 20+ parameters
18 - LLMAdapter: Format adapters for provider-specific APIs
19 - LLMMiddleware: Request/response processing pipeline
21Example:
22 ```python
23 from dataknobs_llm import create_llm_provider
24 from dataknobs_llm.llm.base import LLMConfig, LLMMessage
26 # Create provider with config
27 config = LLMConfig(
28 provider="openai",
29 model="gpt-4",
30 temperature=0.7,
31 max_tokens=500
32 )
34 # Async usage
35 async with create_llm_provider(config) as llm:
36 # Simple completion
37 response = await llm.complete("What is Python?")
38 print(response.content)
40 # Streaming
41 async for chunk in llm.stream_complete("Tell me a story"):
42 print(chunk.delta, end="", flush=True)
44 # Multi-turn conversation
45 messages = [
46 LLMMessage(role="system", content="You are helpful"),
47 LLMMessage(role="user", content="Hello!"),
48 ]
49 response = await llm.complete(messages)
50 ```
52See Also:
53 - dataknobs_llm.llm.providers: Provider implementations
54 - dataknobs_llm.conversations: Multi-turn conversation management
55 - dataknobs_llm.prompts: Prompt rendering and RAG integration
56"""
58from abc import ABC, abstractmethod
59from dataclasses import dataclass, field
60from enum import Enum
61from typing import (
62 Any, Dict, List, Union, AsyncIterator, Iterator,
63 Callable, Protocol
64)
65from datetime import datetime
67# Import prompt builder types - clean one-way dependency (llm depends on prompts)
68from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder
69from dataknobs_config.config import Config
class CompletionMode(Enum):
    """Operation mode requested from an LLM provider.

    The mode tells a provider what kind of request is being made; each
    mode may map to a different API and a different input format.

    Members:
        CHAT: Chat completion driven by a conversational message history.
        TEXT: Raw text completion (legacy models).
        INSTRUCT: Instruction-following completion.
        EMBEDDING: Vector embedding generation, e.g. for semantic search.
        FUNCTION: Function/tool calling.

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Chat is the default mode for modern models.
        chat_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            mode=CompletionMode.CHAT,
        )

        # Embedding mode for vector search.
        embedding_config = LLMConfig(
            provider="openai",
            model="text-embedding-ada-002",
            mode=CompletionMode.EMBEDDING,
        )
        ```
    """

    # The string values are what appears in serialized configuration
    # (LLMConfig.to_dict emits ``mode.value``; from_dict parses it back).
    CHAT = "chat"
    TEXT = "text"
    INSTRUCT = "instruct"
    EMBEDDING = "embedding"
    FUNCTION = "function"
class ModelCapability(Enum):
    """Feature flags a model can advertise.

    Providers return a list of these from ``get_capabilities()`` so that
    callers can check whether a feature is available for the configured
    model before using it.

    Members:
        TEXT_GENERATION: Basic text generation.
        CHAT: Multi-turn conversational interaction.
        EMBEDDINGS: Vector embedding generation.
        FUNCTION_CALLING: Tool/function calling.
        VISION: Image understanding.
        CODE: Code generation and analysis.
        JSON_MODE: Structured JSON output.
        STREAMING: Incremental response streaming.

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import ModelCapability

        llm = create_llm_provider("openai", model="gpt-4")
        supported = llm.get_capabilities()

        if ModelCapability.STREAMING in supported:
            async for chunk in llm.stream_complete("Hello"):
                print(chunk.delta, end="")

        if ModelCapability.FUNCTION_CALLING in supported:
            response = await llm.function_call(messages, functions)
        ```
    """

    TEXT_GENERATION = "text_generation"
    CHAT = "chat"
    EMBEDDINGS = "embeddings"
    FUNCTION_CALLING = "function_calling"
    VISION = "vision"
    CODE = "code"
    JSON_MODE = "json_mode"
    STREAMING = "streaming"
157@dataclass
158class LLMMessage:
159 """Represents a message in LLM conversation.
161 Standard message format used across all providers. Messages are the
162 fundamental unit of LLM interactions, containing role-based content
163 for multi-turn conversations.
165 Attributes:
166 role: Message role - 'system', 'user', 'assistant', or 'function'
167 content: Message content text
168 name: Optional name for function messages or multi-user scenarios
169 function_call: Function call data for tool-using models
170 metadata: Additional metadata (timestamps, IDs, etc.)
172 Example:
173 ```python
174 from dataknobs_llm.llm.base import LLMMessage
176 # System message
177 system_msg = LLMMessage(
178 role="system",
179 content="You are a helpful coding assistant."
180 )
182 # User message
183 user_msg = LLMMessage(
184 role="user",
185 content="How do I reverse a list in Python?"
186 )
188 # Assistant message
189 assistant_msg = LLMMessage(
190 role="assistant",
191 content="Use the reverse() method or [::-1] slicing."
192 )
194 # Function result message
195 function_msg = LLMMessage(
196 role="function",
197 name="search_docs",
198 content='{"result": "Found 3 examples"}'
199 )
201 # Build conversation
202 messages = [system_msg, user_msg, assistant_msg]
203 ```
204 """
205 role: str # 'system', 'user', 'assistant', 'function'
206 content: str
207 name: str | None = None # For function messages
208 function_call: Dict[str, Any] | None = None # For function calling
209 metadata: Dict[str, Any] = field(default_factory=dict)
212@dataclass
213class LLMResponse:
214 """Response from LLM.
216 Standard response format returned by all LLM providers. Contains the
217 generated content along with metadata about token usage, cost, and
218 completion status.
220 Attributes:
221 content: Generated text content
222 model: Model identifier that generated the response
223 finish_reason: Why generation stopped - 'stop', 'length', 'function_call'
224 usage: Token usage stats (prompt_tokens, completion_tokens, total_tokens)
225 function_call: Function call data if model requested tool use
226 metadata: Provider-specific metadata
227 created_at: Response timestamp
228 cost_usd: Estimated cost in USD for this request
229 cumulative_cost_usd: Running total cost for conversation
231 Example:
232 ```python
233 from dataknobs_llm import create_llm_provider
235 llm = create_llm_provider("openai", model="gpt-4")
236 response = await llm.complete("What is Python?")
238 # Access response data
239 print(response.content)
240 # => "Python is a high-level programming language..."
242 # Check token usage
243 print(f"Tokens used: {response.usage['total_tokens']}")
244 # => Tokens used: 87
246 # Monitor costs
247 if response.cost_usd:
248 print(f"Cost: ${response.cost_usd:.4f}")
249 print(f"Total: ${response.cumulative_cost_usd:.4f}")
251 # Check completion status
252 if response.finish_reason == "length":
253 print("Response truncated due to max_tokens limit")
254 ```
256 See Also:
257 LLMMessage: Request message format
258 LLMStreamResponse: Streaming response format
259 """
260 content: str
261 model: str
262 finish_reason: str | None = None # 'stop', 'length', 'function_call'
263 usage: Dict[str, int] | None = None # tokens used
264 function_call: Dict[str, Any] | None = None
265 metadata: Dict[str, Any] = field(default_factory=dict)
266 created_at: datetime = field(default_factory=datetime.now)
268 # Cost tracking (optional enhancement for DynaBot)
269 cost_usd: float | None = None # Estimated cost in USD
270 cumulative_cost_usd: float | None = None # Running total for conversation
273@dataclass
274class LLMStreamResponse:
275 r"""Streaming response from LLM.
277 Represents a single chunk in a streaming LLM response. Streaming
278 allows displaying generated text incrementally as it's produced,
279 providing better user experience for long responses.
281 Attributes:
282 delta: Incremental content for this chunk (not cumulative)
283 is_final: True if this is the last chunk in the stream
284 finish_reason: Why generation stopped (only set on final chunk)
285 usage: Token usage stats (only set on final chunk)
286 metadata: Additional chunk metadata
288 Example:
289 ```python
290 from dataknobs_llm import create_llm_provider
292 llm = create_llm_provider("openai", model="gpt-4")
294 # Stream and display in real-time
295 async for chunk in llm.stream_complete("Write a poem"):
296 print(chunk.delta, end="", flush=True)
298 if chunk.is_final:
299 print(f"\n\nFinished: {chunk.finish_reason}")
300 print(f"Tokens: {chunk.usage['total_tokens']}")
302 # Accumulate full response
303 full_text = ""
304 chunks_received = 0
306 async for chunk in llm.stream_complete("Explain Python"):
307 full_text += chunk.delta
308 chunks_received += 1
310 # Optional: show progress
311 if chunks_received % 10 == 0:
312 print(f"Received {chunks_received} chunks...")
314 print(f"\nComplete response ({len(full_text)} chars)")
315 print(full_text)
316 ```
318 See Also:
319 LLMResponse: Non-streaming response format
320 AsyncLLMProvider.stream_complete: Streaming method
321 """
322 delta: str # Incremental content
323 is_final: bool = False
324 finish_reason: str | None = None
325 usage: Dict[str, int] | None = None
326 metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LLMConfig:
    """Configuration for LLM operations.

    Bundles everything a provider needs: which provider/model to use,
    generation parameters, function calling, streaming, rate limiting
    and retries, plus a free-form ``options`` dict for provider-specific
    settings. Instances can be built directly, from a plain dictionary
    (including dictionaries produced by dataknobs Config objects) via
    ``from_dict``, and converted back with ``to_dict``.

    Attributes:
        provider: Provider name - 'openai', 'anthropic', 'ollama', etc.
        model: Model name/identifier.
        api_key: API key, if the provider needs one.
        api_base: Custom API endpoint.
        temperature: Sampling temperature (default 0.7).
        max_tokens: Maximum tokens to generate, or None for no explicit
            limit.
        top_p: Nucleus sampling parameter (default 1.0).
        frequency_penalty: Frequency penalty (default 0.0).
        presence_penalty: Presence penalty (default 0.0).
        stop_sequences: Optional list of stop sequences.
        mode: Completion mode (default ``CompletionMode.CHAT``).
        system_prompt: Optional system prompt.
        response_format: 'text' or 'json'.
        functions: Function definitions for function calling.
        function_call: 'auto', 'none', or a specific function.
        stream: Whether to stream responses.
        stream_callback: Callable invoked with ``LLMStreamResponse``
            chunks.
        rate_limit: Requests per minute.
        retry_count: Number of retries (default 3).
        retry_delay: Delay between retries (default 1.0).
        timeout: Request timeout (default 60.0).
        seed: Seed for reproducibility.
        logit_bias: Optional token bias mapping.
        user_id: Optional user identifier.
        options: Provider-specific options.

    Example:
        ```python
        from dataknobs_llm.llm.base import LLMConfig, CompletionMode

        # Basic configuration
        config = LLMConfig(
            provider="openai",
            model="gpt-4",
            api_key="sk-...",
            temperature=0.7,
            max_tokens=500
        )

        # Deterministic config for testing
        test_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            temperature=0.0,
            seed=42,  # Reproducible outputs
            max_tokens=100
        )

        # Function calling config
        function_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            functions=[{
                "name": "search_docs",
                "description": "Search documentation",
                "parameters": {"type": "object", "properties": {...}}
            }],
            function_call="auto"
        )

        # Streaming with callback
        def on_chunk(chunk):
            print(chunk.delta, end="")

        streaming_config = LLMConfig(
            provider="openai",
            model="gpt-4",
            stream=True,
            stream_callback=on_chunk
        )

        # From dictionary (Config compatibility)
        config_dict = {
            "provider": "ollama",
            "model": "llama2",
            "type": "llm",  # Config metadata (ignored)
            "temperature": 0.8
        }
        config = LLMConfig.from_dict(config_dict)

        # Clone with overrides
        new_config = config.clone(temperature=1.0, max_tokens=1000)
        ```

    See Also:
        normalize_llm_config: Convert various formats to LLMConfig
        CompletionMode: Available completion modes
    """

    provider: str  # 'openai', 'anthropic', 'ollama', etc.
    model: str  # Model name/identifier
    api_key: str | None = None
    api_base: str | None = None  # Custom API endpoint

    # Generation parameters
    temperature: float = 0.7
    max_tokens: int | None = None
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: List[str] | None = None

    # Mode settings
    mode: CompletionMode = CompletionMode.CHAT
    system_prompt: str | None = None
    response_format: str | None = None  # 'text' or 'json'

    # Function calling
    functions: List[Dict[str, Any]] | None = None
    function_call: Union[str, Dict[str, str]] | None = None  # 'auto', 'none', or specific function

    # Streaming
    stream: bool = False
    stream_callback: Callable[[LLMStreamResponse], None] | None = None

    # Rate limiting
    rate_limit: int | None = None  # Requests per minute
    retry_count: int = 3
    retry_delay: float = 1.0
    timeout: float = 60.0

    # Advanced settings
    seed: int | None = None  # For reproducibility
    logit_bias: Dict[str, float] | None = None
    user_id: str | None = None

    # Provider-specific options
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
        """Create LLMConfig from a dictionary.

        Keys that are not LLMConfig fields are ignored. This covers the
        'type', 'name', and 'factory' attributes that dictionaries from
        dataknobs Config objects may carry, as well as any other unknown
        key. A string ``mode`` is converted to ``CompletionMode``.

        Args:
            config_dict: Configuration dictionary

        Returns:
            LLMConfig instance

        Raises:
            ValueError: If ``mode`` is a string that is not a valid
                ``CompletionMode`` value.
            TypeError: If a required field ('provider', 'model') is
                missing.
        """
        # Keep only keys that name a dataclass field; Config metadata
        # ('type', 'name', 'factory') is dropped here because none of
        # those are fields.
        known_fields = cls.__dataclass_fields__
        init_kwargs = {
            key: value for key, value in config_dict.items()
            if key in known_fields
        }

        # Accept the serialized (string) form of the mode enum.
        mode = init_kwargs.get('mode')
        if isinstance(mode, str):
            init_kwargs['mode'] = CompletionMode(mode)

        return cls(**init_kwargs)

    def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
        """Convert LLMConfig to a dictionary.

        Enum values are emitted as their ``.value``; fields whose value
        is None are omitted, except ``options``, which is emitted as an
        empty dict. Container values are returned by reference, not
        copied.

        Args:
            include_config_attrs: If True, includes 'type' attribute for Config compatibility

        Returns:
            Configuration dictionary
        """
        result: Dict[str, Any] = {}

        for field_name in self.__dataclass_fields__:
            value = getattr(self, field_name)

            if isinstance(value, Enum):
                # Serialize enums by value so from_dict can parse them back.
                result[field_name] = value.value
            elif value is not None:
                result[field_name] = value
            elif field_name == 'options':
                # options is always present in the output, even when unset.
                result[field_name] = {}

        # Optionally add Config-compatible type attribute
        if include_config_attrs:
            result['type'] = 'llm'

        return result

    def clone(self, **overrides: Any) -> "LLMConfig":
        """Create a copy of this config with optional overrides.

        Useful for creating runtime configuration variations without
        mutating the original config. Any dataclass field can be
        overridden via keyword arguments.

        Dict and list fields that are not overridden (such as
        ``options``, ``functions``, ``stop_sequences``, ``logit_bias``)
        are shallow-copied, so adding or removing entries on the clone
        does not affect this config. The copy is one level deep:
        objects nested inside those containers are still shared.

        Args:
            **overrides: Field values to override in the cloned config

        Returns:
            New LLMConfig instance with overrides applied

        Raises:
            TypeError: If an override names a field that does not exist.

        Example:
            >>> base_config = LLMConfig(provider="openai", model="gpt-4", temperature=0.7)
            >>> creative_config = base_config.clone(temperature=1.2, max_tokens=500)
        """
        from copy import copy
        from dataclasses import replace

        # dataclasses.replace copies field references, so without this
        # step the clone and the original would share the same mutable
        # containers.
        for field_name in self.__dataclass_fields__:
            if field_name in overrides:
                continue
            current = getattr(self, field_name)
            if isinstance(current, (dict, list)):
                overrides[field_name] = copy(current)

        return replace(self, **overrides)
def normalize_llm_config(config: Union["LLMConfig", Config, Dict[str, Any]]) -> "LLMConfig":
    """Coerce any supported configuration value into an LLMConfig.

    Accepted inputs:
        - an ``LLMConfig`` instance, returned unchanged;
        - a plain dictionary, converted with ``LLMConfig.from_dict``;
        - a dataknobs Config-like object, recognized by having both
          ``get`` and ``get_types`` attributes. Its first 'llm' entry is
          used; if that lookup raises, the first entry of the first type
          reported by ``get_types()`` is used instead.

    Args:
        config: Configuration as LLMConfig, Config object, or dictionary

    Returns:
        LLMConfig instance

    Raises:
        TypeError: If config type is not supported
        ValueError: If a Config object has no 'llm' entry and reports no
            types at all
    """
    if isinstance(config, LLMConfig):
        return config

    if isinstance(config, dict):
        # Possibly the result of Config.get()
        return LLMConfig.from_dict(config)

    # Config objects are detected by duck typing rather than isinstance.
    looks_like_config = hasattr(config, 'get') and hasattr(config, 'get_types')
    if not looks_like_config:
        raise TypeError(
            f"Unsupported config type: {type(config).__name__}. "
            f"Expected LLMConfig, Config, or dict."
        )

    try:
        raw_config = config.get('llm', 0)
    except Exception as lookup_error:
        # No usable 'llm' entry: fall back to the first entry of
        # whichever type the Config object lists first.
        available_types = config.get_types()
        if not available_types:
            raise ValueError("Config object has no configurations") from lookup_error
        raw_config = config.get(available_types[0], 0)

    return LLMConfig.from_dict(raw_config)
class LLMProvider(ABC):
    """Base LLM provider interface.

    Holds the normalized configuration and an optional prompt builder,
    declares the lifecycle hooks every provider must implement
    (``initialize``, ``close``, ``validate_model``,
    ``get_capabilities``), and offers a synchronous context manager
    that calls ``initialize()`` on entry and ``close()`` on exit.

    Attributes:
        config: The provider configuration as an ``LLMConfig``.
        prompt_builder: Optional prompt builder passed at construction.
        _client: Provider client handle; None until a subclass sets it.
        _is_initialized: Initialization flag exposed through
            ``is_initialized``. This base class only sets it to False;
            subclasses are presumably expected to update it.
    """

    def __init__(
        self,
        config: Union[LLMConfig, Config, Dict[str, Any]],
        prompt_builder: Union[PromptBuilder, AsyncPromptBuilder] | None = None
    ):
        """Initialize provider with configuration.

        Args:
            config: Configuration as LLMConfig, dataknobs Config object, or dict
            prompt_builder: Optional prompt builder for integrated prompting

        Raises:
            TypeError: If ``config`` is of an unsupported type
                (raised by ``normalize_llm_config``).
        """
        self.config = normalize_llm_config(config)
        self.prompt_builder = prompt_builder
        self._client = None
        self._is_initialized = False

    def _validate_prompt_builder(self, expected_type: type) -> None:
        """Validate that prompt builder is configured and of correct type.

        Args:
            expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder)

        Raises:
            ValueError: If prompt_builder not configured
            TypeError: If prompt_builder is wrong type
        """
        # Compare against None explicitly: "not configured" means no
        # builder was supplied. A truthiness test would also reject a
        # configured builder that happens to evaluate as falsy (for
        # example one defining __len__ or __bool__).
        if self.prompt_builder is None:
            raise ValueError(
                "No prompt_builder configured. Pass prompt_builder to __init__() "
                "or use complete() directly with pre-rendered messages."
            )

        if not isinstance(self.prompt_builder, expected_type):
            raise TypeError(
                f"{self.__class__.__name__} requires {expected_type.__name__}, "
                f"got {type(self.prompt_builder).__name__}"
            )

    def _validate_render_params(
        self,
        prompt_type: str
    ) -> None:
        """Validate render parameters.

        Args:
            prompt_type: Type of prompt to render

        Raises:
            ValueError: If prompt_type is not 'system', 'user', or 'both'
        """
        if prompt_type not in ("system", "user", "both"):
            raise ValueError(
                f"Invalid prompt_type: {prompt_type}. "
                f"Must be 'system', 'user', or 'both'"
            )

    @abstractmethod
    def initialize(self) -> None:
        """Initialize the LLM client."""
        pass

    @abstractmethod
    def close(self) -> None:
        """Close the LLM client."""
        pass

    @abstractmethod
    def validate_model(self) -> bool:
        """Validate that the model is available."""
        pass

    @abstractmethod
    def get_capabilities(self) -> List[ModelCapability]:
        """Get model capabilities."""
        pass

    @property
    def is_initialized(self) -> bool:
        """Check if provider is initialized."""
        return self._is_initialized

    def __enter__(self):
        """Context manager entry: initialize the provider and return it."""
        self.initialize()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the provider.

        Returns None, so an exception raised inside the ``with`` block
        is not suppressed.
        """
        self.close()
677class AsyncLLMProvider(LLMProvider):
678 """Async LLM provider interface."""
680 @abstractmethod
681 async def complete(
682 self,
683 messages: Union[str, List[LLMMessage]],
684 **kwargs
685 ) -> LLMResponse:
686 """Generate completion asynchronously.
688 Primary method for getting LLM responses. Accepts either a simple
689 string prompt or a list of LLMMessage objects for multi-turn
690 conversations. This is the recommended async method for most use cases.
692 Args:
693 messages: Either a single string prompt or a list of LLMMessage
694 objects for multi-turn conversations.
695 **kwargs: Additional provider-specific parameters. Common options:
696 - temperature (float): Sampling temperature (0.0-2.0)
697 - max_tokens (int): Maximum tokens to generate
698 - top_p (float): Nucleus sampling parameter (0.0-1.0)
699 - stop (List[str]): Stop sequences
700 - presence_penalty (float): Presence penalty (-2.0 to 2.0)
701 - frequency_penalty (float): Frequency penalty (-2.0 to 2.0)
703 Returns:
704 LLMResponse containing generated content, usage stats, and metadata
706 Raises:
707 ValueError: If messages format is invalid
708 ConnectionError: If API connection fails
709 TimeoutError: If request exceeds timeout
711 Example:
712 ```python
713 from dataknobs_llm import create_llm_provider
714 from dataknobs_llm.llm.base import LLMMessage
716 llm = create_llm_provider("openai", model="gpt-4")
718 # Simple string prompt
719 response = await llm.complete("What is Python?")
720 print(response.content)
721 # => "Python is a high-level programming language..."
723 # With parameters
724 response = await llm.complete(
725 "Write a haiku about coding",
726 temperature=0.9,
727 max_tokens=100
728 )
730 # Multi-turn conversation
731 messages = [
732 LLMMessage(role="system", content="You are a helpful tutor"),
733 LLMMessage(role="user", content="Explain recursion"),
734 LLMMessage(role="assistant", content="Recursion is when..."),
735 LLMMessage(role="user", content="Can you give an example?")
736 ]
737 response = await llm.complete(messages)
739 # Check token usage
740 print(f"Tokens: {response.usage['total_tokens']}")
741 print(f"Cost: ${response.cost_usd:.4f}")
742 ```
744 See Also:
745 stream_complete: Streaming version
746 render_and_complete: Complete with prompt rendering
747 """
748 pass
750 async def render_and_complete(
751 self,
752 prompt_name: str,
753 params: Dict[str, Any] | None = None,
754 prompt_type: str = "user",
755 index: int = 0,
756 include_rag: bool = True,
757 **llm_kwargs
758 ) -> LLMResponse:
759 """Render prompt from library and execute LLM completion.
761 This is a convenience method for one-off interactions that combines
762 prompt rendering with LLM execution. For multi-turn conversations,
763 use ConversationManager instead.
765 Args:
766 prompt_name: Name of prompt in library
767 params: Parameters for template rendering
768 prompt_type: Type of prompt ("system", "user", or "both")
769 index: Prompt variant index (for user prompts)
770 include_rag: Whether to execute RAG searches
771 **llm_kwargs: Additional arguments passed to complete()
773 Returns:
774 LLM response
776 Raises:
777 ValueError: If prompt_builder not configured or invalid prompt_type
778 TypeError: If prompt_builder is not AsyncPromptBuilder
780 Example:
781 >>> llm = OpenAIProvider(config, prompt_builder=builder)
782 >>> result = await llm.render_and_complete(
783 ... "analyze_code",
784 ... params={"code": code, "language": "python"}
785 ... )
786 """
787 # Validate
788 from dataknobs_llm.prompts import AsyncPromptBuilder
789 self._validate_prompt_builder(AsyncPromptBuilder)
790 self._validate_render_params(prompt_type)
792 # Render messages
793 messages = await self._render_messages(
794 prompt_name, params, prompt_type, index, include_rag
795 )
797 # Execute LLM
798 return await self.complete(messages, **llm_kwargs)
800 async def render_and_stream(
801 self,
802 prompt_name: str,
803 params: Dict[str, Any] | None = None,
804 prompt_type: str = "user",
805 index: int = 0,
806 include_rag: bool = True,
807 **llm_kwargs
808 ) -> AsyncIterator[LLMStreamResponse]:
809 """Render prompt and stream LLM response.
811 Same as render_and_complete() but returns streaming response.
813 Args:
814 prompt_name: Name of prompt in library
815 params: Parameters for template rendering
816 prompt_type: Type of prompt ("system", "user", or "both")
817 index: Prompt variant index
818 include_rag: Whether to execute RAG searches
819 **llm_kwargs: Additional arguments passed to stream_complete()
821 Yields:
822 Streaming response chunks
824 Raises:
825 ValueError: If prompt_builder not configured or invalid prompt_type
826 TypeError: If prompt_builder is not AsyncPromptBuilder
828 Example:
829 >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
830 ... print(chunk.delta, end="")
831 """
832 # Validate
833 from dataknobs_llm.prompts import AsyncPromptBuilder
834 self._validate_prompt_builder(AsyncPromptBuilder)
835 self._validate_render_params(prompt_type)
837 # Render messages
838 messages = await self._render_messages(
839 prompt_name, params, prompt_type, index, include_rag
840 )
842 # Stream LLM response
843 async for chunk in self.stream_complete(messages, **llm_kwargs):
844 yield chunk
846 async def _render_messages(
847 self,
848 prompt_name: str,
849 params: Dict[str, Any] | None,
850 prompt_type: str,
851 index: int,
852 include_rag: bool
853 ) -> List[LLMMessage]:
854 """Render messages from prompt library (async version).
856 Args:
857 prompt_name: Name of prompt in library
858 params: Parameters for template rendering
859 prompt_type: Type of prompt ("system", "user", or "both")
860 index: Prompt variant index
861 include_rag: Whether to execute RAG searches
863 Returns:
864 List of rendered LLM messages
865 """
866 from dataknobs_llm.prompts import AsyncPromptBuilder
867 builder: AsyncPromptBuilder = self.prompt_builder # type: ignore
869 messages: List[LLMMessage] = []
870 params = params or {}
872 if prompt_type in ("system", "both"):
873 result = await builder.render_system_prompt(
874 prompt_name, params=params, include_rag=include_rag
875 )
876 messages.append(LLMMessage(role="system", content=result.content))
878 if prompt_type in ("user", "both"):
879 result = await builder.render_user_prompt(
880 prompt_name, index=index, params=params, include_rag=include_rag
881 )
882 messages.append(LLMMessage(role="user", content=result.content))
884 return messages
886 @abstractmethod
887 async def stream_complete(
888 self,
889 messages: Union[str, List[LLMMessage]],
890 **kwargs
891 ) -> AsyncIterator[LLMStreamResponse]:
892 r"""Generate streaming completion asynchronously.
894 Streams response chunks as they are generated, enabling real-time
895 display of LLM output. Each chunk contains incremental content
896 (delta), and the final chunk includes usage statistics.
898 Args:
899 messages: Either a single string prompt or list of LLMMessage objects
900 **kwargs: Provider-specific parameters (same as complete())
902 Yields:
903 LLMStreamResponse chunks containing incremental content. The final
904 chunk has is_final=True and includes finish_reason and usage stats.
906 Raises:
907 ValueError: If messages format is invalid
908 ConnectionError: If API connection fails
909 TimeoutError: If request exceeds timeout
911 Example:
912 ```python
913 from dataknobs_llm import create_llm_provider
915 llm = create_llm_provider("openai", model="gpt-4")
917 # Stream and display in real-time
918 async for chunk in llm.stream_complete("Tell me a story"):
919 print(chunk.delta, end="", flush=True)
921 if chunk.is_final:
922 print(f"\n\nFinished: {chunk.finish_reason}")
923 print(f"Total tokens: {chunk.usage['total_tokens']}")
925 # Accumulate full response
926 full_text = ""
927 chunk_count = 0
929 async for chunk in llm.stream_complete("Explain quantum computing"):
930 full_text += chunk.delta
931 chunk_count += 1
933 print(f"Received {chunk_count} chunks")
934 print(f"Total length: {len(full_text)} characters")
936 # Stream with progress callback
937 async def stream_with_progress(prompt: str):
938 chunks = []
939 async for chunk in llm.stream_complete(prompt):
940 chunks.append(chunk)
941 # Update progress UI
942 if len(chunks) % 5 == 0:
943 print(f"Processing... ({len(chunks)} chunks)")
944 return "".join(c.delta for c in chunks)
946 result = await stream_with_progress("Write a tutorial")
947 ```
949 See Also:
950 complete: Non-streaming version
951 render_and_stream: Stream with prompt rendering
952 LLMStreamResponse: Chunk data structure
953 """
954 pass
956 @abstractmethod
957 async def embed(
958 self,
959 texts: Union[str, List[str]],
960 **kwargs
961 ) -> Union[List[float], List[List[float]]]:
962 """Generate embeddings asynchronously.
964 Converts text into dense vector representations for semantic search,
965 clustering, and similarity comparison. Returns high-dimensional
966 vectors (typically 768-1536 dimensions depending on model).
968 Args:
969 texts: Single text string or list of texts to embed
970 **kwargs: Provider-specific parameters:
971 - model (str): Embedding model override
972 - dimensions (int): Target dimensions (if supported)
974 Returns:
975 Single embedding vector (List[float]) if input is a string,
976 or list of vectors (List[List[float]]) if input is a list
978 Raises:
979 ValueError: If texts is empty or invalid
980 ConnectionError: If API connection fails
982 Example:
983 ```python
984 from dataknobs_llm import create_llm_provider
985 import numpy as np
987 # Create embedding provider
988 llm = create_llm_provider(
989 "openai",
990 model="text-embedding-ada-002"
991 )
993 # Single text embedding
994 embedding = await llm.embed("What is machine learning?")
995 print(f"Dimensions: {len(embedding)}")
996 # => Dimensions: 1536
998 # Batch embedding
999 texts = [
1000 "Python is a programming language",
1001 "JavaScript is used for web development",
1002 "Machine learning uses statistical methods"
1003 ]
1004 embeddings = await llm.embed(texts)
1005 print(f"Generated {len(embeddings)} embeddings")
1007 # Compute similarity
1008 def cosine_similarity(v1, v2):
1009 return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
1011 query_emb = await llm.embed("Tell me about ML")
1012 similarities = [
1013 cosine_similarity(query_emb, emb)
1014 for emb in embeddings
1015 ]
1016 most_similar_idx = np.argmax(similarities)
1017 print(f"Most similar: {texts[most_similar_idx]}")
1018 # => Most similar: Machine learning uses statistical methods
1020 # Store in vector database
1021 from dataknobs_data import database_factory
1022 db = database_factory.create("vector_db")
1023 for text, emb in zip(texts, embeddings):
1024 db.create({"text": text, "embedding": emb})
1025 ```
1027 See Also:
1028 complete: Text generation method
1029 """
1030 pass
@abstractmethod
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs
) -> LLMResponse:
    """Let the model choose a function/tool to invoke (async).

    Given the conversation so far and a catalogue of callable functions,
    the model picks the function that fits the context and reports its
    name together with the call arguments in structured form. This
    method is abstract: concrete providers must override it.

    Args:
        messages: Conversation history that precedes the function call
        functions: Function definitions expressed as JSON Schema.
            Every entry is a dict that must provide:
            - name (str): Identifier of the function
            - description (str): Summary of what the function does
            - parameters (dict): JSON Schema describing its parameters
        **kwargs: Provider-specific options, such as:
            - function_call (str|dict): 'auto', 'none', or specific function
            - temperature (float): Sampling temperature
            - max_tokens (int): Upper bound on response tokens

    Returns:
        An LLMResponse whose function_call field is populated with:
            - name (str): The function the model wants to call
            - arguments (str): The call arguments as a JSON string

    Raises:
        ValueError: If the functions list is malformed
        ConnectionError: If the provider API cannot be reached

    Example:
        ```python
        from dataknobs_llm import create_llm_provider
        from dataknobs_llm.llm.base import LLMMessage
        import json

        llm = create_llm_provider("openai", model="gpt-4")

        # Catalogue of functions the model may choose from
        functions = [
            {
                "name": "search_docs",
                "description": "Search documentation for information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Max results"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "execute_code",
                "description": "Execute Python code",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string"}
                    },
                    "required": ["code"]
                }
            }
        ]

        # A question that can only be answered by calling a function
        messages = [
            LLMMessage(
                role="user",
                content="Search for information about async/await in Python"
            )
        ]

        # The model decides which function to call
        response = await llm.function_call(messages, functions)

        if response.function_call:
            func_name = response.function_call["name"]
            func_args = json.loads(response.function_call["arguments"])

            print(f"Function: {func_name}")
            print(f"Arguments: {func_args}")
            # => Function: search_docs
            # => Arguments: {'query': 'async/await Python', 'limit': 5}

            # Run the chosen function (search_docs is your own code)
            results = search_docs(**func_args)

            # Feed the function result back into the conversation
            messages.append(LLMMessage(
                role="function",
                name=func_name,
                content=json.dumps(results)
            ))

            # Ask the model for its final answer
            final = await llm.complete(messages)
            print(final.content)
        ```

    See Also:
        complete: Plain completion without function calling
        dataknobs_llm.tools: Tool abstraction framework
    """
    pass
async def initialize(self) -> None:
    """Mark the async LLM client as initialized.

    This base implementation only sets the ``_is_initialized`` flag to
    True.
    """
    self._is_initialized = True
async def close(self) -> None:
    """Mark the async LLM client as closed.

    This base implementation only resets the ``_is_initialized`` flag to
    False.
    """
    self._is_initialized = False
async def __aenter__(self):
    """Enter the ``async with`` block.

    Awaits initialize() and then hands back this provider as the value
    bound by the ``as`` clause.
    """
    await self.initialize()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``async with`` block.

    Awaits close() whether or not an exception is in flight. Returns
    None, so an exception raised inside the block is not suppressed.
    """
    await self.close()
class SyncLLMProvider(LLMProvider):
    """Blocking counterpart of the LLM provider interface.

    Concrete subclasses implement complete(), stream_complete(), embed()
    and function_call(). The prompt-library conveniences
    render_and_complete() and render_and_stream() are provided here and
    delegate to those methods.
    """

    @abstractmethod
    def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> LLMResponse:
        """Produce a completion, blocking until it is available.

        Args:
            messages: A plain prompt string or a list of LLMMessage objects
            **kwargs: Extra provider-specific parameters

        Returns:
            The LLM response
        """
        pass

    def render_and_complete(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> LLMResponse:
        """Render a library prompt and run a single completion with it.

        Convenience wrapper for one-off interactions: it renders the named
        prompt and passes the resulting messages to complete(). For
        multi-turn conversations, use ConversationManager instead.

        Args:
            prompt_name: Name of the prompt in the library
            params: Values substituted into the prompt template
            prompt_type: Which prompt(s) to render: "system", "user", or
                "both"
            index: Variant index of the prompt (user prompts only)
            include_rag: Whether RAG searches are executed while rendering
            **llm_kwargs: Forwarded unchanged to complete()

        Returns:
            The LLM response

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
            >>> result = llm.render_and_complete(
            ...     "analyze_code",
            ...     params={"code": code, "language": "python"}
            ... )
        """
        from dataknobs_llm.prompts import PromptBuilder

        # Both checks run before any rendering work is attempted.
        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )
        return self.complete(rendered, **llm_kwargs)

    def render_and_stream(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None = None,
        prompt_type: str = "user",
        index: int = 0,
        include_rag: bool = True,
        **llm_kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Render a library prompt and stream the completion for it.

        Streaming twin of render_and_complete(). This method is a
        generator, so validation, rendering and the provider call all
        start when the caller begins iterating, not when the method is
        called.

        Args:
            prompt_name: Name of the prompt in the library
            params: Values substituted into the prompt template
            prompt_type: Which prompt(s) to render: "system", "user", or
                "both"
            index: Variant index of the prompt
            include_rag: Whether RAG searches are executed while rendering
            **llm_kwargs: Forwarded unchanged to stream_complete()

        Yields:
            Streaming response chunks

        Raises:
            ValueError: If prompt_builder not configured or invalid prompt_type
            TypeError: If prompt_builder is not PromptBuilder

        Example:
            >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
            ...     print(chunk.delta, end="")
        """
        from dataknobs_llm.prompts import PromptBuilder

        self._validate_prompt_builder(PromptBuilder)
        self._validate_render_params(prompt_type)

        rendered = self._render_messages(
            prompt_name, params, prompt_type, index, include_rag
        )

        # Relay every chunk from the provider stream as it arrives.
        stream = self.stream_complete(rendered, **llm_kwargs)
        for piece in stream:
            yield piece

    def _render_messages(
        self,
        prompt_name: str,
        params: Dict[str, Any] | None,
        prompt_type: str,
        index: int,
        include_rag: bool
    ) -> List[LLMMessage]:
        """Build the message list for a library prompt (sync version).

        A system message is rendered when prompt_type is "system" or
        "both", followed by a user message when it is "user" or "both".
        A missing or empty params value is treated as an empty dict.

        Args:
            prompt_name: Name of the prompt in the library
            params: Values substituted into the prompt template
            prompt_type: Which prompt(s) to render: "system", "user", or
                "both"
            index: Variant index of the prompt
            include_rag: Whether RAG searches are executed while rendering

        Returns:
            List of rendered LLM messages
        """
        from dataknobs_llm.prompts import PromptBuilder
        builder: PromptBuilder = self.prompt_builder  # type: ignore

        template_params = params or {}
        rendered: List[LLMMessage] = []

        wants_system = prompt_type in ("system", "both")
        wants_user = prompt_type in ("user", "both")

        if wants_system:
            system_part = builder.render_system_prompt(
                prompt_name, params=template_params, include_rag=include_rag
            )
            rendered.append(
                LLMMessage(role="system", content=system_part.content)
            )

        if wants_user:
            user_part = builder.render_user_prompt(
                prompt_name,
                index=index,
                params=template_params,
                include_rag=include_rag,
            )
            rendered.append(
                LLMMessage(role="user", content=user_part.content)
            )

        return rendered

    @abstractmethod
    def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> Iterator[LLMStreamResponse]:
        """Produce a completion as a stream of chunks, synchronously.

        Args:
            messages: A plain prompt string or a list of LLMMessage objects
            **kwargs: Extra provider-specific parameters

        Yields:
            Streaming response chunks
        """
        pass

    @abstractmethod
    def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Compute embeddings synchronously.

        Args:
            texts: A single text or a list of texts
            **kwargs: Extra provider-specific parameters

        Returns:
            Embedding vector(s)
        """
        pass

    @abstractmethod
    def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Run function calling synchronously.

        Args:
            messages: Conversation messages
            functions: Functions the model may choose from
            **kwargs: Extra provider-specific parameters

        Returns:
            Response with function call
        """
        pass

    def initialize(self) -> None:
        """Mark the sync LLM client as initialized.

        This base implementation only sets ``_is_initialized`` to True.
        """
        self._is_initialized = True

    def close(self) -> None:
        """Mark the sync LLM client as closed.

        This base implementation only resets ``_is_initialized`` to False.
        """
        self._is_initialized = False
class LLMAdapter(ABC):
    """Abstract translator between dataknobs and provider-specific formats.

    An adapter converts the standard dataknobs LLM types (LLMMessage,
    LLMResponse, LLMConfig) to and from the shapes a particular provider
    API expects (OpenAI, Anthropic, etc.). Every provider implementation
    should ship a matching adapter.

    Keeping the translation here lets calling code stay provider-agnostic
    and work across different LLM APIs without modification.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMAdapter, LLMConfig, LLMMessage, LLMResponse
        )
        from typing import Any, List, Dict

        class MyProviderAdapter(LLMAdapter):
            \"\"\"Adapter for custom LLM provider.\"\"\"

            def adapt_messages(
                self,
                messages: List[LLMMessage]
            ) -> List[Dict[str, str]]:
                \"\"\"Convert to provider format.\"\"\"
                return [
                    {"role": msg.role, "content": msg.content}
                    for msg in messages
                ]

            def adapt_response(
                self,
                response: Any
            ) -> LLMResponse:
                \"\"\"Convert from provider format.\"\"\"
                return LLMResponse(
                    content=response["text"],
                    model=response["model_id"],
                    usage={
                        "total_tokens": response["tokens_used"]
                    }
                )

            def adapt_config(
                self,
                config: LLMConfig
            ) -> Dict[str, Any]:
                \"\"\"Convert config to provider format.\"\"\"
                return {
                    "model_name": config.model,
                    "temp": config.temperature,
                    "max_length": config.max_tokens
                }

        # Use adapter in provider
        adapter = MyProviderAdapter()
        provider_messages = adapter.adapt_messages(messages)
        ```

    See Also:
        LLMProvider: Base provider interface
        dataknobs_llm.llm.providers.OpenAIAdapter: Example implementation
    """

    @abstractmethod
    def adapt_messages(self, messages: List[LLMMessage]) -> Any:
        """Translate standard messages into the provider's message format.

        Args:
            messages: Standard LLMMessage list

        Returns:
            Provider-specific message format
        """
        pass

    @abstractmethod
    def adapt_response(self, response: Any) -> LLMResponse:
        """Translate a provider response into the standard format.

        Args:
            response: Provider-specific response object

        Returns:
            Standard LLMResponse
        """
        pass

    @abstractmethod
    def adapt_config(self, config: LLMConfig) -> Dict[str, Any]:
        """Translate the standard configuration into provider settings.

        Args:
            config: Standard LLMConfig

        Returns:
            Provider-specific config dict
        """
        pass
class LLMMiddleware(Protocol):
    """Structural interface for LLM middleware.

    A middleware object exposes two hooks: one that can rewrite the
    outgoing messages before they reach the LLM, and one that can rewrite
    the response before it is handed back to the caller. Typical uses are
    logging, caching, content filtering, rate limiting, etc.

    Middleware can accept configuration as LLMConfig, dataknobs Config, or dict.

    Example:
        ```python
        from dataknobs_llm.llm.base import (
            LLMMiddleware, LLMMessage, LLMResponse, LLMConfig
        )
        from typing import List, Union, Dict, Any
        import logging

        # NOTE: ``Config`` in the annotations below is the dataknobs
        # Config type; import it from wherever your project exposes it.

        class LoggingMiddleware:
            \"\"\"Logs all LLM requests and responses.\"\"\"

            def __init__(self):
                self.logger = logging.getLogger(__name__)

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Log request before sending.\"\"\"
                self.logger.info(f"Request: {len(messages)} messages")
                for msg in messages:
                    self.logger.debug(f"  {msg.role}: {msg.content[:50]}...")
                return messages

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Log response after receiving.\"\"\"
                self.logger.info(f"Response: {len(response.content)} chars")
                self.logger.info(f"Tokens: {response.usage['total_tokens']}")
                if response.cost_usd:
                    self.logger.info(f"Cost: ${response.cost_usd:.4f}")
                return response


        class ContentFilterMiddleware:
            \"\"\"Filters sensitive content.\"\"\"

            def __init__(self, blocked_words: List[str]):
                self.blocked_words = blocked_words

            async def process_request(
                self,
                messages: List[LLMMessage],
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> List[LLMMessage]:
                \"\"\"Filter input messages.\"\"\"
                filtered = []
                for msg in messages:
                    content = msg.content
                    for word in self.blocked_words:
                        content = content.replace(word, "***")
                    filtered.append(LLMMessage(
                        role=msg.role,
                        content=content,
                        name=msg.name,
                        function_call=msg.function_call,
                        metadata=msg.metadata
                    ))
                return filtered

            async def process_response(
                self,
                response: LLMResponse,
                config: Union[LLMConfig, Config, Dict[str, Any]]
            ) -> LLMResponse:
                \"\"\"Filter output.\"\"\"
                content = response.content
                for word in self.blocked_words:
                    content = content.replace(word, "***")

                from dataclasses import replace
                return replace(response, content=content)


        # Use with ConversationManager
        from dataknobs_llm.conversations import ConversationManager

        manager = await ConversationManager.create(
            llm=llm,
            prompt_builder=builder,
            middleware=[
                LoggingMiddleware(),
                ContentFilterMiddleware(["password", "secret"])
            ]
        )
        ```

    See Also:
        ConversationManager: Uses middleware for request/response processing
    """

    async def process_request(
        self,
        messages: List[LLMMessage],
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> List[LLMMessage]:
        """Hook invoked on the messages before they are sent to the LLM.

        Implementations may transform, log, validate, or filter the
        messages on their way to the LLM provider.

        Args:
            messages: Input messages about to be sent to the LLM
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed messages (can be modified, added to, or filtered)

        Raises:
            ValueError: If messages are invalid
        """
        ...

    async def process_response(
        self,
        response: LLMResponse,
        config: Union[LLMConfig, Config, Dict[str, Any]]
    ) -> LLMResponse:
        """Hook invoked on the LLM response before it reaches the caller.

        Implementations may transform, log, validate, or filter the
        response on its way back to the caller.

        Args:
            response: LLM response to process
            config: Configuration (LLMConfig, Config, or dict)

        Returns:
            Processed response (can be modified)

        Raises:
            ValueError: If response is invalid
        """
        ...