Coverage for src/dataknobs_common/registry.py: 28%

174 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:52 -0700

1"""Generic registry pattern for managing named items. 

2 

3This module provides reusable registry implementations that packages can extend 

4to manage collections of named items (tools, bots, resources, etc.). 

5 

6The registry patterns support: 

7- Thread-safe item management 

8- Optional caching with TTL 

9- Optional metrics collection 

10- Generic typing for type safety 

11- Both sync and async variants 

12 

13Example: 

14 ```python 

15 from dataknobs_common.registry import Registry 

16 

17 # Create a simple registry 

18 class ToolRegistry(Registry[Tool]): 

19 def __init__(self): 

20 super().__init__("tools") 

21 

22 def register_tool(self, tool: Tool) -> None: 

23 self.register(tool.name, tool, metadata={"type": "tool"}) 

24 

25 registry = ToolRegistry() 

26 registry.register_tool(my_tool) 

27 tool = registry.get("my_tool") 

28 ``` 

29 

30With Caching: 

31 ```python 

32 from dataknobs_common.registry import CachedRegistry 

33 

34 class BotRegistry(CachedRegistry[Bot]): 

35 def __init__(self): 

36 super().__init__("bots", cache_ttl=300) 

37 

38 def get_or_create_bot(self, client_id: str) -> Bot: 

39 return self.get_cached( 

40 client_id, 

41 factory=lambda: self._create_bot(client_id) 

42 ) 

43 ``` 

44""" 

45 

46import asyncio 

47import threading 

48import time 

49from typing import ( 

50 Any, 

51 Callable, 

52 Dict, 

53 Generic, 

54 List, 

55 TypeVar, 

56) 

57 

58from dataknobs_common.exceptions import NotFoundError, OperationError 

59 

60T = TypeVar("T") 

61 

62 

63class Registry(Generic[T]): 

64 """Base registry for managing named items with optional metrics. 

65 

66 This is a thread-safe registry that manages a collection of items by 

67 unique keys. It provides core operations for registration, lookup, 

68 and enumeration. 

69 

70 The registry is generic, so you can specify the type of items it 

71 manages for better type safety. 

72 

73 Attributes: 

74 name: Name of the registry (for logging/debugging) 

75 

76 Args: 

77 name: Name for this registry instance 

78 enable_metrics: Whether to track registration metrics 

79 

80 Example: 

81 >>> registry = Registry[str]("my_registry") 

82 >>> registry.register("key1", "value1") 

83 >>> registry.get("key1") 

84 'value1' 

85 >>> registry.count() 

86 1 

87 """ 

88 

89 def __init__(self, name: str, enable_metrics: bool = False): 

90 """Initialize the registry. 

91 

92 Args: 

93 name: Registry name for identification 

94 enable_metrics: Enable metrics tracking 

95 """ 

96 self._name = name 

97 self._items: Dict[str, T] = {} 

98 self._lock = threading.RLock() 

99 self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None 

100 

101 @property 

102 def name(self) -> str: 

103 """Get registry name.""" 

104 return self._name 

105 

106 def register( 

107 self, 

108 key: str, 

109 item: T, 

110 metadata: Dict[str, Any] | None = None, 

111 allow_overwrite: bool = False, 

112 ) -> None: 

113 """Register an item by key. 

114 

115 Args: 

116 key: Unique identifier for the item 

117 item: Item to register 

118 metadata: Optional metadata about the item 

119 allow_overwrite: Whether to allow overwriting existing items 

120 

121 Raises: 

122 OperationError: If item already exists and allow_overwrite is False 

123 

124 Example: 

125 >>> registry.register("tool1", my_tool, metadata={"version": "1.0"}) 

126 """ 

127 with self._lock: 

128 if not allow_overwrite and key in self._items: 

129 raise OperationError( 

130 f"Item '{key}' already registered in {self._name}", 

131 context={"key": key, "registry": self._name}, 

132 ) 

133 

134 self._items[key] = item 

135 

136 if self._metrics is not None: 

137 self._metrics[key] = { 

138 "registered_at": time.time(), 

139 "metadata": metadata or {}, 

140 } 

141 

142 def unregister(self, key: str) -> T: 

143 """Unregister and return an item by key. 

144 

145 Args: 

146 key: Key of item to unregister 

147 

148 Returns: 

149 The unregistered item 

150 

151 Raises: 

152 NotFoundError: If item not found 

153 

154 Example: 

155 >>> item = registry.unregister("tool1") 

156 """ 

157 with self._lock: 

158 if key not in self._items: 

159 raise NotFoundError( 

160 f"Item not found: {key}", 

161 context={"key": key, "registry": self._name}, 

162 ) 

163 

164 item = self._items.pop(key) 

165 

166 if self._metrics is not None and key in self._metrics: 

167 del self._metrics[key] 

168 

169 return item 

170 

171 def get(self, key: str) -> T: 

172 """Get an item by key. 

173 

174 Args: 

175 key: Key of item to retrieve 

176 

177 Returns: 

178 The registered item 

179 

180 Raises: 

181 NotFoundError: If item not found 

182 

183 Example: 

184 >>> item = registry.get("tool1") 

185 """ 

186 with self._lock: 

187 if key not in self._items: 

188 raise NotFoundError( 

189 f"Item not found: {key}", 

190 context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())}, 

191 ) 

192 return self._items[key] 

193 

194 def get_optional(self, key: str) -> T | None: 

195 """Get an item by key, returning None if not found. 

196 

197 Args: 

198 key: Key of item to retrieve 

199 

200 Returns: 

201 The registered item or None 

202 

203 Example: 

204 >>> item = registry.get_optional("tool1") 

205 >>> if item is None: 

206 ... print("Not found") 

207 """ 

208 with self._lock: 

209 return self._items.get(key) 

210 

211 def has(self, key: str) -> bool: 

212 """Check if item exists. 

213 

214 Args: 

215 key: Key to check 

216 

217 Returns: 

218 True if item exists 

219 

220 Example: 

221 >>> if registry.has("tool1"): 

222 ... print("Found") 

223 """ 

224 with self._lock: 

225 return key in self._items 

226 

227 def list_keys(self) -> List[str]: 

228 """List all registered keys. 

229 

230 Returns: 

231 List of registered keys 

232 

233 Example: 

234 >>> keys = registry.list_keys() 

235 >>> print(keys) 

236 ['tool1', 'tool2'] 

237 """ 

238 with self._lock: 

239 return list(self._items.keys()) 

240 

241 def list_items(self) -> List[T]: 

242 """List all registered items. 

243 

244 Returns: 

245 List of registered items 

246 

247 Example: 

248 >>> items = registry.list_items() 

249 >>> for item in items: 

250 ... print(item) 

251 """ 

252 with self._lock: 

253 return list(self._items.values()) 

254 

255 def items(self) -> List[tuple[str, T]]: 

256 """Get all key-item pairs. 

257 

258 Returns: 

259 List of (key, item) tuples 

260 

261 Example: 

262 >>> for key, item in registry.items(): 

263 ... print(f"{key}: {item}") 

264 """ 

265 with self._lock: 

266 return list(self._items.items()) 

267 

268 def count(self) -> int: 

269 """Get count of registered items. 

270 

271 Returns: 

272 Number of items in registry 

273 

274 Example: 

275 >>> count = registry.count() 

276 >>> print(f"Registry has {count} items") 

277 """ 

278 with self._lock: 

279 return len(self._items) 

280 

281 def clear(self) -> None: 

282 """Clear all items from registry. 

283 

284 Example: 

285 >>> registry.clear() 

286 >>> registry.count() 

287 0 

288 """ 

289 with self._lock: 

290 self._items.clear() 

291 if self._metrics is not None: 

292 self._metrics.clear() 

293 

294 def get_metrics(self, key: str | None = None) -> Dict[str, Any]: 

295 """Get registration metrics. 

296 

297 Args: 

298 key: Optional specific key to get metrics for 

299 

300 Returns: 

301 Metrics dictionary 

302 

303 Example: 

304 >>> metrics = registry.get_metrics() 

305 >>> print(metrics) 

306 {'tool1': {'registered_at': 1699456789.0, 'metadata': {}}} 

307 """ 

308 with self._lock: 

309 if self._metrics is None: 

310 return {} 

311 

312 if key: 

313 return self._metrics.get(key, {}) 

314 

315 return dict(self._metrics) 

316 

317 def __len__(self) -> int: 

318 """Get number of registered items using len().""" 

319 return self.count() 

320 

321 def __contains__(self, key: str) -> bool: 

322 """Check if item exists using 'in' operator.""" 

323 return self.has(key) 

324 

325 def __iter__(self): 

326 """Iterate over registered items.""" 

327 return iter(self.list_items()) 

328 

329 

class CachedRegistry(Registry[T]):
    """Registry with time-based caching support.

    Extends the base registry with caching capabilities. Items can be
    retrieved from cache with automatic expiration and refresh based on TTL.
    When the cache grows beyond ``max_cache_size``, the least recently used
    entries are evicted.

    The cache is separate from the items managed by ``register``/``get``:
    ``get_cached`` stores factory results only in the cache.

    Args:
        name: Registry name
        cache_ttl: Cache time-to-live in seconds (default: 300)
        max_cache_size: Maximum number of cached items (default: 1000)

    Example:
        >>> registry = CachedRegistry[Bot]("bots", cache_ttl=300)
        >>> bot = registry.get_cached(
        ...     "client1",
        ...     factory=lambda: create_bot("client1")
        ... )
    """

    def __init__(
        self,
        name: str,
        cache_ttl: int = 300,
        max_cache_size: int = 1000,
    ):
        """Initialize cached registry.

        Args:
            name: Registry name
            cache_ttl: Time-to-live for cached items in seconds
            max_cache_size: Maximum cache size before eviction
        """
        super().__init__(name, enable_metrics=True)
        # Maps key -> (item, created_at). The dict's insertion order doubles
        # as recency order: entries are moved to the end on every hit or
        # refresh, so the front of the dict holds the least recently used keys.
        self._cache: Dict[str, tuple[T, float]] = {}
        self._cache_ttl = cache_ttl
        self._max_cache_size = max_cache_size
        self._cache_hits = 0
        self._cache_misses = 0

    def get_cached(
        self,
        key: str,
        factory: Callable[[], T],
        force_refresh: bool = False,
    ) -> T:
        """Get item from cache with automatic refresh.

        If item exists in cache and is not expired, returns cached version.
        Otherwise, calls factory to create new item and caches it. The
        factory runs while the registry lock is held, so other callers wait
        for it to finish. If the factory raises, the cache is left unchanged.

        Args:
            key: Cache key
            factory: Callable that creates the item if not cached
            force_refresh: Force refresh even if cached

        Returns:
            Cached or newly created item

        Example:
            >>> def create_bot():
            ...     return Bot("my-bot")
            >>> bot = registry.get_cached("bot1", create_bot)
        """
        with self._lock:
            if not force_refresh and key in self._cache:
                item, cached_at = self._cache[key]
                if time.time() - cached_at < self._cache_ttl:
                    self._cache_hits += 1
                    # Mark as most recently used. The TTL still counts from
                    # creation time, which is kept unchanged.
                    self._cache[key] = self._cache.pop(key)
                    return item

            # Cache miss (absent, expired, or forced) - create new item.
            self._cache_misses += 1
            item = factory()
            # Re-insert rather than overwrite in place, so a refreshed entry
            # moves to the most-recently-used end of the dict.
            self._cache.pop(key, None)
            self._cache[key] = (item, time.time())

            if len(self._cache) > self._max_cache_size:
                self._evict_oldest()

            return item

    def invalidate_cache(self, key: str | None = None) -> None:
        """Invalidate cache for a key or all keys.

        Args:
            key: Specific key to invalidate, or None to invalidate all.
                Any string (including ``""``) invalidates only that key.

        Example:
            >>> registry.invalidate_cache("bot1")  # Invalidate one
            >>> registry.invalidate_cache()  # Invalidate all
        """
        with self._lock:
            if key is None:
                self._cache.clear()
            else:
                self._cache.pop(key, None)

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with size, limits, hit/miss counts and hit rate
            (0.0 when no requests have been made)

        Example:
            >>> stats = registry.get_cache_stats()
            >>> print(f"Hit rate: {stats['hit_rate']:.2%}")
        """
        with self._lock:
            total = self._cache_hits + self._cache_misses
            hit_rate = self._cache_hits / total if total > 0 else 0.0

            return {
                "size": len(self._cache),
                "max_size": self._max_cache_size,
                "ttl_seconds": self._cache_ttl,
                "hits": self._cache_hits,
                "misses": self._cache_misses,
                "total_requests": total,
                "hit_rate": hit_rate,
            }

    def _evict_oldest(self) -> None:
        """Evict the least recently used cache entries.

        Removes the least recently used 10% of cache entries (at least one).
        This method does not acquire ``self._lock``; callers must hold it.
        """
        num_to_remove = max(1, len(self._cache) // 10)
        # Dict iteration order runs from least to most recently used.
        victims = [k for k, _ in zip(self._cache, range(num_to_remove))]
        for k in victims:
            del self._cache[k]

464 

465 

class AsyncRegistry(Generic[T]):
    """Async-safe registry for managing named items.

    Similar to Registry but uses an ``asyncio.Lock`` for async-safe
    operations. Use this when working in async contexts.

    Args:
        name: Registry name
        enable_metrics: Enable metrics tracking

    Example:
        >>> registry = AsyncRegistry[Tool]("tools")
        >>> await registry.register("tool1", my_tool)
        >>> tool = await registry.get("tool1")
    """

    def __init__(self, name: str, enable_metrics: bool = False):
        """Initialize async registry.

        Args:
            name: Registry name
            enable_metrics: Enable metrics tracking
        """
        self._name = name
        self._items: Dict[str, T] = {}
        self._lock = asyncio.Lock()
        # None means metrics are disabled; otherwise maps key -> metrics entry.
        self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None

    @property
    def name(self) -> str:
        """Get registry name."""
        return self._name

    async def register(
        self,
        key: str,
        item: T,
        metadata: Dict[str, Any] | None = None,
        allow_overwrite: bool = False,
    ) -> None:
        """Register an item by key.

        Args:
            key: Unique identifier
            item: Item to register
            metadata: Optional metadata (copied when stored)
            allow_overwrite: Allow overwriting existing items

        Raises:
            OperationError: If item exists and allow_overwrite is False
        """
        async with self._lock:
            if not allow_overwrite and key in self._items:
                raise OperationError(
                    f"Item '{key}' already registered in {self._name}",
                    context={"key": key, "registry": self._name},
                )

            self._items[key] = item

            if self._metrics is not None:
                # Copy the metadata so later mutation of the caller's dict
                # cannot silently change the recorded metrics.
                self._metrics[key] = {
                    "registered_at": time.time(),
                    "metadata": dict(metadata) if metadata else {},
                }

    async def unregister(self, key: str) -> T:
        """Unregister and return an item.

        Args:
            key: Key to unregister

        Returns:
            The unregistered item

        Raises:
            NotFoundError: If item not found
        """
        async with self._lock:
            if key not in self._items:
                raise NotFoundError(
                    f"Item not found: {key}",
                    context={"key": key, "registry": self._name},
                )

            item = self._items.pop(key)

            if self._metrics is not None:
                self._metrics.pop(key, None)

            return item

    async def get(self, key: str) -> T:
        """Get an item by key.

        Args:
            key: Key to retrieve

        Returns:
            The registered item

        Raises:
            NotFoundError: If item not found (context lists available keys)
        """
        async with self._lock:
            if key not in self._items:
                raise NotFoundError(
                    f"Item not found: {key}",
                    context={
                        "key": key,
                        "registry": self._name,
                        "available_keys": list(self._items.keys()),
                    },
                )
            return self._items[key]

    async def get_optional(self, key: str) -> T | None:
        """Get an item, returning None if not found.

        Args:
            key: Key to retrieve

        Returns:
            The item or None
        """
        async with self._lock:
            return self._items.get(key)

    async def has(self, key: str) -> bool:
        """Check if item exists.

        Args:
            key: Key to check

        Returns:
            True if exists
        """
        async with self._lock:
            return key in self._items

    async def list_keys(self) -> List[str]:
        """List all registered keys.

        Returns:
            List of keys (a snapshot copy)
        """
        async with self._lock:
            return list(self._items.keys())

    async def list_items(self) -> List[T]:
        """List all registered items.

        Returns:
            List of items (a snapshot copy)
        """
        async with self._lock:
            return list(self._items.values())

    async def items(self) -> List[tuple[str, T]]:
        """Get all key-item pairs.

        Returns:
            List of (key, item) tuples (a snapshot copy)
        """
        async with self._lock:
            return list(self._items.items())

    async def count(self) -> int:
        """Get count of registered items.

        Returns:
            Number of items
        """
        async with self._lock:
            return len(self._items)

    async def clear(self) -> None:
        """Clear all items and their metrics."""
        async with self._lock:
            self._items.clear()
            if self._metrics is not None:
                self._metrics.clear()

    async def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
        """Get registration metrics.

        The returned dictionaries are copies; mutating them does not affect
        the registry's internal state.

        Args:
            key: Optional specific key. ``None`` returns metrics for all
                keys; any string (including ``""``) is treated as a key.

        Returns:
            Metrics dictionary: the entry for ``key`` (``{}`` if unknown), or
            a mapping of every key to its entry. ``{}`` if metrics are
            disabled.
        """
        async with self._lock:
            if self._metrics is None:
                return {}

            if key is not None:
                entry = self._metrics.get(key)
                return {} if entry is None else self._copy_metrics_entry(entry)

            return {
                name: self._copy_metrics_entry(entry)
                for name, entry in self._metrics.items()
            }

    @staticmethod
    def _copy_metrics_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of one metrics entry, including its metadata dict."""
        snapshot = dict(entry)
        metadata = snapshot.get("metadata")
        if isinstance(metadata, dict):
            snapshot["metadata"] = dict(metadata)
        return snapshot

    def __len__(self) -> int:
        """Get number of registered items using len()."""
        # Synchronous dunder: cannot await the asyncio lock, so this reads
        # the dict directly without taking the lock.
        return len(self._items)

    def __contains__(self, key: str) -> bool:
        """Check if item exists using 'in' operator."""
        # Synchronous dunder: reads the dict without taking the asyncio lock.
        return key in self._items

    def __iter__(self):
        """Iterate over a snapshot of the registered items."""
        # Copy first so registry changes during iteration do not affect it.
        return iter(list(self._items.values()))

677 

678 

# Public API of this module: the thread-safe, cached, and asyncio registries.
__all__ = ["Registry", "CachedRegistry", "AsyncRegistry"]