Coverage for src/dataknobs_common/registry.py: 28%
174 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:52 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:52 -0700
"""Generic registry pattern for managing named items.

This module provides reusable registry implementations that packages can extend
to manage collections of named items (tools, bots, resources, etc.).

The registry patterns support:
- Thread-safe item management (``Registry``, ``CachedRegistry``) and
  asyncio-safe item management (``AsyncRegistry``)
- Optional caching with TTL (``CachedRegistry``; the cache is kept separately
  from items added with ``register``)
- Optional metrics collection
- Generic typing for type safety
- Both sync and async variants

Example:
    ```python
    from dataknobs_common.registry import Registry

    # Create a simple registry
    class ToolRegistry(Registry[Tool]):
        def __init__(self):
            super().__init__("tools")

        def register_tool(self, tool: Tool) -> None:
            self.register(tool.name, tool, metadata={"type": "tool"})

    registry = ToolRegistry()
    registry.register_tool(my_tool)
    tool = registry.get("my_tool")  # assumes my_tool.name == "my_tool"
    ```

With Caching:
    ```python
    from dataknobs_common.registry import CachedRegistry

    class BotRegistry(CachedRegistry[Bot]):
        def __init__(self):
            super().__init__("bots", cache_ttl=300)

        def get_or_create_bot(self, client_id: str) -> Bot:
            return self.get_cached(
                client_id,
                factory=lambda: self._create_bot(client_id)
            )
    ```
"""
import asyncio
import threading
import time
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterator,
    List,
    TypeVar,
)

from dataknobs_common.exceptions import NotFoundError, OperationError
60T = TypeVar("T")
63class Registry(Generic[T]):
64 """Base registry for managing named items with optional metrics.
66 This is a thread-safe registry that manages a collection of items by
67 unique keys. It provides core operations for registration, lookup,
68 and enumeration.
70 The registry is generic, so you can specify the type of items it
71 manages for better type safety.
73 Attributes:
74 name: Name of the registry (for logging/debugging)
76 Args:
77 name: Name for this registry instance
78 enable_metrics: Whether to track registration metrics
80 Example:
81 >>> registry = Registry[str]("my_registry")
82 >>> registry.register("key1", "value1")
83 >>> registry.get("key1")
84 'value1'
85 >>> registry.count()
86 1
87 """
89 def __init__(self, name: str, enable_metrics: bool = False):
90 """Initialize the registry.
92 Args:
93 name: Registry name for identification
94 enable_metrics: Enable metrics tracking
95 """
96 self._name = name
97 self._items: Dict[str, T] = {}
98 self._lock = threading.RLock()
99 self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None
101 @property
102 def name(self) -> str:
103 """Get registry name."""
104 return self._name
106 def register(
107 self,
108 key: str,
109 item: T,
110 metadata: Dict[str, Any] | None = None,
111 allow_overwrite: bool = False,
112 ) -> None:
113 """Register an item by key.
115 Args:
116 key: Unique identifier for the item
117 item: Item to register
118 metadata: Optional metadata about the item
119 allow_overwrite: Whether to allow overwriting existing items
121 Raises:
122 OperationError: If item already exists and allow_overwrite is False
124 Example:
125 >>> registry.register("tool1", my_tool, metadata={"version": "1.0"})
126 """
127 with self._lock:
128 if not allow_overwrite and key in self._items:
129 raise OperationError(
130 f"Item '{key}' already registered in {self._name}",
131 context={"key": key, "registry": self._name},
132 )
134 self._items[key] = item
136 if self._metrics is not None:
137 self._metrics[key] = {
138 "registered_at": time.time(),
139 "metadata": metadata or {},
140 }
142 def unregister(self, key: str) -> T:
143 """Unregister and return an item by key.
145 Args:
146 key: Key of item to unregister
148 Returns:
149 The unregistered item
151 Raises:
152 NotFoundError: If item not found
154 Example:
155 >>> item = registry.unregister("tool1")
156 """
157 with self._lock:
158 if key not in self._items:
159 raise NotFoundError(
160 f"Item not found: {key}",
161 context={"key": key, "registry": self._name},
162 )
164 item = self._items.pop(key)
166 if self._metrics is not None and key in self._metrics:
167 del self._metrics[key]
169 return item
171 def get(self, key: str) -> T:
172 """Get an item by key.
174 Args:
175 key: Key of item to retrieve
177 Returns:
178 The registered item
180 Raises:
181 NotFoundError: If item not found
183 Example:
184 >>> item = registry.get("tool1")
185 """
186 with self._lock:
187 if key not in self._items:
188 raise NotFoundError(
189 f"Item not found: {key}",
190 context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())},
191 )
192 return self._items[key]
194 def get_optional(self, key: str) -> T | None:
195 """Get an item by key, returning None if not found.
197 Args:
198 key: Key of item to retrieve
200 Returns:
201 The registered item or None
203 Example:
204 >>> item = registry.get_optional("tool1")
205 >>> if item is None:
206 ... print("Not found")
207 """
208 with self._lock:
209 return self._items.get(key)
211 def has(self, key: str) -> bool:
212 """Check if item exists.
214 Args:
215 key: Key to check
217 Returns:
218 True if item exists
220 Example:
221 >>> if registry.has("tool1"):
222 ... print("Found")
223 """
224 with self._lock:
225 return key in self._items
227 def list_keys(self) -> List[str]:
228 """List all registered keys.
230 Returns:
231 List of registered keys
233 Example:
234 >>> keys = registry.list_keys()
235 >>> print(keys)
236 ['tool1', 'tool2']
237 """
238 with self._lock:
239 return list(self._items.keys())
241 def list_items(self) -> List[T]:
242 """List all registered items.
244 Returns:
245 List of registered items
247 Example:
248 >>> items = registry.list_items()
249 >>> for item in items:
250 ... print(item)
251 """
252 with self._lock:
253 return list(self._items.values())
255 def items(self) -> List[tuple[str, T]]:
256 """Get all key-item pairs.
258 Returns:
259 List of (key, item) tuples
261 Example:
262 >>> for key, item in registry.items():
263 ... print(f"{key}: {item}")
264 """
265 with self._lock:
266 return list(self._items.items())
268 def count(self) -> int:
269 """Get count of registered items.
271 Returns:
272 Number of items in registry
274 Example:
275 >>> count = registry.count()
276 >>> print(f"Registry has {count} items")
277 """
278 with self._lock:
279 return len(self._items)
281 def clear(self) -> None:
282 """Clear all items from registry.
284 Example:
285 >>> registry.clear()
286 >>> registry.count()
287 0
288 """
289 with self._lock:
290 self._items.clear()
291 if self._metrics is not None:
292 self._metrics.clear()
294 def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
295 """Get registration metrics.
297 Args:
298 key: Optional specific key to get metrics for
300 Returns:
301 Metrics dictionary
303 Example:
304 >>> metrics = registry.get_metrics()
305 >>> print(metrics)
306 {'tool1': {'registered_at': 1699456789.0, 'metadata': {}}}
307 """
308 with self._lock:
309 if self._metrics is None:
310 return {}
312 if key:
313 return self._metrics.get(key, {})
315 return dict(self._metrics)
317 def __len__(self) -> int:
318 """Get number of registered items using len()."""
319 return self.count()
321 def __contains__(self, key: str) -> bool:
322 """Check if item exists using 'in' operator."""
323 return self.has(key)
325 def __iter__(self):
326 """Iterate over registered items."""
327 return iter(self.list_items())
class CachedRegistry(Registry[T]):
    """Registry with time-based caching support.

    Extends the base registry with a keyed cache of factory-built items.
    A cached entry expires ``cache_ttl`` seconds after it was created and is
    rebuilt by the factory on the next lookup. When the cache grows beyond
    ``max_cache_size`` entries, the least recently used ~10% of entries (at
    least one) are evicted.

    The cache is separate from the items stored with ``register``:
    ``get_cached`` never reads or writes registered items. The inherited
    ``clear``/``unregister`` methods do not touch the cache either; use
    ``invalidate_cache`` for that.

    Args:
        name: Registry name
        cache_ttl: Cache time-to-live in seconds (default: 300)
        max_cache_size: Maximum number of cached items (default: 1000)

    Example:
        >>> registry = CachedRegistry[Bot]("bots", cache_ttl=300)
        >>> bot = registry.get_cached(
        ...     "client1",
        ...     factory=lambda: create_bot("client1")
        ... )
    """

    def __init__(
        self,
        name: str,
        cache_ttl: float = 300,
        max_cache_size: int = 1000,
    ):
        """Initialize cached registry.

        Args:
            name: Registry name
            cache_ttl: Time-to-live for cached items in seconds
            max_cache_size: Maximum cache size before eviction
        """
        super().__init__(name, enable_metrics=True)
        # key -> (item, creation timestamp). The dict's insertion order is
        # also the recency order: first key = least recently used,
        # last key = most recently used (maintained by get_cached).
        self._cache: Dict[str, tuple[T, float]] = {}
        self._cache_ttl = cache_ttl
        self._max_cache_size = max_cache_size
        self._cache_hits = 0
        self._cache_misses = 0

    def get_cached(
        self,
        key: str,
        factory: Callable[[], T],
        force_refresh: bool = False,
    ) -> T:
        """Get item from cache with automatic refresh.

        If the item exists in the cache and is not expired, returns the cached
        version and marks it as most recently used. Otherwise, calls the
        factory to create a new item and caches it.

        The factory runs while the registry lock is held. Concurrent threads
        are therefore serialized and do not build the same key simultaneously.
        Because the lock is re-entrant, the factory may call back into this
        registry.

        Args:
            key: Cache key
            factory: Callable that creates the item if not cached
            force_refresh: Force refresh even if cached

        Returns:
            Cached or newly created item

        Example:
            >>> def create_bot():
            ...     return Bot("my-bot")
            >>> bot = registry.get_cached("bot1", create_bot)
        """
        with self._lock:
            # Check cache
            if not force_refresh and key in self._cache:
                item, cached_at = self._cache[key]
                if time.time() - cached_at < self._cache_ttl:
                    self._cache_hits += 1
                    # Re-insert to move the entry to the most-recently-used end.
                    self._cache[key] = self._cache.pop(key)
                    return item

            # Cache miss (or expired / forced refresh) - create new item
            self._cache_misses += 1
            item = factory()
            # Drop any stale entry first so the fresh one lands at the
            # most-recently-used end (assigning to an existing key would keep
            # the key's old position).
            self._cache.pop(key, None)
            self._cache[key] = (item, time.time())

            # Evict if cache too large
            if len(self._cache) > self._max_cache_size:
                self._evict_oldest()

            return item

    def invalidate_cache(self, key: str | None = None) -> None:
        """Invalidate cache for a key or all keys.

        Args:
            key: Specific key to invalidate, or None to invalidate all. Any
                string, including ``""``, invalidates only that key.

        Example:
            >>> registry.invalidate_cache("bot1")  # Invalidate one
            >>> registry.invalidate_cache()  # Invalidate all
        """
        with self._lock:
            # Compare with None: "" is a valid key and must not wipe the
            # whole cache.
            if key is None:
                self._cache.clear()
            else:
                self._cache.pop(key, None)

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with cache statistics

        Example:
            >>> stats = registry.get_cache_stats()
            >>> print(f"Hit rate: {stats['hit_rate']:.2%}")
        """
        with self._lock:
            total = self._cache_hits + self._cache_misses
            hit_rate = self._cache_hits / total if total > 0 else 0.0

            return {
                "size": len(self._cache),
                "max_size": self._max_cache_size,
                "ttl_seconds": self._cache_ttl,
                "hits": self._cache_hits,
                "misses": self._cache_misses,
                "total_requests": total,
                "hit_rate": hit_rate,
            }

    def _evict_oldest(self) -> None:
        """Evict the least recently used cache entries (LRU).

        Removes the least recently used 10% of entries (at least one) when the
        max size is exceeded. Called from ``get_cached`` while ``self._lock``
        is held.
        """
        num_to_remove = max(1, len(self._cache) // 10)
        # get_cached keeps self._cache in recency order, so the first keys
        # are the least recently used.
        for stale_key in list(self._cache)[:num_to_remove]:
            del self._cache[stale_key]
class AsyncRegistry(Generic[T]):
    """Async-safe registry for managing named items.

    Similar to Registry, but uses an ``asyncio.Lock`` for async-safe
    operations. Use this when working in async contexts. asyncio
    synchronization primitives are not thread-safe, so share an instance only
    among coroutines, not across threads.

    Args:
        name: Registry name
        enable_metrics: Enable metrics tracking

    Example:
        >>> registry = AsyncRegistry[Tool]("tools")
        >>> await registry.register("tool1", my_tool)
        >>> tool = await registry.get("tool1")
    """

    def __init__(self, name: str, enable_metrics: bool = False):
        """Initialize async registry.

        Args:
            name: Registry name
            enable_metrics: Enable metrics tracking
        """
        self._name = name
        self._items: Dict[str, T] = {}
        self._lock = asyncio.Lock()
        # None means metrics are disabled; otherwise maps key -> metrics entry.
        self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None

    @property
    def name(self) -> str:
        """Get registry name."""
        return self._name

    async def register(
        self,
        key: str,
        item: T,
        metadata: Dict[str, Any] | None = None,
        allow_overwrite: bool = False,
    ) -> None:
        """Register an item by key.

        Args:
            key: Unique identifier
            item: Item to register
            metadata: Optional metadata (recorded only when metrics are
                enabled; a shallow copy is stored)
            allow_overwrite: Allow overwriting existing items

        Raises:
            OperationError: If item exists and allow_overwrite is False
        """
        async with self._lock:
            if not allow_overwrite and key in self._items:
                raise OperationError(
                    f"Item '{key}' already registered in {self._name}",
                    context={"key": key, "registry": self._name},
                )

            self._items[key] = item

            if self._metrics is not None:
                self._metrics[key] = {
                    "registered_at": time.time(),
                    # Copy so later mutation of the caller's dict does not
                    # silently rewrite the recorded metadata.
                    "metadata": dict(metadata) if metadata else {},
                }

    async def unregister(self, key: str) -> T:
        """Unregister and return an item.

        Any metrics recorded for the key are discarded as well.

        Args:
            key: Key to unregister

        Returns:
            The unregistered item

        Raises:
            NotFoundError: If item not found
        """
        async with self._lock:
            if key not in self._items:
                raise NotFoundError(
                    f"Item not found: {key}",
                    context={"key": key, "registry": self._name},
                )

            item = self._items.pop(key)

            if self._metrics is not None and key in self._metrics:
                del self._metrics[key]

            return item

    async def get(self, key: str) -> T:
        """Get an item by key.

        Args:
            key: Key to retrieve

        Returns:
            The registered item

        Raises:
            NotFoundError: If item not found (the error context lists the
                keys that are available)
        """
        async with self._lock:
            if key not in self._items:
                raise NotFoundError(
                    f"Item not found: {key}",
                    context={
                        "key": key,
                        "registry": self._name,
                        "available_keys": list(self._items.keys()),
                    },
                )
            return self._items[key]

    async def get_optional(self, key: str) -> T | None:
        """Get an item, returning None if not found.

        Args:
            key: Key to retrieve

        Returns:
            The item or None
        """
        async with self._lock:
            return self._items.get(key)

    async def has(self, key: str) -> bool:
        """Check if item exists.

        Args:
            key: Key to check

        Returns:
            True if exists
        """
        async with self._lock:
            return key in self._items

    async def list_keys(self) -> List[str]:
        """List all registered keys.

        Returns:
            List of keys, in registration order
        """
        async with self._lock:
            return list(self._items.keys())

    async def list_items(self) -> List[T]:
        """List all registered items.

        Returns:
            List of items, in registration order
        """
        async with self._lock:
            return list(self._items.values())

    async def items(self) -> List[tuple[str, T]]:
        """Get all key-item pairs.

        Returns:
            List of (key, item) tuples
        """
        async with self._lock:
            return list(self._items.items())

    async def count(self) -> int:
        """Get count of registered items.

        Returns:
            Number of items
        """
        async with self._lock:
            return len(self._items)

    async def clear(self) -> None:
        """Clear all items (and any recorded metrics)."""
        async with self._lock:
            self._items.clear()
            if self._metrics is not None:
                self._metrics.clear()

    async def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
        """Get registration metrics.

        Args:
            key: Optional specific key. ``None`` (the default) returns metrics
                for all keys; any string, including ``""``, is looked up as
                a key.

        Returns:
            Metrics dictionary: the entry for ``key`` (``{}`` if unknown or
            metrics are disabled), or a mapping of every key to its entry.
            Entries are copies, so mutating them does not affect the registry.
        """
        async with self._lock:
            if self._metrics is None:
                return {}

            # Compare with None: "" is a valid key and must not be treated
            # as "no key given".
            if key is not None:
                entry = self._metrics.get(key)
                return dict(entry) if entry is not None else {}

            # Copy each entry so callers cannot mutate internal state.
            return {k: dict(v) for k, v in self._metrics.items()}

    def __len__(self) -> int:
        """Get number of registered items using len()."""
        # Lock-free read. No lock-protected section above awaits in the middle
        # of a mutation, so within one event loop this sees a consistent dict.
        return len(self._items)

    def __contains__(self, key: str) -> bool:
        """Check if item exists using 'in' operator."""
        # Lock-free read; see __len__.
        return key in self._items

    def __iter__(self) -> Iterator[T]:
        """Iterate over a snapshot of the registered items."""
        # The list() snapshot keeps iteration safe if the registry is
        # modified while the caller is still iterating.
        return iter(list(self._items.values()))
# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "Registry",
    "CachedRegistry",
    "AsyncRegistry",
]