Coverage for src/dataknobs_llm/prompts/adapters/resource_adapter.py: 98%

59 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Resource adapter interfaces for pluggable data sources. 

2 

3This module defines the adapter pattern for accessing external resources 

4(databases, vector stores, configuration systems, etc.) in both synchronous 

5and asynchronous contexts. 

6 

7Key concepts: 

8- Separate sync (ResourceAdapter) and async (AsyncResourceAdapter) interfaces 

9- Shared base class (ResourceAdapterBase) for common functionality 

10- BaseSearchLogic for reusable search operations 

11- No mixing of sync/async - builders require matching adapter types 

12""" 

13 

14from abc import ABC, abstractmethod 

15from typing import Any, Dict, List 

16import logging 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21class ResourceAdapterBase: 

22 """Base class with shared functionality for both sync and async adapters. 

23 

24 This class provides: 

25 - Adapter name and metadata management 

26 - Metadata caching 

27 - Helper methods for type checking 

28 - Common initialization logic 

29 """ 

30 

31 def __init__(self, name: str = "adapter", metadata: Dict[str, Any] | None = None): 

32 """Initialize the resource adapter base. 

33 

34 Args: 

35 name: Adapter identifier (used in logs and error messages) 

36 metadata: Optional metadata about this adapter 

37 """ 

38 self._name = name 

39 self._metadata = metadata or {} 

40 self._metadata_cache: Dict[str, Any] | None = None 

41 

42 @property 

43 def name(self) -> str: 

44 """Get the adapter name.""" 

45 return self._name 

46 

47 def is_async(self) -> bool: 

48 """Check if this is an async adapter. 

49 

50 Returns: 

51 True if this adapter implements AsyncResourceAdapter 

52 """ 

53 return isinstance(self, AsyncResourceAdapter) 

54 

55 def get_metadata(self) -> Dict[str, Any]: 

56 """Get metadata about this adapter. 

57 

58 Returns: 

59 Dictionary with adapter metadata 

60 """ 

61 return { 

62 "name": self._name, 

63 "type": "async" if self.is_async() else "sync", 

64 "class": self.__class__.__name__, 

65 **self._metadata 

66 } 

67 

68 def __repr__(self) -> str: 

69 """Return a string representation of this adapter.""" 

70 adapter_type = "async" if self.is_async() else "sync" 

71 return f"{self.__class__.__name__}(name={self._name!r}, type={adapter_type})" 

72 

73 

class ResourceAdapter(ResourceAdapterBase, ABC):
    """Blocking (synchronous) resource adapter interface.

    Concrete adapters give synchronous access to an external resource for
    parameter resolution and RAG searches. Subclasses must implement
    ``get_value`` and ``search``; ``batch_get_values`` has a default
    implementation built on ``get_value``.
    """

    @abstractmethod
    def get_value(
        self,
        key: str,
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Any:
        """Look up a single value in the resource.

        Args:
            key: The key to look up
            default: Value to hand back when the key is not found
            context: Optional lookup context (e.g., user ID, session)

        Returns:
            The value stored under ``key``, or ``default`` if not found
        """
        ...

    @abstractmethod
    def search(
        self,
        query: str,
        k: int = 5,
        filters: Dict[str, Any] | None = None,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Run a search query against the resource.

        Args:
            query: Search query string
            k: Number of results to return (default: 5)
            filters: Optional filters to apply to the search
            **kwargs: Additional adapter-specific search parameters

        Returns:
            List of search results as dictionaries
        """
        ...

    def batch_get_values(
        self,
        keys: List[str],
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        """Look up several keys in one call.

        The default implementation simply calls ``get_value()`` once per key,
        in order. Adapters backed by a store with a native bulk read can
        override this for efficiency.

        Args:
            keys: Keys to look up
            default: Value used for keys that are not found
            context: Optional lookup context

        Returns:
            Dictionary mapping each key to its value
        """
        resolved: Dict[str, Any] = {}
        for lookup_key in keys:
            resolved[lookup_key] = self.get_value(lookup_key, default, context)
        return resolved

143 

144 

class AsyncResourceAdapter(ResourceAdapterBase, ABC):
    """Non-blocking (asynchronous) resource adapter interface.

    Concrete adapters give asynchronous access to an external resource for
    parameter resolution and RAG searches. Subclasses must implement the
    ``get_value`` and ``search`` coroutines; ``batch_get_values`` has a
    default implementation built on ``get_value``.
    """

    @abstractmethod
    async def get_value(
        self,
        key: str,
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Any:
        """Look up a single value in the resource (async).

        Args:
            key: The key to look up
            default: Value to hand back when the key is not found
            context: Optional lookup context (e.g., user ID, session)

        Returns:
            The value stored under ``key``, or ``default`` if not found
        """
        ...

    @abstractmethod
    async def search(
        self,
        query: str,
        k: int = 5,
        filters: Dict[str, Any] | None = None,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Run a search query against the resource (async).

        Args:
            query: Search query string
            k: Number of results to return (default: 5)
            filters: Optional filters to apply to the search
            **kwargs: Additional adapter-specific search parameters

        Returns:
            List of search results as dictionaries
        """
        ...

    async def batch_get_values(
        self,
        keys: List[str],
        default: Any = None,
        context: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        """Look up several keys in one call (async).

        The default implementation creates one ``get_value()`` coroutine per
        key and awaits them together with ``asyncio.gather``. Adapters backed
        by a store with a native bulk read can override this for efficiency.

        Args:
            keys: Keys to look up
            default: Value used for keys that are not found
            context: Optional lookup context

        Returns:
            Dictionary mapping each key to its value
        """
        import asyncio

        lookups = []
        for lookup_key in keys:
            lookups.append(self.get_value(lookup_key, default, context))

        # gather() returns results in the same order as the awaitables it was
        # given, so pairing them back up with ``keys`` is positional.
        resolved = await asyncio.gather(*lookups)
        return {
            lookup_key: value
            for lookup_key, value in zip(keys, resolved, strict=True)
        }

217 

218 

219class BaseSearchLogic: 

220 """Shared search logic utilities for both sync and async adapters. 

221 

222 This class provides helper methods for common search operations: 

223 - Result formatting and filtering 

224 - Score normalization 

225 - Result deduplication 

226 - Metadata extraction 

227 """ 

228 

229 @staticmethod 

230 def format_search_result( 

231 item: Any, 

232 score: float | None = None, 

233 metadata: Dict[str, Any] | None = None 

234 ) -> Dict[str, Any]: 

235 """Format a search result into a standardized dictionary. 

236 

237 Args: 

238 item: The search result item (could be string, dict, or object) 

239 score: Optional relevance score 

240 metadata: Optional metadata about the result 

241 

242 Returns: 

243 Formatted result dictionary with 'content', 'score', 'metadata' 

244 """ 

245 result: Dict[str, Any] = {} 

246 

247 # Extract content 

248 if isinstance(item, str): 

249 result["content"] = item 

250 elif isinstance(item, dict): 

251 # Try common content keys 

252 result["content"] = item.get("content") or item.get("text") or str(item) 

253 # Preserve other fields as metadata 

254 metadata = {**item, **(metadata or {})} 

255 else: 

256 result["content"] = str(item) 

257 

258 # Add score if provided 

259 if score is not None: 

260 result["score"] = score 

261 

262 # Add metadata if provided 

263 if metadata: 

264 result["metadata"] = metadata 

265 

266 return result 

267 

268 @staticmethod 

269 def filter_results( 

270 results: List[Dict[str, Any]], 

271 filters: Dict[str, Any] | None = None, 

272 min_score: float | None = None 

273 ) -> List[Dict[str, Any]]: 

274 """Filter search results based on criteria. 

275 

276 Args: 

277 results: List of search result dictionaries 

278 filters: Dictionary of field filters (exact match) 

279 min_score: Minimum score threshold 

280 

281 Returns: 

282 Filtered list of results 

283 """ 

284 filtered = results 

285 

286 # Apply score threshold 

287 if min_score is not None: 

288 filtered = [r for r in filtered if r.get("score", 0.0) >= min_score] 

289 

290 # Apply field filters 

291 if filters: 

292 for key, value in filters.items(): 

293 filtered = [ 

294 r for r in filtered 

295 if r.get(key) == value or r.get("metadata", {}).get(key) == value 

296 ] 

297 

298 return filtered 

299 

300 @staticmethod 

301 def deduplicate_results( 

302 results: List[Dict[str, Any]], 

303 key: str = "content" 

304 ) -> List[Dict[str, Any]]: 

305 """Remove duplicate results based on a key. 

306 

307 Args: 

308 results: List of search result dictionaries 

309 key: Key to use for deduplication (default: "content") 

310 

311 Returns: 

312 Deduplicated list of results (preserves order, keeps first occurrence) 

313 """ 

314 seen = set() 

315 deduplicated = [] 

316 

317 for result in results: 

318 value = result.get(key) 

319 if value not in seen: 

320 seen.add(value) 

321 deduplicated.append(result) 

322 

323 return deduplicated