Coverage for src/dataknobs_llm/llm/base.py: 50%
200 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 16:04 -0600
1"""Base LLM abstraction components.
3This module provides the base abstractions for unified LLM operations.
4"""
6from abc import ABC, abstractmethod
7from dataclasses import dataclass, field
8from enum import Enum
9from typing import (
10 Any, Dict, List, Union, AsyncIterator, Iterator,
11 Callable, Protocol, Optional
12)
13from datetime import datetime
15# Import prompt builder types - clean one-way dependency (llm depends on prompts)
16from dataknobs_llm.prompts import AsyncPromptBuilder, PromptBuilder
19class CompletionMode(Enum):
20 """LLM completion modes."""
21 CHAT = "chat" # Chat completion with message history
22 TEXT = "text" # Text completion
23 INSTRUCT = "instruct" # Instruction following
24 EMBEDDING = "embedding" # Generate embeddings
25 FUNCTION = "function" # Function calling
28class ModelCapability(Enum):
29 """Model capabilities."""
30 TEXT_GENERATION = "text_generation"
31 CHAT = "chat"
32 EMBEDDINGS = "embeddings"
33 FUNCTION_CALLING = "function_calling"
34 VISION = "vision"
35 CODE = "code"
36 JSON_MODE = "json_mode"
37 STREAMING = "streaming"
40@dataclass
41class LLMMessage:
42 """Represents a message in LLM conversation."""
43 role: str # 'system', 'user', 'assistant', 'function'
44 content: str
45 name: str | None = None # For function messages
46 function_call: Dict[str, Any] | None = None # For function calling
47 metadata: Dict[str, Any] = field(default_factory=dict)
50@dataclass
51class LLMResponse:
52 """Response from LLM."""
53 content: str
54 model: str
55 finish_reason: str | None = None # 'stop', 'length', 'function_call'
56 usage: Dict[str, int] | None = None # tokens used
57 function_call: Dict[str, Any] | None = None
58 metadata: Dict[str, Any] = field(default_factory=dict)
59 created_at: datetime = field(default_factory=datetime.now)
62@dataclass
63class LLMStreamResponse:
64 """Streaming response from LLM."""
65 delta: str # Incremental content
66 is_final: bool = False
67 finish_reason: str | None = None
68 usage: Dict[str, int] | None = None
69 metadata: Dict[str, Any] = field(default_factory=dict)
72@dataclass
73class LLMConfig:
74 """Configuration for LLM operations.
76 This configuration class works with both direct instantiation and
77 dataknobs Config objects. It can be created from dictionaries that
78 include optional 'type' and 'factory' attributes used by Config.
79 """
80 provider: str # 'openai', 'anthropic', 'ollama', etc.
81 model: str # Model name/identifier
82 api_key: str | None = None
83 api_base: str | None = None # Custom API endpoint
85 # Generation parameters
86 temperature: float = 0.7
87 max_tokens: int | None = None
88 top_p: float = 1.0
89 frequency_penalty: float = 0.0
90 presence_penalty: float = 0.0
91 stop_sequences: List[str] | None = None
93 # Mode settings
94 mode: CompletionMode = CompletionMode.CHAT
95 system_prompt: str | None = None
96 response_format: str | None = None # 'text' or 'json'
98 # Function calling
99 functions: List[Dict[str, Any]] | None = None
100 function_call: Union[str, Dict[str, str]] | None = None # 'auto', 'none', or specific function
102 # Streaming
103 stream: bool = False
104 stream_callback: Callable[[LLMStreamResponse], None] | None = None
106 # Rate limiting
107 rate_limit: int | None = None # Requests per minute
108 retry_count: int = 3
109 retry_delay: float = 1.0
110 timeout: float = 60.0
112 # Advanced settings
113 seed: int | None = None # For reproducibility
114 logit_bias: Dict[str, float] | None = None
115 user_id: str | None = None
117 # Provider-specific options
118 options: Dict[str, Any] = field(default_factory=dict)
120 @classmethod
121 def from_dict(cls, config_dict: Dict[str, Any]) -> "LLMConfig":
122 """Create LLMConfig from a dictionary.
124 This method handles dictionaries from dataknobs Config objects,
125 which may include 'type', 'name', and 'factory' attributes.
126 These attributes are ignored during LLMConfig construction.
128 Args:
129 config_dict: Configuration dictionary
131 Returns:
132 LLMConfig instance
133 """
134 # Filter out Config-specific attributes
135 config_data = {
136 k: v for k, v in config_dict.items()
137 if k not in ('type', 'name', 'factory')
138 }
140 # Handle mode conversion if it's a string
141 if 'mode' in config_data and isinstance(config_data['mode'], str):
142 config_data['mode'] = CompletionMode(config_data['mode'])
144 # Get dataclass fields to filter unknown attributes
145 valid_fields = {f.name for f in cls.__dataclass_fields__.values()}
146 filtered_data = {k: v for k, v in config_data.items() if k in valid_fields}
148 return cls(**filtered_data)
150 def to_dict(self, include_config_attrs: bool = False) -> Dict[str, Any]:
151 """Convert LLMConfig to a dictionary.
153 Args:
154 include_config_attrs: If True, includes 'type' attribute for Config compatibility
156 Returns:
157 Configuration dictionary
158 """
159 result = {}
161 for field_info in self.__dataclass_fields__.values():
162 value = getattr(self, field_info.name)
164 # Handle enum conversion
165 if isinstance(value, Enum):
166 result[field_info.name] = value.value
167 # Skip None values for optional fields
168 elif value is not None:
169 result[field_info.name] = value
170 # Include default factories even if empty for certain fields
171 elif field_info.name == 'options':
172 result[field_info.name] = {}
174 # Optionally add Config-compatible type attribute
175 if include_config_attrs:
176 result['type'] = 'llm'
178 return result
181def normalize_llm_config(config: Union["LLMConfig", "Config", Dict[str, Any]]) -> "LLMConfig":
182 """Normalize various config formats to LLMConfig.
184 This helper function accepts LLMConfig instances, dataknobs Config objects,
185 or plain dictionaries and returns a standardized LLMConfig instance.
187 Args:
188 config: Configuration as LLMConfig, Config object, or dictionary
190 Returns:
191 LLMConfig instance
193 Raises:
194 TypeError: If config type is not supported
195 """
196 # Already an LLMConfig instance
197 if isinstance(config, LLMConfig):
198 return config
200 # Dictionary (possibly from Config.get())
201 if isinstance(config, dict):
202 return LLMConfig.from_dict(config)
204 # dataknobs Config object - try to get the config dict
205 # We check for the get method to identify Config objects
206 if hasattr(config, 'get') and hasattr(config, 'get_types'):
207 # It's a Config object, extract the llm configuration
208 # Try to get first llm config, or fall back to first available type
209 try:
210 config_dict = config.get('llm', 0)
211 except Exception:
212 # If no 'llm' type, try to get first available config of any type
213 types = config.get_types()
214 if types:
215 config_dict = config.get(types[0], 0)
216 else:
217 raise ValueError("Config object has no configurations")
219 return LLMConfig.from_dict(config_dict)
221 raise TypeError(
222 f"Unsupported config type: {type(config).__name__}. "
223 f"Expected LLMConfig, Config, or dict."
224 )
227class LLMProvider(ABC):
228 """Base LLM provider interface."""
230 def __init__(
231 self,
232 config: Union[LLMConfig, "Config", Dict[str, Any]],
233 prompt_builder: Optional[Union[PromptBuilder, AsyncPromptBuilder]] = None
234 ):
235 """Initialize provider with configuration.
237 Args:
238 config: Configuration as LLMConfig, dataknobs Config object, or dict
239 prompt_builder: Optional prompt builder for integrated prompting
240 """
241 self.config = normalize_llm_config(config)
242 self.prompt_builder = prompt_builder
243 self._client = None
244 self._is_initialized = False
246 def _validate_prompt_builder(self, expected_type: type) -> None:
247 """Validate that prompt builder is configured and of correct type.
249 Args:
250 expected_type: Expected builder type (PromptBuilder or AsyncPromptBuilder)
252 Raises:
253 ValueError: If prompt_builder not configured
254 TypeError: If prompt_builder is wrong type
255 """
256 if not self.prompt_builder:
257 raise ValueError(
258 "No prompt_builder configured. Pass prompt_builder to __init__() "
259 "or use complete() directly with pre-rendered messages."
260 )
262 if not isinstance(self.prompt_builder, expected_type):
263 raise TypeError(
264 f"{self.__class__.__name__} requires {expected_type.__name__}, "
265 f"got {type(self.prompt_builder).__name__}"
266 )
268 def _validate_render_params(
269 self,
270 prompt_type: str
271 ) -> None:
272 """Validate render parameters.
274 Args:
275 prompt_type: Type of prompt to render
277 Raises:
278 ValueError: If prompt_type is invalid
279 """
280 if prompt_type not in ("system", "user", "both"):
281 raise ValueError(
282 f"Invalid prompt_type: {prompt_type}. "
283 f"Must be 'system', 'user', or 'both'"
284 )
286 @abstractmethod
287 def initialize(self) -> None:
288 """Initialize the LLM client."""
289 pass
291 @abstractmethod
292 def close(self) -> None:
293 """Close the LLM client."""
294 pass
296 @abstractmethod
297 def validate_model(self) -> bool:
298 """Validate that the model is available."""
299 pass
301 @abstractmethod
302 def get_capabilities(self) -> List[ModelCapability]:
303 """Get model capabilities."""
304 pass
306 @property
307 def is_initialized(self) -> bool:
308 """Check if provider is initialized."""
309 return self._is_initialized
311 def __enter__(self):
312 """Context manager entry."""
313 self.initialize()
314 return self
316 def __exit__(self, exc_type, exc_val, exc_tb):
317 """Context manager exit."""
318 self.close()
321class AsyncLLMProvider(LLMProvider):
322 """Async LLM provider interface."""
324 @abstractmethod
325 async def complete(
326 self,
327 messages: Union[str, List[LLMMessage]],
328 **kwargs
329 ) -> LLMResponse:
330 """Generate completion asynchronously.
332 Args:
333 messages: Input messages or prompt
334 **kwargs: Additional parameters
336 Returns:
337 LLM response
338 """
339 pass
341 async def render_and_complete(
342 self,
343 prompt_name: str,
344 params: Optional[Dict[str, Any]] = None,
345 prompt_type: str = "user",
346 index: int = 0,
347 include_rag: bool = True,
348 **llm_kwargs
349 ) -> LLMResponse:
350 """Render prompt from library and execute LLM completion.
352 This is a convenience method for one-off interactions that combines
353 prompt rendering with LLM execution. For multi-turn conversations,
354 use ConversationManager instead.
356 Args:
357 prompt_name: Name of prompt in library
358 params: Parameters for template rendering
359 prompt_type: Type of prompt ("system", "user", or "both")
360 index: Prompt variant index (for user prompts)
361 include_rag: Whether to execute RAG searches
362 **llm_kwargs: Additional arguments passed to complete()
364 Returns:
365 LLM response
367 Raises:
368 ValueError: If prompt_builder not configured or invalid prompt_type
369 TypeError: If prompt_builder is not AsyncPromptBuilder
371 Example:
372 >>> llm = OpenAIProvider(config, prompt_builder=builder)
373 >>> result = await llm.render_and_complete(
374 ... "analyze_code",
375 ... params={"code": code, "language": "python"}
376 ... )
377 """
378 # Validate
379 from dataknobs_llm.prompts import AsyncPromptBuilder
380 self._validate_prompt_builder(AsyncPromptBuilder)
381 self._validate_render_params(prompt_type)
383 # Render messages
384 messages = await self._render_messages(
385 prompt_name, params, prompt_type, index, include_rag
386 )
388 # Execute LLM
389 return await self.complete(messages, **llm_kwargs)
391 async def render_and_stream(
392 self,
393 prompt_name: str,
394 params: Optional[Dict[str, Any]] = None,
395 prompt_type: str = "user",
396 index: int = 0,
397 include_rag: bool = True,
398 **llm_kwargs
399 ) -> AsyncIterator[LLMStreamResponse]:
400 """Render prompt and stream LLM response.
402 Same as render_and_complete() but returns streaming response.
404 Args:
405 prompt_name: Name of prompt in library
406 params: Parameters for template rendering
407 prompt_type: Type of prompt ("system", "user", or "both")
408 index: Prompt variant index
409 include_rag: Whether to execute RAG searches
410 **llm_kwargs: Additional arguments passed to stream_complete()
412 Yields:
413 Streaming response chunks
415 Raises:
416 ValueError: If prompt_builder not configured or invalid prompt_type
417 TypeError: If prompt_builder is not AsyncPromptBuilder
419 Example:
420 >>> async for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
421 ... print(chunk.delta, end="")
422 """
423 # Validate
424 from dataknobs_llm.prompts import AsyncPromptBuilder
425 self._validate_prompt_builder(AsyncPromptBuilder)
426 self._validate_render_params(prompt_type)
428 # Render messages
429 messages = await self._render_messages(
430 prompt_name, params, prompt_type, index, include_rag
431 )
433 # Stream LLM response
434 async for chunk in self.stream_complete(messages, **llm_kwargs):
435 yield chunk
437 async def _render_messages(
438 self,
439 prompt_name: str,
440 params: Optional[Dict[str, Any]],
441 prompt_type: str,
442 index: int,
443 include_rag: bool
444 ) -> List[LLMMessage]:
445 """Render messages from prompt library (async version).
447 Args:
448 prompt_name: Name of prompt in library
449 params: Parameters for template rendering
450 prompt_type: Type of prompt ("system", "user", or "both")
451 index: Prompt variant index
452 include_rag: Whether to execute RAG searches
454 Returns:
455 List of rendered LLM messages
456 """
457 from dataknobs_llm.prompts import AsyncPromptBuilder
458 builder: AsyncPromptBuilder = self.prompt_builder # type: ignore
460 messages: List[LLMMessage] = []
461 params = params or {}
463 if prompt_type in ("system", "both"):
464 result = await builder.render_system_prompt(
465 prompt_name, params=params, include_rag=include_rag
466 )
467 messages.append(LLMMessage(role="system", content=result.content))
469 if prompt_type in ("user", "both"):
470 result = await builder.render_user_prompt(
471 prompt_name, index=index, params=params, include_rag=include_rag
472 )
473 messages.append(LLMMessage(role="user", content=result.content))
475 return messages
477 @abstractmethod
478 async def stream_complete(
479 self,
480 messages: Union[str, List[LLMMessage]],
481 **kwargs
482 ) -> AsyncIterator[LLMStreamResponse]:
483 """Generate streaming completion asynchronously.
485 Args:
486 messages: Input messages or prompt
487 **kwargs: Additional parameters
489 Yields:
490 Streaming response chunks
491 """
492 pass
494 @abstractmethod
495 async def embed(
496 self,
497 texts: Union[str, List[str]],
498 **kwargs
499 ) -> Union[List[float], List[List[float]]]:
500 """Generate embeddings asynchronously.
502 Args:
503 texts: Input text(s)
504 **kwargs: Additional parameters
506 Returns:
507 Embedding vector(s)
508 """
509 pass
511 @abstractmethod
512 async def function_call(
513 self,
514 messages: List[LLMMessage],
515 functions: List[Dict[str, Any]],
516 **kwargs
517 ) -> LLMResponse:
518 """Execute function calling asynchronously.
520 Args:
521 messages: Conversation messages
522 functions: Available functions
523 **kwargs: Additional parameters
525 Returns:
526 Response with function call
527 """
528 pass
530 async def initialize(self) -> None:
531 """Initialize the async LLM client."""
532 self._is_initialized = True
534 async def close(self) -> None:
535 """Close the async LLM client."""
536 self._is_initialized = False
538 async def __aenter__(self):
539 """Async context manager entry."""
540 await self.initialize()
541 return self
543 async def __aexit__(self, exc_type, exc_val, exc_tb):
544 """Async context manager exit."""
545 await self.close()
548class SyncLLMProvider(LLMProvider):
549 """Synchronous LLM provider interface."""
551 @abstractmethod
552 def complete(
553 self,
554 messages: Union[str, List[LLMMessage]],
555 **kwargs
556 ) -> LLMResponse:
557 """Generate completion synchronously.
559 Args:
560 messages: Input messages or prompt
561 **kwargs: Additional parameters
563 Returns:
564 LLM response
565 """
566 pass
568 def render_and_complete(
569 self,
570 prompt_name: str,
571 params: Optional[Dict[str, Any]] = None,
572 prompt_type: str = "user",
573 index: int = 0,
574 include_rag: bool = True,
575 **llm_kwargs
576 ) -> LLMResponse:
577 """Render prompt from library and execute LLM completion.
579 This is a convenience method for one-off interactions that combines
580 prompt rendering with LLM execution. For multi-turn conversations,
581 use ConversationManager instead.
583 Args:
584 prompt_name: Name of prompt in library
585 params: Parameters for template rendering
586 prompt_type: Type of prompt ("system", "user", or "both")
587 index: Prompt variant index (for user prompts)
588 include_rag: Whether to execute RAG searches
589 **llm_kwargs: Additional arguments passed to complete()
591 Returns:
592 LLM response
594 Raises:
595 ValueError: If prompt_builder not configured or invalid prompt_type
596 TypeError: If prompt_builder is not PromptBuilder
598 Example:
599 >>> llm = SyncOpenAIProvider(config, prompt_builder=builder)
600 >>> result = llm.render_and_complete(
601 ... "analyze_code",
602 ... params={"code": code, "language": "python"}
603 ... )
604 """
605 # Validate
606 from dataknobs_llm.prompts import PromptBuilder
607 self._validate_prompt_builder(PromptBuilder)
608 self._validate_render_params(prompt_type)
610 # Render messages
611 messages = self._render_messages(
612 prompt_name, params, prompt_type, index, include_rag
613 )
615 # Execute LLM
616 return self.complete(messages, **llm_kwargs)
618 def render_and_stream(
619 self,
620 prompt_name: str,
621 params: Optional[Dict[str, Any]] = None,
622 prompt_type: str = "user",
623 index: int = 0,
624 include_rag: bool = True,
625 **llm_kwargs
626 ) -> Iterator[LLMStreamResponse]:
627 """Render prompt and stream LLM response.
629 Same as render_and_complete() but returns streaming response.
631 Args:
632 prompt_name: Name of prompt in library
633 params: Parameters for template rendering
634 prompt_type: Type of prompt ("system", "user", or "both")
635 index: Prompt variant index
636 include_rag: Whether to execute RAG searches
637 **llm_kwargs: Additional arguments passed to stream_complete()
639 Yields:
640 Streaming response chunks
642 Raises:
643 ValueError: If prompt_builder not configured or invalid prompt_type
644 TypeError: If prompt_builder is not PromptBuilder
646 Example:
647 >>> for chunk in llm.render_and_stream("analyze_code", params={"code": code}):
648 ... print(chunk.delta, end="")
649 """
650 # Validate
651 from dataknobs_llm.prompts import PromptBuilder
652 self._validate_prompt_builder(PromptBuilder)
653 self._validate_render_params(prompt_type)
655 # Render messages
656 messages = self._render_messages(
657 prompt_name, params, prompt_type, index, include_rag
658 )
660 # Stream LLM response
661 for chunk in self.stream_complete(messages, **llm_kwargs):
662 yield chunk
664 def _render_messages(
665 self,
666 prompt_name: str,
667 params: Optional[Dict[str, Any]],
668 prompt_type: str,
669 index: int,
670 include_rag: bool
671 ) -> List[LLMMessage]:
672 """Render messages from prompt library (sync version).
674 Args:
675 prompt_name: Name of prompt in library
676 params: Parameters for template rendering
677 prompt_type: Type of prompt ("system", "user", or "both")
678 index: Prompt variant index
679 include_rag: Whether to execute RAG searches
681 Returns:
682 List of rendered LLM messages
683 """
684 from dataknobs_llm.prompts import PromptBuilder
685 builder: PromptBuilder = self.prompt_builder # type: ignore
687 messages: List[LLMMessage] = []
688 params = params or {}
690 if prompt_type in ("system", "both"):
691 result = builder.render_system_prompt(
692 prompt_name, params=params, include_rag=include_rag
693 )
694 messages.append(LLMMessage(role="system", content=result.content))
696 if prompt_type in ("user", "both"):
697 result = builder.render_user_prompt(
698 prompt_name, index=index, params=params, include_rag=include_rag
699 )
700 messages.append(LLMMessage(role="user", content=result.content))
702 return messages
704 @abstractmethod
705 def stream_complete(
706 self,
707 messages: Union[str, List[LLMMessage]],
708 **kwargs
709 ) -> Iterator[LLMStreamResponse]:
710 """Generate streaming completion synchronously.
712 Args:
713 messages: Input messages or prompt
714 **kwargs: Additional parameters
716 Yields:
717 Streaming response chunks
718 """
719 pass
721 @abstractmethod
722 def embed(
723 self,
724 texts: Union[str, List[str]],
725 **kwargs
726 ) -> Union[List[float], List[List[float]]]:
727 """Generate embeddings synchronously.
729 Args:
730 texts: Input text(s)
731 **kwargs: Additional parameters
733 Returns:
734 Embedding vector(s)
735 """
736 pass
738 @abstractmethod
739 def function_call(
740 self,
741 messages: List[LLMMessage],
742 functions: List[Dict[str, Any]],
743 **kwargs
744 ) -> LLMResponse:
745 """Execute function calling synchronously.
747 Args:
748 messages: Conversation messages
749 functions: Available functions
750 **kwargs: Additional parameters
752 Returns:
753 Response with function call
754 """
755 pass
757 def initialize(self) -> None:
758 """Initialize the sync LLM client."""
759 self._is_initialized = True
761 def close(self) -> None:
762 """Close the sync LLM client."""
763 self._is_initialized = False
766class LLMAdapter(ABC):
767 """Base adapter for converting between different LLM formats."""
769 @abstractmethod
770 def adapt_messages(
771 self,
772 messages: List[LLMMessage]
773 ) -> Any:
774 """Adapt messages to provider format."""
775 pass
777 @abstractmethod
778 def adapt_response(
779 self,
780 response: Any
781 ) -> LLMResponse:
782 """Adapt provider response to standard format."""
783 pass
785 @abstractmethod
786 def adapt_config(
787 self,
788 config: LLMConfig
789 ) -> Dict[str, Any]:
790 """Adapt configuration to provider format."""
791 pass
794class LLMMiddleware(Protocol):
795 """Protocol for LLM middleware.
797 Middleware can accept configuration as LLMConfig, dataknobs Config, or dict.
798 """
800 async def process_request(
801 self,
802 messages: List[LLMMessage],
803 config: Union[LLMConfig, "Config", Dict[str, Any]]
804 ) -> List[LLMMessage]:
805 """Process request before sending to LLM.
807 Args:
808 messages: Input messages
809 config: Configuration (LLMConfig, Config, or dict)
811 Returns:
812 Processed messages
813 """
814 ...
816 async def process_response(
817 self,
818 response: LLMResponse,
819 config: Union[LLMConfig, "Config", Dict[str, Any]]
820 ) -> LLMResponse:
821 """Process response from LLM.
823 Args:
824 response: LLM response
825 config: Configuration (LLMConfig, Config, or dict)
827 Returns:
828 Processed response
829 """
830 ...