Coverage for src/dataknobs_llm/prompts/adapters/dataknobs_backend_adapter.py: 9%

106 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-31 15:21 -0600

1"""Dataknobs backend resource adapters. 

2 

3This module provides adapters that wrap dataknobs database backends, enabling them 

4to be used as resource providers in the prompt library system. Supports both sync 

5and async database backends. 

6""" 

7 

8from typing import Any, Dict, List, Optional, TYPE_CHECKING 

9 

10from .resource_adapter import ResourceAdapter, AsyncResourceAdapter, BaseSearchLogic 

11 

12if TYPE_CHECKING: 

13 from dataknobs_data.database import SyncDatabase, AsyncDatabase 

14 from dataknobs_data.records import Record 

15 from dataknobs_data.query import Query 

16 

17 

class DataknobsBackendAdapter(ResourceAdapter):
    """Synchronous adapter for dataknobs database backends.

    Wraps a dataknobs SyncDatabase instance to provide resource adapter
    functionality.

    Features:
    - Record retrieval by ID using database.read()
    - Search using database.search() with Query objects
    - Field extraction with dot-notation support
    - Score taken from record metadata ("score" / "_score") when present

    Both public methods are best-effort: any exception raised while talking
    to the backend is logged and converted into the fallback return value
    (``default`` for get_value, ``[]`` for search) instead of propagating.

    Example:
        >>> from dataknobs_data.backends import SyncMemoryDatabase
        >>> db = SyncMemoryDatabase()
        >>> # ... populate database ...
        >>> adapter = DataknobsBackendAdapter(db, name="memory")
        >>> record = adapter.get_value("record_id_123")
        >>> results = adapter.search("query text")
    """

    def __init__(
        self,
        database: "SyncDatabase",
        name: str = "dataknobs_backend",
        text_field: str = "content",
        metadata_field: Optional[str] = None
    ):
        """Initialize dataknobs backend adapter.

        Args:
            database: SyncDatabase instance to wrap
            name: Name identifier for this adapter
            text_field: Field name to use as primary content (default: "content")
            metadata_field: Optional field to extract as metadata
        """
        super().__init__(name=name)
        self._database = database
        self._text_field = text_field
        self._metadata_field = metadata_field

    def get_value(
        self,
        key: str,
        default: Any = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Retrieve a record or field value by ID.

        Supports field extraction using dot notation:
        - Simple key: Returns entire record as dict
        - "record_id.field_name": Returns specific field value
        - "record_id.field.nested": Everything after the first dot is passed
          to the record's get_value() as the field path

        Args:
            key: Record ID or "record_id.field" notation
            default: Value to return if record/field not found, or if the
                lookup raises
            context: Accepted for interface compatibility; not used here

        Returns:
            Record dict, field value, or default if not found
        """
        try:
            # Only the first dot separates the record ID from the field path;
            # an empty field path (no dot, or a trailing dot) means "whole record".
            record_id, _, field_path = key.partition('.')

            record = self._database.read(record_id)
            if record is None:
                return default

            if field_path:
                return record.get_value(field_path, default=default)

            # No field requested: return the full record as a dict
            return record.to_dict(include_metadata=True)

        except Exception:
            # Best-effort contract: callers get the default rather than an
            # exception, but the failure is logged instead of vanishing.
            import logging
            logging.getLogger(__name__).warning(
                "%s.get_value failed for key %r; returning default",
                type(self).__name__,
                key,
                exc_info=True,
            )
            return default

    def _format_record(self, record: "Record") -> Dict[str, Any]:
        """Convert one database record into a formatted search result."""
        # Treat a missing (None) metadata mapping as empty
        record_metadata = record.metadata or {}

        content = record.get_value(self._text_field, default="")

        # Prefer "score", then "_score"; records without either score 1.0
        score = record_metadata.get("score", record_metadata.get("_score", 1.0))

        metadata: Dict[str, Any] = {}
        if self._metadata_field:
            metadata_value = record.get_value(self._metadata_field)
            if metadata_value is not None:
                metadata["metadata_field"] = metadata_value

        if hasattr(record, 'storage_id') and record.storage_id:
            metadata["record_id"] = record.storage_id

        # Record metadata is merged last, so its keys win over the
        # adapter-supplied "metadata_field" / "record_id" entries.
        metadata.update(record_metadata)

        return BaseSearchLogic.format_search_result(
            content,
            score=score,
            metadata=metadata
        )

    def search(
        self,
        query: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """Perform search using database backend.

        Creates a Query object with a LIKE filter ("%<query>%") on the
        configured text field, then formats each returned record with
        BaseSearchLogic.format_search_result.

        Args:
            query: Search query string (searches text_field using LIKE)
            k: Maximum number of results to return
            filters: Optional additional filters, applied to the formatted
                results via BaseSearchLogic.filter_results
            **kwargs: Additional search options:
                - min_score: Minimum relevance score (default: 0.0; None is
                  treated as "no threshold")
                - deduplicate: Whether to deduplicate results on "content"
                  (default: False)

        Returns:
            List of at most k formatted search results; expected structure
            (as produced by BaseSearchLogic.format_search_result):
                {
                    "content": <text content>,
                    "score": <relevance score>,
                    "metadata": {<record metadata>}
                }
            An empty list is returned if the search raises.
        """
        try:
            from dataknobs_data.query import Query, Filter, Operator

            # Match the query string anywhere in the text field
            search_filter = Filter(
                field=self._text_field,
                operator=Operator.LIKE,
                value=f"%{query}%"
            )

            # NOTE(review): k is passed to the query before the
            # post-filters below run, so fewer than k results may come back
            # when filters/min_score/deduplicate drop entries.
            query_obj = Query(
                filters=[search_filter],
                limit_value=k
            )

            records = self._database.search(query_obj)
            results = [self._format_record(record) for record in records]

            if filters:
                results = BaseSearchLogic.filter_results(results, filters=filters)

            # An explicit min_score=None must not blow up the comparison
            min_score = kwargs.get('min_score') or 0.0
            if min_score > 0:
                results = BaseSearchLogic.filter_results(results, min_score=min_score)

            if kwargs.get('deduplicate', False):
                results = BaseSearchLogic.deduplicate_results(results, key='content')

            return results[:k]

        except Exception:
            # Best-effort contract: return no results rather than raising,
            # but log the failure so it can be diagnosed.
            import logging
            logging.getLogger(__name__).warning(
                "%s.search failed; returning no results",
                type(self).__name__,
                exc_info=True,
            )
            return []

203 

204 

class AsyncDataknobsBackendAdapter(AsyncResourceAdapter):
    """Asynchronous adapter for dataknobs database backends.

    Wraps a dataknobs AsyncDatabase instance to provide async resource
    adapter functionality.

    Both public methods are best-effort: any exception raised while talking
    to the backend is logged and converted into the fallback return value
    (``default`` for get_value, ``[]`` for search) instead of propagating.

    Example:
        >>> from dataknobs_data.backends import AsyncMemoryDatabase
        >>> db = AsyncMemoryDatabase()
        >>> adapter = AsyncDataknobsBackendAdapter(db)
        >>> record = await adapter.get_value("record_id_123")
        >>> results = await adapter.search("query text")
    """

    def __init__(
        self,
        database: "AsyncDatabase",
        name: str = "async_dataknobs_backend",
        text_field: str = "content",
        metadata_field: Optional[str] = None
    ):
        """Initialize async dataknobs backend adapter.

        Args:
            database: AsyncDatabase instance to wrap
            name: Name identifier for this adapter
            text_field: Field name to use as primary content (default: "content")
            metadata_field: Optional field to extract as metadata
        """
        super().__init__(name=name)
        self._database = database
        self._text_field = text_field
        self._metadata_field = metadata_field

    async def get_value(
        self,
        key: str,
        default: Any = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Retrieve a record or field value by ID (async).

        Supports field extraction using dot notation:
        - Simple key: Returns entire record as dict
        - "record_id.field_name": Returns specific field value
        - "record_id.field.nested": Everything after the first dot is passed
          to the record's get_value() as the field path

        Args:
            key: Record ID or "record_id.field" notation
            default: Value to return if record/field not found, or if the
                lookup raises
            context: Accepted for interface compatibility; not used here

        Returns:
            Record dict, field value, or default if not found
        """
        try:
            # Only the first dot separates the record ID from the field path;
            # an empty field path (no dot, or a trailing dot) means "whole record".
            record_id, _, field_path = key.partition('.')

            record = await self._database.read(record_id)
            if record is None:
                return default

            if field_path:
                return record.get_value(field_path, default=default)

            # No field requested: return the full record as a dict
            return record.to_dict(include_metadata=True)

        except Exception:
            # Best-effort contract: callers get the default rather than an
            # exception, but the failure is logged instead of vanishing.
            import logging
            logging.getLogger(__name__).warning(
                "%s.get_value failed for key %r; returning default",
                type(self).__name__,
                key,
                exc_info=True,
            )
            return default

    def _format_record(self, record: "Record") -> Dict[str, Any]:
        """Convert one database record into a formatted search result."""
        # Treat a missing (None) metadata mapping as empty
        record_metadata = record.metadata or {}

        content = record.get_value(self._text_field, default="")

        # Prefer "score", then "_score"; records without either score 1.0
        score = record_metadata.get("score", record_metadata.get("_score", 1.0))

        metadata: Dict[str, Any] = {}
        if self._metadata_field:
            metadata_value = record.get_value(self._metadata_field)
            if metadata_value is not None:
                metadata["metadata_field"] = metadata_value

        if hasattr(record, 'storage_id') and record.storage_id:
            metadata["record_id"] = record.storage_id

        # Record metadata is merged last, so its keys win over the
        # adapter-supplied "metadata_field" / "record_id" entries.
        metadata.update(record_metadata)

        return BaseSearchLogic.format_search_result(
            content,
            score=score,
            metadata=metadata
        )

    async def search(
        self,
        query: str,
        k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """Perform search using database backend (async).

        Creates a Query object with a LIKE filter ("%<query>%") on the
        configured text field, then formats each returned record with
        BaseSearchLogic.format_search_result.

        Args:
            query: Search query string (searches text_field using LIKE)
            k: Maximum number of results to return
            filters: Optional additional filters, applied to the formatted
                results via BaseSearchLogic.filter_results
            **kwargs: Additional search options:
                - min_score: Minimum relevance score (default: 0.0; None is
                  treated as "no threshold")
                - deduplicate: Whether to deduplicate results on "content"
                  (default: False)

        Returns:
            List of at most k formatted search results; an empty list is
            returned if the search raises.
        """
        try:
            from dataknobs_data.query import Query, Filter, Operator

            # Match the query string anywhere in the text field
            search_filter = Filter(
                field=self._text_field,
                operator=Operator.LIKE,
                value=f"%{query}%"
            )

            # NOTE(review): k is passed to the query before the
            # post-filters below run, so fewer than k results may come back
            # when filters/min_score/deduplicate drop entries.
            query_obj = Query(
                filters=[search_filter],
                limit_value=k
            )

            records = await self._database.search(query_obj)
            results = [self._format_record(record) for record in records]

            if filters:
                results = BaseSearchLogic.filter_results(results, filters=filters)

            # An explicit min_score=None must not blow up the comparison
            min_score = kwargs.get('min_score') or 0.0
            if min_score > 0:
                results = BaseSearchLogic.filter_results(results, min_score=min_score)

            if kwargs.get('deduplicate', False):
                results = BaseSearchLogic.deduplicate_results(results, key='content')

            return results[:k]

        except Exception:
            # Best-effort contract: return no results rather than raising,
            # but log the failure so it can be diagnosed.
            import logging
            logging.getLogger(__name__).warning(
                "%s.search failed; returning no results",
                type(self).__name__,
                exc_info=True,
            )
            return []