Coverage for src/dataknobs_llm/prompts/adapters/resource_adapter.py: 98%
59 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Resource adapter interfaces for pluggable data sources.
3This module defines the adapter pattern for accessing external resources
4(databases, vector stores, configuration systems, etc.) in both synchronous
5and asynchronous contexts.
7Key concepts:
8- Separate sync (ResourceAdapter) and async (AsyncResourceAdapter) interfaces
9- Shared base class (ResourceAdapterBase) for common functionality
10- BaseSearchLogic for reusable search operations
11- No mixing of sync/async - builders require matching adapter types
12"""
14from abc import ABC, abstractmethod
15from typing import Any, Dict, List
16import logging
18logger = logging.getLogger(__name__)
21class ResourceAdapterBase:
22 """Base class with shared functionality for both sync and async adapters.
24 This class provides:
25 - Adapter name and metadata management
26 - Metadata caching
27 - Helper methods for type checking
28 - Common initialization logic
29 """
31 def __init__(self, name: str = "adapter", metadata: Dict[str, Any] | None = None):
32 """Initialize the resource adapter base.
34 Args:
35 name: Adapter identifier (used in logs and error messages)
36 metadata: Optional metadata about this adapter
37 """
38 self._name = name
39 self._metadata = metadata or {}
40 self._metadata_cache: Dict[str, Any] | None = None
42 @property
43 def name(self) -> str:
44 """Get the adapter name."""
45 return self._name
47 def is_async(self) -> bool:
48 """Check if this is an async adapter.
50 Returns:
51 True if this adapter implements AsyncResourceAdapter
52 """
53 return isinstance(self, AsyncResourceAdapter)
55 def get_metadata(self) -> Dict[str, Any]:
56 """Get metadata about this adapter.
58 Returns:
59 Dictionary with adapter metadata
60 """
61 return {
62 "name": self._name,
63 "type": "async" if self.is_async() else "sync",
64 "class": self.__class__.__name__,
65 **self._metadata
66 }
68 def __repr__(self) -> str:
69 """Return a string representation of this adapter."""
70 adapter_type = "async" if self.is_async() else "sync"
71 return f"{self.__class__.__name__}(name={self._name!r}, type={adapter_type})"
74class ResourceAdapter(ResourceAdapterBase, ABC):
75 """Synchronous resource adapter interface.
77 Adapters implementing this interface provide synchronous access to
78 external resources for parameter resolution and RAG searches.
80 All methods are synchronous (blocking).
81 """
83 @abstractmethod
84 def get_value(
85 self,
86 key: str,
87 default: Any = None,
88 context: Dict[str, Any] | None = None
89 ) -> Any:
90 """Retrieve a value by key from the resource.
92 Args:
93 key: The key to look up
94 default: Default value if key not found
95 context: Optional context for the lookup (e.g., user ID, session)
97 Returns:
98 The value associated with the key, or default if not found
99 """
100 pass
102 @abstractmethod
103 def search(
104 self,
105 query: str,
106 k: int = 5,
107 filters: Dict[str, Any] | None = None,
108 **kwargs: Any
109 ) -> List[Dict[str, Any]]:
110 """Perform a search query against the resource.
112 Args:
113 query: Search query string
114 k: Number of results to return (default: 5)
115 filters: Optional filters to apply to the search
116 **kwargs: Additional adapter-specific search parameters
118 Returns:
119 List of search results as dictionaries
120 """
121 pass
123 def batch_get_values(
124 self,
125 keys: List[str],
126 default: Any = None,
127 context: Dict[str, Any] | None = None
128 ) -> Dict[str, Any]:
129 """Retrieve multiple values by keys.
131 Default implementation calls get_value() for each key.
132 Adapters can override for more efficient batch operations.
134 Args:
135 keys: List of keys to look up
136 default: Default value for keys not found
137 context: Optional context for the lookup
139 Returns:
140 Dictionary mapping keys to their values
141 """
142 return {key: self.get_value(key, default, context) for key in keys}
145class AsyncResourceAdapter(ResourceAdapterBase, ABC):
146 """Asynchronous resource adapter interface.
148 Adapters implementing this interface provide asynchronous access to
149 external resources for parameter resolution and RAG searches.
151 All methods are asynchronous (non-blocking).
152 """
154 @abstractmethod
155 async def get_value(
156 self,
157 key: str,
158 default: Any = None,
159 context: Dict[str, Any] | None = None
160 ) -> Any:
161 """Retrieve a value by key from the resource (async).
163 Args:
164 key: The key to look up
165 default: Default value if key not found
166 context: Optional context for the lookup (e.g., user ID, session)
168 Returns:
169 The value associated with the key, or default if not found
170 """
171 pass
173 @abstractmethod
174 async def search(
175 self,
176 query: str,
177 k: int = 5,
178 filters: Dict[str, Any] | None = None,
179 **kwargs: Any
180 ) -> List[Dict[str, Any]]:
181 """Perform a search query against the resource (async).
183 Args:
184 query: Search query string
185 k: Number of results to return (default: 5)
186 filters: Optional filters to apply to the search
187 **kwargs: Additional adapter-specific search parameters
189 Returns:
190 List of search results as dictionaries
191 """
192 pass
194 async def batch_get_values(
195 self,
196 keys: List[str],
197 default: Any = None,
198 context: Dict[str, Any] | None = None
199 ) -> Dict[str, Any]:
200 """Retrieve multiple values by keys (async).
202 Default implementation calls get_value() concurrently for each key.
203 Adapters can override for more efficient batch operations.
205 Args:
206 keys: List of keys to look up
207 default: Default value for keys not found
208 context: Optional context for the lookup
210 Returns:
211 Dictionary mapping keys to their values
212 """
213 import asyncio
214 tasks = [self.get_value(key, default, context) for key in keys]
215 values = await asyncio.gather(*tasks)
216 return dict(zip(keys, values, strict=True))
219class BaseSearchLogic:
220 """Shared search logic utilities for both sync and async adapters.
222 This class provides helper methods for common search operations:
223 - Result formatting and filtering
224 - Score normalization
225 - Result deduplication
226 - Metadata extraction
227 """
229 @staticmethod
230 def format_search_result(
231 item: Any,
232 score: float | None = None,
233 metadata: Dict[str, Any] | None = None
234 ) -> Dict[str, Any]:
235 """Format a search result into a standardized dictionary.
237 Args:
238 item: The search result item (could be string, dict, or object)
239 score: Optional relevance score
240 metadata: Optional metadata about the result
242 Returns:
243 Formatted result dictionary with 'content', 'score', 'metadata'
244 """
245 result: Dict[str, Any] = {}
247 # Extract content
248 if isinstance(item, str):
249 result["content"] = item
250 elif isinstance(item, dict):
251 # Try common content keys
252 result["content"] = item.get("content") or item.get("text") or str(item)
253 # Preserve other fields as metadata
254 metadata = {**item, **(metadata or {})}
255 else:
256 result["content"] = str(item)
258 # Add score if provided
259 if score is not None:
260 result["score"] = score
262 # Add metadata if provided
263 if metadata:
264 result["metadata"] = metadata
266 return result
268 @staticmethod
269 def filter_results(
270 results: List[Dict[str, Any]],
271 filters: Dict[str, Any] | None = None,
272 min_score: float | None = None
273 ) -> List[Dict[str, Any]]:
274 """Filter search results based on criteria.
276 Args:
277 results: List of search result dictionaries
278 filters: Dictionary of field filters (exact match)
279 min_score: Minimum score threshold
281 Returns:
282 Filtered list of results
283 """
284 filtered = results
286 # Apply score threshold
287 if min_score is not None:
288 filtered = [r for r in filtered if r.get("score", 0.0) >= min_score]
290 # Apply field filters
291 if filters:
292 for key, value in filters.items():
293 filtered = [
294 r for r in filtered
295 if r.get(key) == value or r.get("metadata", {}).get(key) == value
296 ]
298 return filtered
300 @staticmethod
301 def deduplicate_results(
302 results: List[Dict[str, Any]],
303 key: str = "content"
304 ) -> List[Dict[str, Any]]:
305 """Remove duplicate results based on a key.
307 Args:
308 results: List of search result dictionaries
309 key: Key to use for deduplication (default: "content")
311 Returns:
312 Deduplicated list of results (preserves order, keeps first occurrence)
313 """
314 seen = set()
315 deduplicated = []
317 for result in results:
318 value = result.get(key)
319 if value not in seen:
320 seen.add(value)
321 deduplicated.append(result)
323 return deduplicated