Coverage for src/dataknobs_llm/prompts/adapters/dataknobs_backend_adapter.py: 85%
106 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Dataknobs backend resource adapters.
3This module provides adapters that wrap dataknobs database backends, enabling them
4to be used as resource providers in the prompt library system. Supports both sync
5and async database backends.
6"""
8from typing import Any, Dict, List, TYPE_CHECKING
10from .resource_adapter import ResourceAdapter, AsyncResourceAdapter, BaseSearchLogic
12if TYPE_CHECKING:
13 from dataknobs_data.database import SyncDatabase, AsyncDatabase
16class DataknobsBackendAdapter(ResourceAdapter):
17 """Synchronous adapter for dataknobs database backends.
19 Wraps a dataknobs SyncDatabase instance to provide resource adapter functionality.
21 Features:
22 - Record retrieval by ID using database.read()
23 - Search using database.search() with Query objects
24 - Field extraction with dot-notation support
25 - Score-based ranking from search results
27 Example:
28 >>> from dataknobs_data.backends import SyncMemoryDatabase
29 >>> db = SyncMemoryDatabase()
30 >>> # ... populate database ...
31 >>> adapter = DataknobsBackendAdapter(db, name="memory")
32 >>> record = adapter.get_value("record_id_123")
33 >>> results = adapter.search("query text")
34 """
36 def __init__(
37 self,
38 database: "SyncDatabase",
39 name: str = "dataknobs_backend",
40 text_field: str = "content",
41 metadata_field: str | None = None
42 ):
43 """Initialize dataknobs backend adapter.
45 Args:
46 database: SyncDatabase instance to wrap
47 name: Name identifier for this adapter
48 text_field: Field name to use as primary content (default: "content")
49 metadata_field: Optional field to extract as metadata
50 """
51 super().__init__(name=name)
52 self._database = database
53 self._text_field = text_field
54 self._metadata_field = metadata_field
56 def get_value(
57 self,
58 key: str,
59 default: Any = None,
60 context: Dict[str, Any] | None = None
61 ) -> Any:
62 """Retrieve a record or field value by ID.
64 Supports field extraction using dot notation:
65 - Simple key: Returns entire record as dict
66 - "record_id.field_name": Returns specific field value
67 - "record_id.field.nested": Returns nested field value
69 Args:
70 key: Record ID or "record_id.field" notation
71 default: Value to return if record/field not found
72 context: Optional context with additional parameters
74 Returns:
75 Record dict, field value, or default if not found
76 """
77 try:
78 # Parse key for potential field extraction
79 if '.' in key:
80 parts = key.split('.', 1)
81 record_id = parts[0]
82 field_path = parts[1]
83 else:
84 record_id = key
85 field_path = None
87 # Read record from database
88 record = self._database.read(record_id)
90 if record is None:
91 return default
93 # Extract field if specified
94 if field_path:
95 return record.get_value(field_path, default=default)
96 else:
97 # Return full record as dict
98 return record.to_dict(include_metadata=True)
100 except Exception:
101 # Log error if needed, return default
102 return default
104 def search(
105 self,
106 query: str,
107 k: int = 5,
108 filters: Dict[str, Any] | None = None,
109 **kwargs: Any
110 ) -> List[Dict[str, Any]]:
111 """Perform search using database backend.
113 Creates a Query object with LIKE filter for text search.
114 Results are formatted according to BaseSearchLogic standards.
116 Args:
117 query: Search query string (searches text_field using LIKE)
118 k: Maximum number of results to return
119 filters: Optional additional filters for the search
120 **kwargs: Additional search options:
121 - min_score: Minimum relevance score (default: 0.0)
122 - deduplicate: Whether to deduplicate results (default: False)
124 Returns:
125 List of search results with structure:
126 {
127 "content": <text content>,
128 "score": <relevance score>,
129 "metadata": {<record metadata>}
130 }
131 """
132 try:
133 from dataknobs_data.query import Query, Filter, Operator
135 # Build filter for text search using LIKE operator
136 # This searches for the query string anywhere in the text field
137 search_filter = Filter(
138 field=self._text_field,
139 operator=Operator.LIKE,
140 value=f"%{query}%"
141 )
143 # Build query object with filter and limit
144 query_obj = Query(
145 filters=[search_filter],
146 limit_value=k
147 )
149 # Execute search
150 records = self._database.search(query_obj)
152 # Format results
153 results = []
154 for record in records:
155 # Extract content field
156 content = record.get_value(self._text_field, default="")
158 # Get score from metadata if available
159 score = record.metadata.get("score", record.metadata.get("_score", 1.0))
161 # Extract metadata
162 metadata = {}
163 if self._metadata_field:
164 metadata_value = record.get_value(self._metadata_field)
165 if metadata_value is not None:
166 metadata["metadata_field"] = metadata_value
168 # Add record ID and other metadata
169 if hasattr(record, 'storage_id') and record.storage_id:
170 metadata["record_id"] = record.storage_id
172 # Merge with record metadata
173 metadata.update(record.metadata)
175 # Format result
176 result = BaseSearchLogic.format_search_result(
177 content,
178 score=score,
179 metadata=metadata
180 )
181 results.append(result)
183 # Apply filters if provided
184 if filters:
185 results = BaseSearchLogic.filter_results(results, filters=filters)
187 # Apply min_score filter
188 min_score = kwargs.get('min_score', 0.0)
189 if min_score > 0:
190 results = BaseSearchLogic.filter_results(results, min_score=min_score)
192 # Deduplicate if requested
193 if kwargs.get('deduplicate', False):
194 results = BaseSearchLogic.deduplicate_results(results, key='content')
196 return results[:k]
198 except Exception:
199 # Log error if needed
200 return []
203class AsyncDataknobsBackendAdapter(AsyncResourceAdapter):
204 """Asynchronous adapter for dataknobs database backends.
206 Wraps a dataknobs AsyncDatabase instance to provide async resource adapter functionality.
208 Example:
209 >>> from dataknobs_data.backends import AsyncMemoryDatabase
210 >>> db = AsyncMemoryDatabase()
211 >>> adapter = AsyncDataknobsBackendAdapter(db)
212 >>> record = await adapter.get_value("record_id_123")
213 >>> results = await adapter.search("query text")
214 """
216 def __init__(
217 self,
218 database: "AsyncDatabase",
219 name: str = "async_dataknobs_backend",
220 text_field: str = "content",
221 metadata_field: str | None = None
222 ):
223 """Initialize async dataknobs backend adapter.
225 Args:
226 database: AsyncDatabase instance to wrap
227 name: Name identifier for this adapter
228 text_field: Field name to use as primary content (default: "content")
229 metadata_field: Optional field to extract as metadata
230 """
231 super().__init__(name=name)
232 self._database = database
233 self._text_field = text_field
234 self._metadata_field = metadata_field
236 async def get_value(
237 self,
238 key: str,
239 default: Any = None,
240 context: Dict[str, Any] | None = None
241 ) -> Any:
242 """Retrieve a record or field value by ID (async).
244 See DataknobsBackendAdapter.get_value for details.
245 """
246 try:
247 # Parse key for potential field extraction
248 if '.' in key:
249 parts = key.split('.', 1)
250 record_id = parts[0]
251 field_path = parts[1]
252 else:
253 record_id = key
254 field_path = None
256 # Read record from database
257 record = await self._database.read(record_id)
259 if record is None:
260 return default
262 # Extract field if specified
263 if field_path:
264 return record.get_value(field_path, default=default)
265 else:
266 # Return full record as dict
267 return record.to_dict(include_metadata=True)
269 except Exception:
270 # Log error if needed, return default
271 return default
273 async def search(
274 self,
275 query: str,
276 k: int = 5,
277 filters: Dict[str, Any] | None = None,
278 **kwargs: Any
279 ) -> List[Dict[str, Any]]:
280 """Perform search using database backend (async).
282 See DataknobsBackendAdapter.search for details.
283 """
284 try:
285 from dataknobs_data.query import Query, Filter, Operator
287 # Build filter for text search using LIKE operator
288 search_filter = Filter(
289 field=self._text_field,
290 operator=Operator.LIKE,
291 value=f"%{query}%"
292 )
294 # Build query object with filter and limit
295 query_obj = Query(
296 filters=[search_filter],
297 limit_value=k
298 )
300 # Execute search
301 records = await self._database.search(query_obj)
303 # Format results
304 results = []
305 for record in records:
306 # Extract content field
307 content = record.get_value(self._text_field, default="")
309 # Get score from metadata if available
310 score = record.metadata.get("score", record.metadata.get("_score", 1.0))
312 # Extract metadata
313 metadata = {}
314 if self._metadata_field:
315 metadata_value = record.get_value(self._metadata_field)
316 if metadata_value is not None:
317 metadata["metadata_field"] = metadata_value
319 # Add record ID and other metadata
320 if hasattr(record, 'storage_id') and record.storage_id:
321 metadata["record_id"] = record.storage_id
323 # Merge with record metadata
324 metadata.update(record.metadata)
326 # Format result
327 result = BaseSearchLogic.format_search_result(
328 content,
329 score=score,
330 metadata=metadata
331 )
332 results.append(result)
334 # Apply filters if provided
335 if filters:
336 results = BaseSearchLogic.filter_results(results, filters=filters)
338 # Apply min_score filter
339 min_score = kwargs.get('min_score', 0.0)
340 if min_score > 0:
341 results = BaseSearchLogic.filter_results(results, min_score=min_score)
343 # Deduplicate if requested
344 if kwargs.get('deduplicate', False):
345 results = BaseSearchLogic.deduplicate_results(results, key='content')
347 return results[:k]
349 except Exception:
350 # Log error if needed
351 return []