Coverage for src / dataknobs_llm / prompts / adapters / dataknobs_backend_adapter.py: 9%

106 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 10:28 -0700

1"""Dataknobs backend resource adapters. 

2 

3This module provides adapters that wrap dataknobs database backends, enabling them 

4to be used as resource providers in the prompt library system. Supports both sync 

5and async database backends. 

6""" 

7 

8from typing import Any, Dict, List, TYPE_CHECKING 

9 

10from .resource_adapter import ResourceAdapter, AsyncResourceAdapter, BaseSearchLogic 

11 

12if TYPE_CHECKING: 

13 from dataknobs_data.database import SyncDatabase, AsyncDatabase 

14 

15 

class DataknobsBackendAdapter(ResourceAdapter):
    """Synchronous adapter for dataknobs database backends.

    Wraps a dataknobs SyncDatabase instance to provide resource adapter
    functionality.

    Features:
    - Record retrieval by ID using database.read()
    - Search using database.search() with Query objects (LIKE filter)
    - Field extraction with dot-notation support
    - Relevance score read from record metadata ("score", then "_score",
      defaulting to 1.0); results keep the order returned by the backend

    Both public methods are best-effort: any exception raised while talking
    to the backend is logged and replaced by the fallback value (``default``
    for ``get_value``, ``[]`` for ``search``).

    Example:
        >>> from dataknobs_data.backends import SyncMemoryDatabase
        >>> db = SyncMemoryDatabase()
        >>> # ... populate database ...
        >>> adapter = DataknobsBackendAdapter(db, name="memory")
        >>> record = adapter.get_value("record_id_123")
        >>> results = adapter.search("query text")
    """

    def __init__(
        self,
        database: "SyncDatabase",
        name: str = "dataknobs_backend",
        text_field: str = "content",
        metadata_field: str | None = None
    ):
        """Initialize dataknobs backend adapter.

        Args:
            database: SyncDatabase instance to wrap
            name: Name identifier for this adapter
            text_field: Field name to use as primary content (default: "content")
            metadata_field: Optional field to extract as metadata
        """
        super().__init__(name=name)
        self._database = database
        self._text_field = text_field
        self._metadata_field = metadata_field

    def get_value(
        self,
        key: str,
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Any:
        """Retrieve a record or field value by ID.

        Supports field extraction using dot notation:
        - Simple key: Returns entire record as dict
        - "record_id.field_name": Returns specific field value
        - "record_id.field.nested": Returns nested field value

        Only the first "." splits the key; everything after it is passed to
        the record as the field path. A key with an empty field path (for
        example "record_id.") returns the whole record.

        Args:
            key: Record ID or "record_id.field" notation
            default: Value to return if record/field not found
            context: Optional context with additional parameters (not used
                by this adapter)

        Returns:
            Record dict, field value, or default if not found or if the
            lookup raised an exception (the exception is logged).
        """
        try:
            # partition() yields an empty field_path when there is no ".",
            # which is falsy just like the "no field requested" case.
            record_id, _, field_path = key.partition('.')

            record = self._database.read(record_id)
            if record is None:
                return default

            if field_path:
                return record.get_value(field_path, default=default)

            # No field requested: return full record as dict
            return record.to_dict(include_metadata=True)

        except Exception:
            # Deliberate best-effort: never raise to the caller, but do not
            # hide the failure either.
            self._log_swallowed_error("get_value", key)
            return default

    def search(
        self,
        query: str,
        k: int = 5,
        filters: Dict[str, Any] | None = None,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Perform search using database backend.

        Creates a Query object with a LIKE filter ("%<query>%") on the text
        field and a limit of ``k``. Results are formatted via
        BaseSearchLogic.format_search_result.

        Note that ``filters``, ``min_score`` and ``deduplicate`` are applied
        after the backend has already limited the result set to ``k``
        records, so fewer than ``k`` results may be returned.

        Args:
            query: Search query string (searches text_field using LIKE)
            k: Maximum number of results to return; a non-positive value
                returns an empty list without querying the backend
            filters: Optional additional filters for the search
            **kwargs: Additional search options:
                - min_score: Minimum relevance score (default: 0.0;
                  None is treated as 0.0)
                - deduplicate: Whether to deduplicate results (default: False)

        Returns:
            List of search results with structure:
            {
                "content": <text content>,
                "score": <relevance score>,
                "metadata": {<record metadata>}
            }
            An empty list is returned if the search raised an exception
            (the exception is logged).
        """
        try:
            # A non-positive limit can never yield results; skip the backend
            # round trip (and avoid results[:k] with a negative k silently
            # dropping trailing items instead of limiting).
            if k is not None and k <= 0:
                return []

            from dataknobs_data.query import Query, Filter, Operator

            # Match the query string anywhere in the text field
            search_filter = Filter(
                field=self._text_field,
                operator=Operator.LIKE,
                value=f"%{query}%"
            )
            query_obj = Query(
                filters=[search_filter],
                limit_value=k
            )

            records = self._database.search(query_obj)

            results = [self._record_to_result(record) for record in records]
            return self._refine_results(results, k, filters, kwargs)

        except Exception:
            # Deliberate best-effort: never raise to the caller, but do not
            # hide the failure either.
            self._log_swallowed_error("search", query)
            return []

    def _record_to_result(self, record: Any) -> Dict[str, Any]:
        """Convert one backend record into a formatted search result."""
        # Tolerate records whose metadata is missing or None so a single
        # such record does not abort the whole search.
        record_meta = getattr(record, "metadata", None) or {}

        content = record.get_value(self._text_field, default="")
        score = record_meta.get("score", record_meta.get("_score", 1.0))

        metadata: Dict[str, Any] = {}
        if self._metadata_field:
            metadata_value = record.get_value(self._metadata_field)
            if metadata_value is not None:
                metadata["metadata_field"] = metadata_value

        storage_id = getattr(record, "storage_id", None)
        if storage_id:
            metadata["record_id"] = storage_id

        # Record metadata is merged last, so its keys take precedence
        metadata.update(record_meta)

        return BaseSearchLogic.format_search_result(
            content,
            score=score,
            metadata=metadata
        )

    @staticmethod
    def _refine_results(
        results: List[Dict[str, Any]],
        k: int,
        filters: Dict[str, Any] | None,
        options: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Apply filters, min_score and deduplication, then cap at k results."""
        if filters:
            results = BaseSearchLogic.filter_results(results, filters=filters)

        # "or 0.0" lets an explicit min_score=None mean "no threshold"
        # instead of raising on the comparison below.
        min_score = options.get('min_score') or 0.0
        if min_score > 0:
            results = BaseSearchLogic.filter_results(results, min_score=min_score)

        if options.get('deduplicate', False):
            results = BaseSearchLogic.deduplicate_results(results, key='content')

        return results[:k]

    def _log_swallowed_error(self, operation: str, subject: Any) -> None:
        """Log the exception currently being handled, with its traceback."""
        # Imported locally so this class does not depend on a module-level
        # logger being defined elsewhere in the file.
        import logging

        logging.getLogger(__name__).warning(
            "%s.%s failed for %r; returning fallback value",
            type(self).__name__,
            operation,
            subject,
            exc_info=True,
        )

201 

202 

class AsyncDataknobsBackendAdapter(AsyncResourceAdapter):
    """Asynchronous adapter for dataknobs database backends.

    Wraps a dataknobs AsyncDatabase instance to provide async resource
    adapter functionality: record retrieval via ``await database.read()``
    and LIKE-based text search via ``await database.search()``.

    Both public methods are best-effort: any exception raised while talking
    to the backend is logged and replaced by the fallback value (``default``
    for ``get_value``, ``[]`` for ``search``).

    Example:
        >>> from dataknobs_data.backends import AsyncMemoryDatabase
        >>> db = AsyncMemoryDatabase()
        >>> adapter = AsyncDataknobsBackendAdapter(db)
        >>> record = await adapter.get_value("record_id_123")
        >>> results = await adapter.search("query text")
    """

    def __init__(
        self,
        database: "AsyncDatabase",
        name: str = "async_dataknobs_backend",
        text_field: str = "content",
        metadata_field: str | None = None
    ):
        """Initialize async dataknobs backend adapter.

        Args:
            database: AsyncDatabase instance to wrap
            name: Name identifier for this adapter
            text_field: Field name to use as primary content (default: "content")
            metadata_field: Optional field to extract as metadata
        """
        super().__init__(name=name)
        self._database = database
        self._text_field = text_field
        self._metadata_field = metadata_field

    async def get_value(
        self,
        key: str,
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Any:
        """Retrieve a record or field value by ID (async).

        Supports field extraction using dot notation:
        - Simple key: Returns entire record as dict
        - "record_id.field_name": Returns specific field value
        - "record_id.field.nested": Returns nested field value

        Only the first "." splits the key; everything after it is passed to
        the record as the field path. A key with an empty field path (for
        example "record_id.") returns the whole record.

        Args:
            key: Record ID or "record_id.field" notation
            default: Value to return if record/field not found
            context: Optional context with additional parameters (not used
                by this adapter)

        Returns:
            Record dict, field value, or default if not found or if the
            lookup raised an exception (the exception is logged).
        """
        try:
            # partition() yields an empty field_path when there is no ".",
            # which is falsy just like the "no field requested" case.
            record_id, _, field_path = key.partition('.')

            record = await self._database.read(record_id)
            if record is None:
                return default

            if field_path:
                return record.get_value(field_path, default=default)

            # No field requested: return full record as dict
            return record.to_dict(include_metadata=True)

        except Exception:
            # Deliberate best-effort: never raise to the caller, but do not
            # hide the failure either.
            self._log_swallowed_error("get_value", key)
            return default

    async def search(
        self,
        query: str,
        k: int = 5,
        filters: Dict[str, Any] | None = None,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Perform search using database backend (async).

        Creates a Query object with a LIKE filter ("%<query>%") on the text
        field and a limit of ``k``. Results are formatted via
        BaseSearchLogic.format_search_result.

        Note that ``filters``, ``min_score`` and ``deduplicate`` are applied
        after the backend has already limited the result set to ``k``
        records, so fewer than ``k`` results may be returned.

        Args:
            query: Search query string (searches text_field using LIKE)
            k: Maximum number of results to return; a non-positive value
                returns an empty list without querying the backend
            filters: Optional additional filters for the search
            **kwargs: Additional search options:
                - min_score: Minimum relevance score (default: 0.0;
                  None is treated as 0.0)
                - deduplicate: Whether to deduplicate results (default: False)

        Returns:
            List of search results with structure:
            {
                "content": <text content>,
                "score": <relevance score>,
                "metadata": {<record metadata>}
            }
            An empty list is returned if the search raised an exception
            (the exception is logged).
        """
        try:
            # A non-positive limit can never yield results; skip the backend
            # round trip (and avoid results[:k] with a negative k silently
            # dropping trailing items instead of limiting).
            if k is not None and k <= 0:
                return []

            from dataknobs_data.query import Query, Filter, Operator

            # Match the query string anywhere in the text field
            search_filter = Filter(
                field=self._text_field,
                operator=Operator.LIKE,
                value=f"%{query}%"
            )
            query_obj = Query(
                filters=[search_filter],
                limit_value=k
            )

            records = await self._database.search(query_obj)

            results = [self._record_to_result(record) for record in records]
            return self._refine_results(results, k, filters, kwargs)

        except Exception:
            # Deliberate best-effort: never raise to the caller, but do not
            # hide the failure either.
            self._log_swallowed_error("search", query)
            return []

    def _record_to_result(self, record: Any) -> Dict[str, Any]:
        """Convert one backend record into a formatted search result."""
        # Tolerate records whose metadata is missing or None so a single
        # such record does not abort the whole search.
        record_meta = getattr(record, "metadata", None) or {}

        content = record.get_value(self._text_field, default="")
        score = record_meta.get("score", record_meta.get("_score", 1.0))

        metadata: Dict[str, Any] = {}
        if self._metadata_field:
            metadata_value = record.get_value(self._metadata_field)
            if metadata_value is not None:
                metadata["metadata_field"] = metadata_value

        storage_id = getattr(record, "storage_id", None)
        if storage_id:
            metadata["record_id"] = storage_id

        # Record metadata is merged last, so its keys take precedence
        metadata.update(record_meta)

        return BaseSearchLogic.format_search_result(
            content,
            score=score,
            metadata=metadata
        )

    @staticmethod
    def _refine_results(
        results: List[Dict[str, Any]],
        k: int,
        filters: Dict[str, Any] | None,
        options: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Apply filters, min_score and deduplication, then cap at k results."""
        if filters:
            results = BaseSearchLogic.filter_results(results, filters=filters)

        # "or 0.0" lets an explicit min_score=None mean "no threshold"
        # instead of raising on the comparison below.
        min_score = options.get('min_score') or 0.0
        if min_score > 0:
            results = BaseSearchLogic.filter_results(results, min_score=min_score)

        if options.get('deduplicate', False):
            results = BaseSearchLogic.deduplicate_results(results, key='content')

        return results[:k]

    def _log_swallowed_error(self, operation: str, subject: Any) -> None:
        """Log the exception currently being handled, with its traceback."""
        # Imported locally so this class does not depend on a module-level
        # logger being defined elsewhere in the file.
        import logging

        logging.getLogger(__name__).warning(
            "%s.%s failed for %r; returning fallback value",
            type(self).__name__,
            operation,
            subject,
            exc_info=True,
        )