Coverage for src/dataknobs_llm/prompts/adapters/dataknobs_backend_adapter.py: 9%
106 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 15:21 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-31 15:21 -0600
1"""Dataknobs backend resource adapters.
3This module provides adapters that wrap dataknobs database backends, enabling them
4to be used as resource providers in the prompt library system. Supports both sync
5and async database backends.
6"""
8from typing import Any, Dict, List, Optional, TYPE_CHECKING
10from .resource_adapter import ResourceAdapter, AsyncResourceAdapter, BaseSearchLogic
12if TYPE_CHECKING:
13 from dataknobs_data.database import SyncDatabase, AsyncDatabase
14 from dataknobs_data.records import Record
15 from dataknobs_data.query import Query
18class DataknobsBackendAdapter(ResourceAdapter):
19 """Synchronous adapter for dataknobs database backends.
21 Wraps a dataknobs SyncDatabase instance to provide resource adapter functionality.
23 Features:
24 - Record retrieval by ID using database.read()
25 - Search using database.search() with Query objects
26 - Field extraction with dot-notation support
27 - Score-based ranking from search results
29 Example:
30 >>> from dataknobs_data.backends import SyncMemoryDatabase
31 >>> db = SyncMemoryDatabase()
32 >>> # ... populate database ...
33 >>> adapter = DataknobsBackendAdapter(db, name="memory")
34 >>> record = adapter.get_value("record_id_123")
35 >>> results = adapter.search("query text")
36 """
38 def __init__(
39 self,
40 database: "SyncDatabase",
41 name: str = "dataknobs_backend",
42 text_field: str = "content",
43 metadata_field: Optional[str] = None
44 ):
45 """Initialize dataknobs backend adapter.
47 Args:
48 database: SyncDatabase instance to wrap
49 name: Name identifier for this adapter
50 text_field: Field name to use as primary content (default: "content")
51 metadata_field: Optional field to extract as metadata
52 """
53 super().__init__(name=name)
54 self._database = database
55 self._text_field = text_field
56 self._metadata_field = metadata_field
58 def get_value(
59 self,
60 key: str,
61 default: Any = None,
62 context: Optional[Dict[str, Any]] = None
63 ) -> Any:
64 """Retrieve a record or field value by ID.
66 Supports field extraction using dot notation:
67 - Simple key: Returns entire record as dict
68 - "record_id.field_name": Returns specific field value
69 - "record_id.field.nested": Returns nested field value
71 Args:
72 key: Record ID or "record_id.field" notation
73 default: Value to return if record/field not found
74 context: Optional context with additional parameters
76 Returns:
77 Record dict, field value, or default if not found
78 """
79 try:
80 # Parse key for potential field extraction
81 if '.' in key:
82 parts = key.split('.', 1)
83 record_id = parts[0]
84 field_path = parts[1]
85 else:
86 record_id = key
87 field_path = None
89 # Read record from database
90 record = self._database.read(record_id)
92 if record is None:
93 return default
95 # Extract field if specified
96 if field_path:
97 return record.get_value(field_path, default=default)
98 else:
99 # Return full record as dict
100 return record.to_dict(include_metadata=True)
102 except Exception as e:
103 # Log error if needed, return default
104 return default
106 def search(
107 self,
108 query: str,
109 k: int = 5,
110 filters: Optional[Dict[str, Any]] = None,
111 **kwargs
112 ) -> List[Dict[str, Any]]:
113 """Perform search using database backend.
115 Creates a Query object with LIKE filter for text search.
116 Results are formatted according to BaseSearchLogic standards.
118 Args:
119 query: Search query string (searches text_field using LIKE)
120 k: Maximum number of results to return
121 filters: Optional additional filters for the search
122 **kwargs: Additional search options:
123 - min_score: Minimum relevance score (default: 0.0)
124 - deduplicate: Whether to deduplicate results (default: False)
126 Returns:
127 List of search results with structure:
128 {
129 "content": <text content>,
130 "score": <relevance score>,
131 "metadata": {<record metadata>}
132 }
133 """
134 try:
135 from dataknobs_data.query import Query, Filter, Operator
137 # Build filter for text search using LIKE operator
138 # This searches for the query string anywhere in the text field
139 search_filter = Filter(
140 field=self._text_field,
141 operator=Operator.LIKE,
142 value=f"%{query}%"
143 )
145 # Build query object with filter and limit
146 query_obj = Query(
147 filters=[search_filter],
148 limit_value=k
149 )
151 # Execute search
152 records = self._database.search(query_obj)
154 # Format results
155 results = []
156 for record in records:
157 # Extract content field
158 content = record.get_value(self._text_field, default="")
160 # Get score from metadata if available
161 score = record.metadata.get("score", record.metadata.get("_score", 1.0))
163 # Extract metadata
164 metadata = {}
165 if self._metadata_field:
166 metadata_value = record.get_value(self._metadata_field)
167 if metadata_value is not None:
168 metadata["metadata_field"] = metadata_value
170 # Add record ID and other metadata
171 if hasattr(record, 'storage_id') and record.storage_id:
172 metadata["record_id"] = record.storage_id
174 # Merge with record metadata
175 metadata.update(record.metadata)
177 # Format result
178 result = BaseSearchLogic.format_search_result(
179 content,
180 score=score,
181 metadata=metadata
182 )
183 results.append(result)
185 # Apply filters if provided
186 if filters:
187 results = BaseSearchLogic.filter_results(results, filters=filters)
189 # Apply min_score filter
190 min_score = kwargs.get('min_score', 0.0)
191 if min_score > 0:
192 results = BaseSearchLogic.filter_results(results, min_score=min_score)
194 # Deduplicate if requested
195 if kwargs.get('deduplicate', False):
196 results = BaseSearchLogic.deduplicate_results(results, key='content')
198 return results[:k]
200 except Exception as e:
201 # Log error if needed
202 return []
205class AsyncDataknobsBackendAdapter(AsyncResourceAdapter):
206 """Asynchronous adapter for dataknobs database backends.
208 Wraps a dataknobs AsyncDatabase instance to provide async resource adapter functionality.
210 Example:
211 >>> from dataknobs_data.backends import AsyncMemoryDatabase
212 >>> db = AsyncMemoryDatabase()
213 >>> adapter = AsyncDataknobsBackendAdapter(db)
214 >>> record = await adapter.get_value("record_id_123")
215 >>> results = await adapter.search("query text")
216 """
218 def __init__(
219 self,
220 database: "AsyncDatabase",
221 name: str = "async_dataknobs_backend",
222 text_field: str = "content",
223 metadata_field: Optional[str] = None
224 ):
225 """Initialize async dataknobs backend adapter.
227 Args:
228 database: AsyncDatabase instance to wrap
229 name: Name identifier for this adapter
230 text_field: Field name to use as primary content (default: "content")
231 metadata_field: Optional field to extract as metadata
232 """
233 super().__init__(name=name)
234 self._database = database
235 self._text_field = text_field
236 self._metadata_field = metadata_field
238 async def get_value(
239 self,
240 key: str,
241 default: Any = None,
242 context: Optional[Dict[str, Any]] = None
243 ) -> Any:
244 """Retrieve a record or field value by ID (async).
246 See DataknobsBackendAdapter.get_value for details.
247 """
248 try:
249 # Parse key for potential field extraction
250 if '.' in key:
251 parts = key.split('.', 1)
252 record_id = parts[0]
253 field_path = parts[1]
254 else:
255 record_id = key
256 field_path = None
258 # Read record from database
259 record = await self._database.read(record_id)
261 if record is None:
262 return default
264 # Extract field if specified
265 if field_path:
266 return record.get_value(field_path, default=default)
267 else:
268 # Return full record as dict
269 return record.to_dict(include_metadata=True)
271 except Exception as e:
272 # Log error if needed, return default
273 return default
275 async def search(
276 self,
277 query: str,
278 k: int = 5,
279 filters: Optional[Dict[str, Any]] = None,
280 **kwargs
281 ) -> List[Dict[str, Any]]:
282 """Perform search using database backend (async).
284 See DataknobsBackendAdapter.search for details.
285 """
286 try:
287 from dataknobs_data.query import Query, Filter, Operator
289 # Build filter for text search using LIKE operator
290 search_filter = Filter(
291 field=self._text_field,
292 operator=Operator.LIKE,
293 value=f"%{query}%"
294 )
296 # Build query object with filter and limit
297 query_obj = Query(
298 filters=[search_filter],
299 limit_value=k
300 )
302 # Execute search
303 records = await self._database.search(query_obj)
305 # Format results
306 results = []
307 for record in records:
308 # Extract content field
309 content = record.get_value(self._text_field, default="")
311 # Get score from metadata if available
312 score = record.metadata.get("score", record.metadata.get("_score", 1.0))
314 # Extract metadata
315 metadata = {}
316 if self._metadata_field:
317 metadata_value = record.get_value(self._metadata_field)
318 if metadata_value is not None:
319 metadata["metadata_field"] = metadata_value
321 # Add record ID and other metadata
322 if hasattr(record, 'storage_id') and record.storage_id:
323 metadata["record_id"] = record.storage_id
325 # Merge with record metadata
326 metadata.update(record.metadata)
328 # Format result
329 result = BaseSearchLogic.format_search_result(
330 content,
331 score=score,
332 metadata=metadata
333 )
334 results.append(result)
336 # Apply filters if provided
337 if filters:
338 results = BaseSearchLogic.filter_results(results, filters=filters)
340 # Apply min_score filter
341 min_score = kwargs.get('min_score', 0.0)
342 if min_score > 0:
343 results = BaseSearchLogic.filter_results(results, min_score=min_score)
345 # Deduplicate if requested
346 if kwargs.get('deduplicate', False):
347 results = BaseSearchLogic.deduplicate_results(results, key='content')
349 return results[:k]
351 except Exception as e:
352 # Log error if needed
353 return []