Coverage for src/dataknobs_llm/llm/providers/ollama.py: 13%
189 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Ollama local LLM provider implementation.
3This module provides Ollama integration for dataknobs-llm, enabling local LLM
4deployment and usage without cloud APIs. Perfect for privacy-sensitive applications,
5offline usage, and cost reduction.
7Supports:
8- All Ollama models (Llama, Mistral, CodeLlama, Phi, etc.)
9- Chat with message history
10- Streaming responses
11- Embeddings for semantic search
12- Tool/function calling (Ollama 0.1.17+)
13- Vision models with image inputs
14- Custom model parameters (temperature, top_p, seed, etc.)
15- Docker environment auto-detection
16- Multi-modal capabilities
18The OllamaProvider automatically detects Docker environments and adjusts
19connection URLs accordingly.
21Example:
22 ```python
23 from dataknobs_llm.llm.providers import OllamaProvider
24 from dataknobs_llm.llm.base import LLMConfig, LLMMessage
26 # Basic usage (assumes Ollama running on localhost:11434)
27 config = LLMConfig(
28 provider="ollama",
29 model="llama2",
30 temperature=0.7
31 )
33 async with OllamaProvider(config) as llm:
34 # Simple completion
35 response = await llm.complete("Explain Python generators")
36 print(response.content)
38 # Streaming
39 async for chunk in llm.stream_complete("Write a poem"):
40 print(chunk.delta, end="", flush=True)
42 # Custom Ollama URL (remote or Docker)
43 remote_config = LLMConfig(
44 provider="ollama",
45 model="codellama",
46 api_base="http://my-ollama-server:11434"
47 )
49 # Generate embeddings
50 embed_config = LLMConfig(
51 provider="ollama",
52 model="nomic-embed-text"
53 )
55 llm = OllamaProvider(embed_config)
56 await llm.initialize()
57 embeddings = await llm.embed([
58 "Python is great",
59 "JavaScript is versatile"
60 ])
62 # Vision model with images
63 vision_messages = [
64 LLMMessage(
65 role="user",
66 content="What's in this image?",
67 metadata={"images": ["base64encodedimage..."]}
68 )
69 ]
71 vision_config = LLMConfig(provider="ollama", model="llava")
72 llm = OllamaProvider(vision_config)
73 await llm.initialize()
74 response = await llm.complete(vision_messages)
75 ```
77Installation:
78 1. Install Ollama from https://ollama.ai
79 2. Pull a model: `ollama pull llama2`
80 3. Start server: `ollama serve` (usually auto-starts)
81 4. Use with dataknobs-llm (no API key needed!)
83See Also:
84 - Ollama: https://ollama.ai
85 - Ollama Models: https://ollama.ai/library
86 - Ollama GitHub: https://github.com/ollama/ollama
87"""
89import os
90import json
91from typing import TYPE_CHECKING, Any, Dict, List, Union, AsyncIterator
93from ..base import (
94 LLMConfig, LLMMessage, LLMResponse, LLMStreamResponse,
95 AsyncLLMProvider, ModelCapability,
96 normalize_llm_config
97)
98from dataknobs_llm.prompts import AsyncPromptBuilder
100if TYPE_CHECKING:
101 from dataknobs_config.config import Config
104class OllamaProvider(AsyncLLMProvider):
105 """Ollama local LLM provider for privacy-first, offline LLM usage.
107 Provides async access to locally-hosted Ollama models, enabling
108 on-premise LLM deployment without cloud APIs. Perfect for sensitive
109 data, air-gapped environments, and cost optimization.
111 Features:
112 - All Ollama models (Llama 2/3, Mistral, Phi, CodeLlama, etc.)
113 - No API key required - fully local
114 - Chat with message history
115 - Streaming responses for real-time output
116 - Embeddings for RAG and semantic search
117 - Tool/function calling (Ollama 0.1.17+)
118 - Vision models (LLaVA, bakllava)
119 - Docker environment auto-detection
120 - Custom model parameters (temperature, top_p, seed)
121 - Zero-cost inference
123 Example:
124 ```python
125 from dataknobs_llm.llm.providers import OllamaProvider
126 from dataknobs_llm.llm.base import LLMConfig, LLMMessage
128 # Basic local usage
129 config = LLMConfig(
130 provider="ollama",
131 model="llama2", # or llama3, mistral, phi, etc.
132 temperature=0.7
133 )
135 async with OllamaProvider(config) as llm:
136 # Simple completion
137 response = await llm.complete("Explain decorators in Python")
138 print(response.content)
140 # Multi-turn conversation
141 messages = [
142 LLMMessage(role="system", content="You are a helpful assistant"),
143 LLMMessage(role="user", content="What is recursion?"),
144 LLMMessage(role="assistant", content="Recursion is..."),
145 LLMMessage(role="user", content="Show me an example")
146 ]
147 response = await llm.complete(messages)
149 # Code generation with CodeLlama
150 code_config = LLMConfig(
151 provider="ollama",
152 model="codellama",
153 temperature=0.2, # Lower for more deterministic code
154 max_tokens=500
155 )
157 llm = OllamaProvider(code_config)
158 await llm.initialize()
159 response = await llm.complete(
160 "Write a Python function to merge two sorted lists"
161 )
162 print(response.content)
164 # Remote Ollama server
165 remote_config = LLMConfig(
166 provider="ollama",
167 model="llama2",
168 api_base="http://192.168.1.100:11434" # Remote server
169 )
171 # Docker usage (auto-detects)
172 # In Docker, automatically uses host.docker.internal
173 docker_config = LLMConfig(
174 provider="ollama",
175 model="mistral"
176 )
178 # Vision model with image input
179 from dataknobs_llm.llm.base import LLMMessage
180 import base64
182 with open("image.jpg", "rb") as f:
183 image_data = base64.b64encode(f.read()).decode()
185 vision_config = LLMConfig(
186 provider="ollama",
187 model="llava" # or bakllava
188 )
190 llm = OllamaProvider(vision_config)
191 await llm.initialize()
193 messages = [
194 LLMMessage(
195 role="user",
196 content="What objects are in this image?",
197 metadata={"images": [image_data]}
198 )
199 ]
201 response = await llm.complete(messages)
202 print(response.content)
204 # Embeddings for RAG
205 embed_config = LLMConfig(
206 provider="ollama",
207 model="nomic-embed-text" # or mxbai-embed-large
208 )
210 llm = OllamaProvider(embed_config)
211 await llm.initialize()
213 # Single embedding
214 embedding = await llm.embed("Sample text")
215 print(f"Dimensions: {len(embedding)}")
217 # Batch embeddings
218 texts = [
219 "Python programming",
220 "Machine learning basics",
221 "Web development with Flask"
222 ]
223 embeddings = await llm.embed(texts)
224 print(f"Generated {len(embeddings)} embeddings")
226 # Tool use (Ollama 0.1.17+)
227 tools = [
228 {
229 "type": "function",
230 "function": {
231 "name": "get_weather",
232 "description": "Get current weather",
233 "parameters": {
234 "type": "object",
235 "properties": {
236 "location": {"type": "string"}
237 },
238 "required": ["location"]
239 }
240 }
241 }
242 ]
244 response = await llm.function_call(messages, tools)
245 ```
247 Args:
248 config: LLMConfig, dataknobs Config, or dict with provider settings
249 prompt_builder: Optional AsyncPromptBuilder for prompt rendering
251 Attributes:
252 base_url (str): Ollama API base URL (auto-detects Docker environment)
253 _client: HTTP client for Ollama API
255 See Also:
256 LLMConfig: Configuration options
257 AsyncLLMProvider: Base provider interface
258 Ollama Documentation: https://ollama.ai
259 """
def __init__(
    self,
    config: Union[LLMConfig, "Config", Dict[str, Any]],
    prompt_builder: AsyncPromptBuilder | None = None
):
    """Create a provider bound to an Ollama server.

    Args:
        config: LLMConfig, dataknobs Config, or dict with provider settings.
            It is normalized with ``normalize_llm_config`` before being
            passed to the base class.
        prompt_builder: Optional AsyncPromptBuilder forwarded to the base
            class.
    """
    # Normalize config first
    llm_config = normalize_llm_config(config)
    super().__init__(llm_config, prompt_builder=prompt_builder)

    # When /.dockerenv exists we are running in Docker, so the default
    # points at host.docker.internal instead of localhost.
    default_url = 'http://localhost:11434'
    if os.path.exists('/.dockerenv'):
        default_url = 'http://host.docker.internal:11434'

    # Precedence: explicit api_base, then OLLAMA_BASE_URL, then the default.
    # `or` (rather than a .get() default) also skips an empty env variable.
    base_url = llm_config.api_base or os.environ.get('OLLAMA_BASE_URL') or default_url

    # Endpoints are built as f"{base_url}/api/...", so a trailing slash
    # would produce "//api/..." paths.
    self.base_url = base_url.rstrip('/')
279 def _build_options(self) -> Dict[str, Any]:
280 """Build options dict for Ollama API calls.
282 Returns:
283 Dictionary of options for the API request.
284 """
285 options: Dict[str, Any] = {}
287 # Only add temperature if it's not the default to avoid issues
288 if self.config.temperature != 1.0:
289 options['temperature'] = float(self.config.temperature)
291 # Only add top_p if explicitly set and different from default
292 if self.config.top_p != 1.0:
293 options['top_p'] = float(self.config.top_p)
295 if self.config.seed is not None:
296 options['seed'] = int(self.config.seed)
298 if self.config.max_tokens:
299 # Ensure it's an integer
300 options['num_predict'] = int(self.config.max_tokens)
302 if self.config.stop_sequences:
303 options['stop'] = list(self.config.stop_sequences)
305 return options
307 def _messages_to_ollama(self, messages: List[LLMMessage]) -> List[Dict[str, Any]]:
308 """Convert LLMMessage list to Ollama chat format.
310 Args:
311 messages: List of LLM messages
313 Returns:
314 List of message dicts in Ollama format
315 """
316 ollama_messages = []
317 for msg in messages:
318 message = {
319 'role': msg.role,
320 'content': msg.content
321 }
322 # Ollama supports images in messages for vision models
323 if msg.metadata.get('images'):
324 message['images'] = msg.metadata['images']
325 ollama_messages.append(message)
326 return ollama_messages
328 def _adapt_tools(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
329 """Adapt tools to Ollama format.
331 Ollama uses a similar format to OpenAI for tools.
333 Args:
334 tools: List of tool definitions
336 Returns:
337 List of tools in Ollama format
338 """
339 # Ollama format is similar to OpenAI
340 ollama_tools = []
341 for tool in tools:
342 ollama_tools.append({
343 'type': 'function',
344 'function': {
345 'name': tool.get('name'),
346 'description': tool.get('description', ''),
347 'parameters': tool.get('parameters', {})
348 }
349 })
350 return ollama_tools
async def initialize(self) -> None:
    """Create the aiohttp session and probe the Ollama server.

    The probe calls ``/api/tags`` and checks whether the configured model
    is installed. If it is not, but another tag of the same base model is
    (``name`` before the first ``:``), ``self.config.model`` is switched
    to that tag, preferring ``<base>:latest``. Probe failures are logged
    as warnings and never raised; the provider is marked initialized
    regardless.

    Raises:
        ImportError: If aiohttp is not installed.
    """
    import logging

    # Keep the try body minimal so only a missing aiohttp is reported
    # as "not installed".
    try:
        import aiohttp
    except ImportError as e:
        raise ImportError("aiohttp package not installed. Install with: pip install aiohttp") from e

    # Re-initializing must not leak a session that is still open.
    previous = getattr(self, '_session', None)
    if previous is not None and not getattr(previous, 'closed', True):
        await previous.close()

    self._session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=self.config.timeout or 30.0)
    )

    # Test connection and verify model availability (best effort).
    try:
        async with self._session.get(f"{self.base_url}/api/tags") as response:
            if response.status != 200:
                logging.warning("Ollama: API returned status %s", response.status)
            else:
                data = await response.json()
                models = [m['name'] for m in data.get('models', [])]
                if not models:
                    logging.warning("Ollama: No models found. Please pull a model first.")
                elif self.config.model not in models:
                    # Compare base names exactly: a prefix test would let
                    # 'llama3' silently select 'llama3.1:8b'.
                    base_model = self.config.model.split(':')[0]
                    matching_models = [m for m in models if m.split(':')[0] == base_model]
                    if matching_models:
                        latest = f"{base_model}:latest"
                        self.config.model = latest if latest in matching_models else matching_models[0]
                        logging.info("Ollama: Using model %s", self.config.model)
                    else:
                        logging.warning(
                            "Ollama: Model %s not found. Available: %s",
                            self.config.model, models
                        )
    except Exception as e:
        # Deliberately broad: an unreachable or misbehaving server must
        # not prevent the provider from being constructed.
        logging.warning("Ollama: Could not connect to %s: %s", self.base_url, e)

    self._is_initialized = True
async def close(self) -> None:
    """Close the HTTP session, if one exists, and mark the provider uninitialized."""
    session = getattr(self, '_session', None)
    if session:
        await session.close()
    self._is_initialized = False
async def validate_model(self) -> bool:
    """Check whether the configured model is installed on the server.

    Queries ``/api/tags`` and accepts either an exact name match or any
    installed tag of the same base model (the part before the first
    ``:``).

    Returns:
        True if a matching model is listed. False if the provider is not
        initialized, the server does not answer with status 200, no model
        matches, or any error occurs while querying.
    """
    if not self._is_initialized or not hasattr(self, '_session'):
        return False

    try:
        async with self._session.get(f"{self.base_url}/api/tags") as response:
            if response.status != 200:
                return False
            data = await response.json()

        installed = [m['name'] for m in data.get('models', [])]
        wanted = self.config.model
        if wanted in installed:
            return True
        # Compare base names exactly; a prefix test would report
        # 'llama3' as available when only 'llama3.1:8b' is installed.
        base_model = wanted.split(':')[0]
        return any(name.split(':')[0] == base_model for name in installed)
    except Exception:
        # This is a yes/no probe: any failure means "not validated".
        return False
def get_capabilities(self) -> List[ModelCapability]:
    """Derive the capability list from the configured model name.

    Every model gets text generation, chat and streaming. Further
    capabilities are added by case-insensitive substring match on the
    model name: function calling for llama3/mistral/mixtral/qwen, vision
    for names containing "llava", code for codellama/codegemma.

    Returns:
        List of ModelCapability values for the configured model.
    """
    model_name = self.config.model.lower()

    supported = [
        ModelCapability.TEXT_GENERATION,
        ModelCapability.CHAT,
        ModelCapability.STREAMING
    ]

    tool_families = ('llama3', 'mistral', 'mixtral', 'qwen')
    if any(family in model_name for family in tool_families):
        supported.append(ModelCapability.FUNCTION_CALLING)

    if 'llava' in model_name:
        supported.append(ModelCapability.VISION)

    code_families = ('codellama', 'codegemma')
    if any(family in model_name for family in code_families):
        supported.append(ModelCapability.CODE)

    return supported
async def complete(
    self,
    messages: Union[str, List[LLMMessage]],
    **kwargs
) -> LLMResponse:
    """Generate a non-streaming completion via the Ollama chat endpoint.

    Args:
        messages: A prompt string (sent as a single user message) or a
            list of LLMMessage objects.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        LLMResponse with the reply text, a finish reason, token usage
        (only when the server reports ``eval_count``) and timing
        metadata.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the server
        answers with a non-200 status.
    """
    import logging

    if not self._is_initialized:
        await self.initialize()

    # Convert to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add the configured system prompt unless the caller supplied one
    if self.config.system_prompt and (not messages or messages[0].role != 'system'):
        messages = [LLMMessage(role='system', content=self.config.system_prompt)] + list(messages)

    payload = {
        'model': self.config.model,
        'messages': self._messages_to_ollama(messages),
        'stream': False,
        'options': self._build_options()
    }

    # Add format if JSON mode requested
    if self.config.response_format == 'json':
        payload['format'] = 'json'

    async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
        if response.status != 200:
            error_text = await response.text()
            logging.error("Ollama API error (status %s): %s", response.status, error_text)
            logging.error("Request payload: %s", json.dumps(payload, indent=2))
        response.raise_for_status()
        data = await response.json()

    content = data.get('message', {}).get('content', '')

    # A non-streaming reply always has done=True, so `done` alone cannot
    # tell a natural stop from truncation. Prefer the server's
    # `done_reason` when present; otherwise keep the previous heuristic.
    finish_reason = data.get('done_reason') or ('stop' if data.get('done') else 'length')

    usage = None
    if 'eval_count' in data:
        prompt_tokens = data.get('prompt_eval_count', 0)
        completion_tokens = data.get('eval_count', 0)
        usage = {
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
            'total_tokens': prompt_tokens + completion_tokens
        }

    return LLMResponse(
        content=content,
        model=self.config.model,
        finish_reason=finish_reason,
        usage=usage,
        metadata={
            'eval_duration': data.get('eval_duration'),
            'total_duration': data.get('total_duration'),
            'model_info': data.get('model', '')
        }
    )
async def stream_complete(
    self,
    messages: Union[str, List[LLMMessage]],
    **kwargs
) -> AsyncIterator[LLMStreamResponse]:
    """Generate a streaming completion via the Ollama chat endpoint.

    Builds the request the same way as ``complete`` (string prompts
    become a user message, the configured system prompt is prepended
    when the caller supplied none, images and JSON mode are forwarded),
    but with ``stream=True``.

    Args:
        messages: A prompt string or a list of LLMMessage objects.
        **kwargs: Accepted for interface compatibility; not used here.

    Yields:
        One LLMStreamResponse per JSON line received from the server.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the server
        answers with an error status.
    """
    if not self._is_initialized:
        await self.initialize()

    # Convert to message list
    if isinstance(messages, str):
        messages = [LLMMessage(role='user', content=messages)]

    # Add the configured system prompt unless the caller supplied one
    if self.config.system_prompt and (not messages or messages[0].role != 'system'):
        messages = [LLMMessage(role='system', content=self.config.system_prompt)] + list(messages)

    payload = {
        'model': self.config.model,
        'messages': self._messages_to_ollama(messages),
        'stream': True,
        'options': self._build_options()
    }

    if self.config.response_format == 'json':
        payload['format'] = 'json'

    async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
        response.raise_for_status()

        # The body is newline-delimited JSON: one object per line.
        async for line in response.content:
            line = line.strip()
            if not line:
                # A whitespace-only line would make json.loads fail.
                continue
            data = json.loads(line.decode('utf-8'))
            done = data.get('done', False)
            yield LLMStreamResponse(
                delta=data.get('message', {}).get('content', ''),
                is_final=done,
                finish_reason=(data.get('done_reason') or 'stop') if done else None
            )
async def embed(
    self,
    texts: Union[str, List[str]],
    **kwargs
) -> Union[List[float], List[List[float]]]:
    """Generate embeddings with the configured model.

    Each text is sent in its own request to ``/api/embeddings``, one
    after another.

    Args:
        texts: A single string or a list of strings.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A single embedding when ``texts`` is a string, otherwise a list
        with one embedding per input text.
    """
    if not self._is_initialized:
        await self.initialize()

    is_single = isinstance(texts, str)
    batch = [texts] if is_single else texts

    vectors = []
    for item in batch:
        request_body = {'model': self.config.model, 'prompt': item}
        async with self._session.post(f"{self.base_url}/api/embeddings", json=request_body) as reply:
            reply.raise_for_status()
            parsed = await reply.json()
            vectors.append(parsed['embedding'])

    if is_single:
        return vectors[0]
    return vectors
async def function_call(
    self,
    messages: List[LLMMessage],
    functions: List[Dict[str, Any]],
    **kwargs
) -> LLMResponse:
    """Execute function calling, preferring Ollama's native tools support.

    First sends the tools with the chat request. If that attempt raises
    for any reason, falls back to a prompt-based approach: the function
    definitions are described in a system message and the reply is
    parsed as JSON.

    Args:
        messages: Conversation so far.
        functions: Tool definitions, adapted with ``_adapt_tools``.
        **kwargs: Forwarded to ``complete`` in the fallback path only.

    Returns:
        LLMResponse; when a call was requested, ``function_call`` is set
        to ``{"name": ..., "arguments": ...}``. Only the first native
        tool call is reported.
    """
    import logging

    if not self._is_initialized:
        await self.initialize()

    # Add the configured system prompt unless the caller supplied one
    if self.config.system_prompt and (not messages or messages[0].role != 'system'):
        messages = [LLMMessage(role='system', content=self.config.system_prompt)] + list(messages)

    payload = {
        'model': self.config.model,
        'messages': self._messages_to_ollama(messages),
        'tools': self._adapt_tools(functions),
        'stream': False,
        'options': self._build_options()
    }

    try:
        async with self._session.post(f"{self.base_url}/api/chat", json=payload) as response:
            response.raise_for_status()
            data = await response.json()

        # Extract response and tool calls
        message = data.get('message', {})
        content = message.get('content', '')
        tool_calls = message.get('tool_calls', [])

        usage = None
        if 'eval_count' in data:
            prompt_tokens = data.get('prompt_eval_count', 0)
            completion_tokens = data.get('eval_count', 0)
            usage = {
                'prompt_tokens': prompt_tokens,
                'completion_tokens': completion_tokens,
                'total_tokens': prompt_tokens + completion_tokens
            }

        llm_response = LLMResponse(
            content=content,
            model=self.config.model,
            finish_reason='tool_calls' if tool_calls else 'stop',
            usage=usage
        )

        if tool_calls:
            # Use first tool call (Ollama can return multiple)
            called = tool_calls[0].get('function', {})
            llm_response.function_call = {
                'name': called.get('name', ''),
                'arguments': called.get('arguments', {})
            }

        return llm_response

    except Exception as e:
        # Deliberately broad: any failure of the native path (including
        # servers without tools support) triggers the fallback below.
        logging.warning("Ollama native tools failed, falling back to prompt-based: %s", e)

    function_descriptions = json.dumps(functions, indent=2)

    system_prompt = f"""You have access to these functions:
{function_descriptions}

To call a function, respond with JSON:
{{"function": "name", "arguments": {{...}}}}"""

    messages_with_system = [
        LLMMessage(role='system', content=system_prompt)
    ] + list(messages)

    llm_response = await self.complete(messages_with_system, **kwargs)

    # Try to parse a function call out of the reply. The model may return
    # plain text, null content, or valid JSON that is not an object
    # (e.g. `42`), none of which is a call.
    try:
        func_data = json.loads(llm_response.content)
    except (json.JSONDecodeError, TypeError):
        func_data = None

    if isinstance(func_data, dict) and 'function' in func_data:
        llm_response.function_call = {
            'name': func_data['function'],
            'arguments': func_data.get('arguments', {})
        }

    return llm_response
662 def _build_prompt(self, messages: List[LLMMessage]) -> str:
663 """Build prompt from messages."""
664 prompt = ""
665 for msg in messages:
666 if msg.role == 'system':
667 prompt += f"System: {msg.content}\n\n"
668 elif msg.role == 'user':
669 prompt += f"User: {msg.content}\n\n"
670 elif msg.role == 'assistant':
671 prompt += f"Assistant: {msg.content}\n\n"
672 return prompt