Coverage for src / dataknobs_common / registry.py: 24%
281 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 17:37 -0700
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 17:37 -0700
1"""Generic registry pattern for managing named items.
3This module provides reusable registry implementations that packages can extend
4to manage collections of named items (tools, bots, resources, etc.).
6The registry patterns support:
7- Thread-safe item management
8- Optional caching with TTL
9- Optional metrics collection
10- Generic typing for type safety
11- Both sync and async variants
13Example:
14 ```python
15 from dataknobs_common.registry import Registry
17 # Create a simple registry
18 class ToolRegistry(Registry[Tool]):
19 def __init__(self):
20 super().__init__("tools")
22 def register_tool(self, tool: Tool) -> None:
23 self.register(tool.name, tool, metadata={"type": "tool"})
25 registry = ToolRegistry()
26 registry.register_tool(my_tool)
27 tool = registry.get("my_tool")
28 ```
30With Caching:
31 ```python
32 from dataknobs_common.registry import CachedRegistry
34 class BotRegistry(CachedRegistry[Bot]):
35 def __init__(self):
36 super().__init__("bots", cache_ttl=300)
38 def get_or_create_bot(self, client_id: str) -> Bot:
39 return self.get_cached(
40 client_id,
41 factory=lambda: self._create_bot(client_id)
42 )
43 ```
44"""
46import asyncio
47import threading
48import time
49from typing import (
50 Any,
51 Callable,
52 Dict,
53 Generic,
54 List,
55 TypeVar,
56)
58from dataknobs_common.exceptions import NotFoundError, OperationError
60T = TypeVar("T")
63class Registry(Generic[T]):
64 """Base registry for managing named items with optional metrics.
66 This is a thread-safe registry that manages a collection of items by
67 unique keys. It provides core operations for registration, lookup,
68 and enumeration.
70 The registry is generic, so you can specify the type of items it
71 manages for better type safety.
73 Attributes:
74 name: Name of the registry (for logging/debugging)
76 Args:
77 name: Name for this registry instance
78 enable_metrics: Whether to track registration metrics
80 Example:
81 ```python
82 registry = Registry[str]("my_registry")
83 registry.register("key1", "value1")
84 registry.get("key1")
85 # 'value1'
86 registry.count()
87 # 1
88 ```
89 """
91 def __init__(self, name: str, enable_metrics: bool = False):
92 """Initialize the registry.
94 Args:
95 name: Registry name for identification
96 enable_metrics: Enable metrics tracking
97 """
98 self._name = name
99 self._items: Dict[str, T] = {}
100 self._lock = threading.RLock()
101 self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None
103 @property
104 def name(self) -> str:
105 """Get registry name."""
106 return self._name
108 def register(
109 self,
110 key: str,
111 item: T,
112 metadata: Dict[str, Any] | None = None,
113 allow_overwrite: bool = False,
114 ) -> None:
115 """Register an item by key.
117 Args:
118 key: Unique identifier for the item
119 item: Item to register
120 metadata: Optional metadata about the item
121 allow_overwrite: Whether to allow overwriting existing items
123 Raises:
124 OperationError: If item already exists and allow_overwrite is False
126 Example:
127 ```python
128 registry.register("tool1", my_tool, metadata={"version": "1.0"})
129 ```
130 """
131 with self._lock:
132 if not allow_overwrite and key in self._items:
133 raise OperationError(
134 f"Item '{key}' already registered in {self._name}",
135 context={"key": key, "registry": self._name},
136 )
138 self._items[key] = item
140 if self._metrics is not None:
141 self._metrics[key] = {
142 "registered_at": time.time(),
143 "metadata": metadata or {},
144 }
146 def unregister(self, key: str) -> T:
147 """Unregister and return an item by key.
149 Args:
150 key: Key of item to unregister
152 Returns:
153 The unregistered item
155 Raises:
156 NotFoundError: If item not found
158 Example:
159 ```python
160 item = registry.unregister("tool1")
161 ```
162 """
163 with self._lock:
164 if key not in self._items:
165 raise NotFoundError(
166 f"Item not found: {key}",
167 context={"key": key, "registry": self._name},
168 )
170 item = self._items.pop(key)
172 if self._metrics is not None and key in self._metrics:
173 del self._metrics[key]
175 return item
177 def get(self, key: str) -> T:
178 """Get an item by key.
180 Args:
181 key: Key of item to retrieve
183 Returns:
184 The registered item
186 Raises:
187 NotFoundError: If item not found
189 Example:
190 ```python
191 item = registry.get("tool1")
192 ```
193 """
194 with self._lock:
195 if key not in self._items:
196 raise NotFoundError(
197 f"Item not found: {key}",
198 context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())},
199 )
200 return self._items[key]
202 def get_optional(self, key: str) -> T | None:
203 """Get an item by key, returning None if not found.
205 Args:
206 key: Key of item to retrieve
208 Returns:
209 The registered item or None
211 Example:
212 ```python
213 item = registry.get_optional("tool1")
214 if item is None:
215 print("Not found")
216 ```
217 """
218 with self._lock:
219 return self._items.get(key)
221 def has(self, key: str) -> bool:
222 """Check if item exists.
224 Args:
225 key: Key to check
227 Returns:
228 True if item exists
230 Example:
231 ```python
232 if registry.has("tool1"):
233 print("Found")
234 ```
235 """
236 with self._lock:
237 return key in self._items
239 def list_keys(self) -> List[str]:
240 """List all registered keys.
242 Returns:
243 List of registered keys
245 Example:
246 ```python
247 keys = registry.list_keys()
248 print(keys)
249 # ['tool1', 'tool2']
250 ```
251 """
252 with self._lock:
253 return list(self._items.keys())
255 def list_items(self) -> List[T]:
256 """List all registered items.
258 Returns:
259 List of registered items
261 Example:
262 ```python
263 items = registry.list_items()
264 for item in items:
265 print(item)
266 ```
267 """
268 with self._lock:
269 return list(self._items.values())
271 def items(self) -> List[tuple[str, T]]:
272 """Get all key-item pairs.
274 Returns:
275 List of (key, item) tuples
277 Example:
278 ```python
279 for key, item in registry.items():
280 print(f"{key}: {item}")
281 ```
282 """
283 with self._lock:
284 return list(self._items.items())
286 def count(self) -> int:
287 """Get count of registered items.
289 Returns:
290 Number of items in registry
292 Example:
293 ```python
294 count = registry.count()
295 print(f"Registry has {count} items")
296 ```
297 """
298 with self._lock:
299 return len(self._items)
301 def clear(self) -> None:
302 """Clear all items from registry.
304 Example:
305 ```python
306 registry.clear()
307 registry.count()
308 # 0
309 ```
310 """
311 with self._lock:
312 self._items.clear()
313 if self._metrics is not None:
314 self._metrics.clear()
316 def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
317 """Get registration metrics.
319 Args:
320 key: Optional specific key to get metrics for
322 Returns:
323 Metrics dictionary
325 Example:
326 ```python
327 metrics = registry.get_metrics()
328 print(metrics)
329 # {'tool1': {'registered_at': 1699456789.0, 'metadata': {}}}
330 ```
331 """
332 with self._lock:
333 if self._metrics is None:
334 return {}
336 if key:
337 return self._metrics.get(key, {})
339 return dict(self._metrics)
341 def __len__(self) -> int:
342 """Get number of registered items using len()."""
343 return self.count()
345 def __contains__(self, key: str) -> bool:
346 """Check if item exists using 'in' operator."""
347 return self.has(key)
349 def __iter__(self):
350 """Iterate over registered items."""
351 return iter(self.list_items())
354class CachedRegistry(Registry[T]):
355 """Registry with time-based caching support.
357 Extends the base registry with caching capabilities. Items can be
358 retrieved from cache with automatic expiration and refresh based on TTL.
359 Implements LRU eviction when cache size exceeds limits.
361 Args:
362 name: Registry name
363 cache_ttl: Cache time-to-live in seconds (default: 300)
364 max_cache_size: Maximum number of cached items (default: 1000)
366 Example:
367 ```python
368 registry = CachedRegistry[Bot]("bots", cache_ttl=300)
369 bot = registry.get_cached(
370 "client1",
371 factory=lambda: create_bot("client1")
372 )
373 ```
374 """
376 def __init__(
377 self,
378 name: str,
379 cache_ttl: int = 300,
380 max_cache_size: int = 1000,
381 ):
382 """Initialize cached registry.
384 Args:
385 name: Registry name
386 cache_ttl: Time-to-live for cached items in seconds
387 max_cache_size: Maximum cache size before eviction
388 """
389 super().__init__(name, enable_metrics=True)
390 self._cache: Dict[str, tuple[T, float]] = {}
391 self._cache_ttl = cache_ttl
392 self._max_cache_size = max_cache_size
393 self._cache_hits = 0
394 self._cache_misses = 0
396 def get_cached(
397 self,
398 key: str,
399 factory: Callable[[], T],
400 force_refresh: bool = False,
401 ) -> T:
402 """Get item from cache with automatic refresh.
404 If item exists in cache and is not expired, returns cached version.
405 Otherwise, calls factory to create new item and caches it.
407 Args:
408 key: Cache key
409 factory: Callable that creates the item if not cached
410 force_refresh: Force refresh even if cached
412 Returns:
413 Cached or newly created item
415 Example:
416 ```python
417 def create_bot():
418 return Bot("my-bot")
419 bot = registry.get_cached("bot1", create_bot)
420 ```
421 """
422 with self._lock:
423 # Check cache
424 if not force_refresh and key in self._cache:
425 item, cached_at = self._cache[key]
426 if time.time() - cached_at < self._cache_ttl:
427 self._cache_hits += 1
428 return item
430 # Cache miss - create new item
431 self._cache_misses += 1
432 item = factory()
433 self._cache[key] = (item, time.time())
435 # Evict if cache too large
436 if len(self._cache) > self._max_cache_size:
437 self._evict_oldest()
439 return item
441 def invalidate_cache(self, key: str | None = None) -> None:
442 """Invalidate cache for a key or all keys.
444 Args:
445 key: Specific key to invalidate, or None to invalidate all
447 Example:
448 ```python
449 registry.invalidate_cache("bot1") # Invalidate one
450 registry.invalidate_cache() # Invalidate all
451 ```
452 """
453 with self._lock:
454 if key:
455 if key in self._cache:
456 del self._cache[key]
457 else:
458 self._cache.clear()
460 def get_cache_stats(self) -> Dict[str, Any]:
461 """Get cache statistics.
463 Returns:
464 Dictionary with cache statistics
466 Example:
467 ```python
468 stats = registry.get_cache_stats()
469 print(f"Hit rate: {stats['hit_rate']:.2%}")
470 ```
471 """
472 with self._lock:
473 total = self._cache_hits + self._cache_misses
474 hit_rate = self._cache_hits / total if total > 0 else 0.0
476 return {
477 "size": len(self._cache),
478 "max_size": self._max_cache_size,
479 "ttl_seconds": self._cache_ttl,
480 "hits": self._cache_hits,
481 "misses": self._cache_misses,
482 "total_requests": total,
483 "hit_rate": hit_rate,
484 }
486 def _evict_oldest(self) -> None:
487 """Evict oldest cache entries (LRU).
489 Removes oldest 10% of cache entries when max size is exceeded.
490 """
491 sorted_items = sorted(self._cache.items(), key=lambda x: x[1][1])
492 num_to_remove = max(1, len(sorted_items) // 10)
494 for key, _ in sorted_items[:num_to_remove]:
495 del self._cache[key]
498class AsyncRegistry(Generic[T]):
499 """Async-safe registry for managing named items.
501 Similar to Registry but uses asyncio locks for async-safe operations.
502 Use this when working in async contexts.
504 Args:
505 name: Registry name
506 enable_metrics: Enable metrics tracking
508 Example:
509 >>> registry = AsyncRegistry[Tool]("tools")
510 >>> await registry.register("tool1", my_tool)
511 >>> tool = await registry.get("tool1")
512 """
514 def __init__(self, name: str, enable_metrics: bool = False):
515 """Initialize async registry.
517 Args:
518 name: Registry name
519 enable_metrics: Enable metrics tracking
520 """
521 self._name = name
522 self._items: Dict[str, T] = {}
523 self._lock = asyncio.Lock()
524 self._metrics: Dict[str, Dict[str, Any]] | None = {} if enable_metrics else None
526 @property
527 def name(self) -> str:
528 """Get registry name."""
529 return self._name
531 async def register(
532 self,
533 key: str,
534 item: T,
535 metadata: Dict[str, Any] | None = None,
536 allow_overwrite: bool = False,
537 ) -> None:
538 """Register an item by key.
540 Args:
541 key: Unique identifier
542 item: Item to register
543 metadata: Optional metadata
544 allow_overwrite: Allow overwriting existing items
546 Raises:
547 OperationError: If item exists and allow_overwrite is False
548 """
549 async with self._lock:
550 if not allow_overwrite and key in self._items:
551 raise OperationError(
552 f"Item '{key}' already registered in {self._name}",
553 context={"key": key, "registry": self._name},
554 )
556 self._items[key] = item
558 if self._metrics is not None:
559 self._metrics[key] = {
560 "registered_at": time.time(),
561 "metadata": metadata or {},
562 }
564 async def unregister(self, key: str) -> T:
565 """Unregister and return an item.
567 Args:
568 key: Key to unregister
570 Returns:
571 The unregistered item
573 Raises:
574 NotFoundError: If item not found
575 """
576 async with self._lock:
577 if key not in self._items:
578 raise NotFoundError(
579 f"Item not found: {key}",
580 context={"key": key, "registry": self._name},
581 )
583 item = self._items.pop(key)
585 if self._metrics is not None and key in self._metrics:
586 del self._metrics[key]
588 return item
590 async def get(self, key: str) -> T:
591 """Get an item by key.
593 Args:
594 key: Key to retrieve
596 Returns:
597 The registered item
599 Raises:
600 NotFoundError: If item not found
601 """
602 async with self._lock:
603 if key not in self._items:
604 raise NotFoundError(
605 f"Item not found: {key}",
606 context={"key": key, "registry": self._name, "available_keys": list(self._items.keys())},
607 )
608 return self._items[key]
610 async def get_optional(self, key: str) -> T | None:
611 """Get an item, returning None if not found.
613 Args:
614 key: Key to retrieve
616 Returns:
617 The item or None
618 """
619 async with self._lock:
620 return self._items.get(key)
622 async def has(self, key: str) -> bool:
623 """Check if item exists.
625 Args:
626 key: Key to check
628 Returns:
629 True if exists
630 """
631 async with self._lock:
632 return key in self._items
634 async def list_keys(self) -> List[str]:
635 """List all registered keys.
637 Returns:
638 List of keys
639 """
640 async with self._lock:
641 return list(self._items.keys())
643 async def list_items(self) -> List[T]:
644 """List all registered items.
646 Returns:
647 List of items
648 """
649 async with self._lock:
650 return list(self._items.values())
652 async def items(self) -> List[tuple[str, T]]:
653 """Get all key-item pairs.
655 Returns:
656 List of (key, item) tuples
657 """
658 async with self._lock:
659 return list(self._items.items())
661 async def count(self) -> int:
662 """Get count of registered items.
664 Returns:
665 Number of items
666 """
667 async with self._lock:
668 return len(self._items)
670 async def clear(self) -> None:
671 """Clear all items."""
672 async with self._lock:
673 self._items.clear()
674 if self._metrics is not None:
675 self._metrics.clear()
677 async def get_metrics(self, key: str | None = None) -> Dict[str, Any]:
678 """Get registration metrics.
680 Args:
681 key: Optional specific key
683 Returns:
684 Metrics dictionary
685 """
686 async with self._lock:
687 if self._metrics is None:
688 return {}
690 if key:
691 return self._metrics.get(key, {})
693 return dict(self._metrics)
695 def __len__(self) -> int:
696 """Get number of registered items using len()."""
697 # Note: This is synchronous but safe since it just reads the dict
698 return len(self._items)
700 def __contains__(self, key: str) -> bool:
701 """Check if item exists using 'in' operator."""
702 # Note: This is synchronous but safe since it just reads the dict
703 return key in self._items
705 def __iter__(self):
706 """Iterate over registered items."""
707 # Note: Returns iterator over current snapshot
708 return iter(list(self._items.values()))
711class PluginRegistry(Generic[T]):
712 """Registry for plugins with factory support and defaults.
714 A specialized registry pattern for managing plugins (adapters, handlers,
715 providers, etc.) that supports:
716 - Class or factory function registration
717 - Lazy instantiation with configuration
718 - Default fallback when plugin not found
719 - Instance caching
720 - Type validation
722 This pattern is useful when you need to:
723 - Register different implementations of an interface
724 - Create instances on-demand with configuration
725 - Provide graceful fallbacks for unregistered keys
727 Args:
728 name: Registry name
729 default_factory: Default factory to use when key not found
731 Example:
732 ```python
733 from dataknobs_common.registry import PluginRegistry
735 # Define base class
736 class Handler:
737 def __init__(self, name: str, config: dict):
738 self.name = name
739 self.config = config
741 class DefaultHandler(Handler):
742 pass
744 class CustomHandler(Handler):
745 pass
747 # Create registry with default
748 registry = PluginRegistry[Handler]("handlers", default_factory=DefaultHandler)
750 # Register plugins
751 registry.register("custom", CustomHandler)
753 # Get instances
754 handler = registry.get("custom", config={"timeout": 30})
755 default = registry.get("unknown", config={}) # Uses default
756 ```
758 With async factories:
759 ```python
760 async def create_async_handler(name, config):
761 handler = AsyncHandler(name, config)
762 await handler.initialize()
763 return handler
765 registry.register("async", create_async_handler)
766 handler = await registry.get_async("async", config={"url": "..."})
767 ```
768 """
770 def __init__(
771 self,
772 name: str,
773 default_factory: type[T] | Callable[..., T] | None = None,
774 validate_type: type | None = None,
775 ):
776 """Initialize plugin registry.
778 Args:
779 name: Registry name for identification
780 default_factory: Default class or factory to use when key not found
781 validate_type: Optional base type to validate registrations against
782 """
783 self._name = name
784 self._factories: Dict[str, type[T] | Callable[..., T]] = {}
785 self._instances: Dict[str, T] = {}
786 self._lock = threading.RLock()
787 self._default_factory = default_factory
788 self._validate_type = validate_type
790 @property
791 def name(self) -> str:
792 """Get registry name."""
793 return self._name
795 def register(
796 self,
797 key: str,
798 factory: type[T] | Callable[..., T],
799 override: bool = False,
800 ) -> None:
801 """Register a plugin class or factory.
803 Args:
804 key: Unique identifier for the plugin
805 factory: Plugin class or factory function that creates instances
806 override: If True, allow overriding existing registration
808 Raises:
809 OperationError: If key already registered and override=False
810 TypeError: If factory doesn't match validate_type
812 Example:
813 ```python
814 # Register a class
815 registry.register("handler1", MyHandler)
817 # Register a factory function
818 registry.register("handler2", lambda name, config: create_handler(name, config))
819 ```
820 """
821 with self._lock:
822 # Check for existing registration
823 if not override and key in self._factories:
824 raise OperationError(
825 f"Plugin '{key}' already registered in {self._name}. "
826 f"Use override=True to replace.",
827 context={"key": key, "registry": self._name},
828 )
830 # Validate type if specified
831 if self._validate_type and isinstance(factory, type):
832 if not issubclass(factory, self._validate_type):
833 raise TypeError(
834 f"Factory class must be a subclass of {self._validate_type.__name__}, "
835 f"got {factory.__name__}"
836 )
837 elif not callable(factory):
838 raise TypeError(
839 f"Factory must be a class or callable, got {type(factory).__name__}"
840 )
842 # Register
843 self._factories[key] = factory
845 # Clear cached instance if overriding
846 if key in self._instances:
847 del self._instances[key]
849 def unregister(self, key: str) -> None:
850 """Unregister a plugin.
852 Args:
853 key: Key to unregister
855 Raises:
856 NotFoundError: If key not registered
857 """
858 with self._lock:
859 if key not in self._factories:
860 raise NotFoundError(
861 f"Plugin not found: {key}",
862 context={"key": key, "registry": self._name},
863 )
865 del self._factories[key]
867 # Clear cached instance
868 if key in self._instances:
869 del self._instances[key]
871 def is_registered(self, key: str) -> bool:
872 """Check if a plugin is registered.
874 Args:
875 key: Key to check
877 Returns:
878 True if registered
879 """
880 with self._lock:
881 return key in self._factories
883 def get(
884 self,
885 key: str,
886 config: Dict[str, Any] | None = None,
887 use_cache: bool = True,
888 use_default: bool = True,
889 ) -> T:
890 """Get a plugin instance.
892 Creates instance if not cached, using the registered factory.
894 Args:
895 key: Plugin identifier
896 config: Configuration dictionary passed to factory
897 use_cache: Return cached instance if available
898 use_default: Use default factory if key not registered
900 Returns:
901 Plugin instance
903 Raises:
904 NotFoundError: If key not registered and use_default=False
906 Example:
907 ```python
908 handler = registry.get("custom", config={"timeout": 30})
909 ```
910 """
911 with self._lock:
912 # Check cache
913 if use_cache and key in self._instances:
914 return self._instances[key]
916 # Get factory
917 if key in self._factories:
918 factory = self._factories[key]
919 elif use_default and self._default_factory:
920 factory = self._default_factory
921 else:
922 raise NotFoundError(
923 f"Plugin '{key}' not registered and no default available",
924 context={
925 "key": key,
926 "registry": self._name,
927 "available": list(self._factories.keys()),
928 },
929 )
931 # Create instance
932 try:
933 if isinstance(factory, type):
934 instance = factory(key, config or {})
935 else:
936 instance = factory(key, config or {})
938 # Validate instance type if specified
939 if self._validate_type and not isinstance(instance, self._validate_type):
940 raise TypeError(
941 f"Factory must return a {self._validate_type.__name__} instance, "
942 f"got {type(instance).__name__}"
943 )
945 except Exception as e:
946 raise OperationError(
947 f"Failed to create plugin '{key}': {e}",
948 context={"key": key, "registry": self._name},
949 ) from e
951 # Cache instance
952 if use_cache:
953 self._instances[key] = instance
955 return instance
957 async def get_async(
958 self,
959 key: str,
960 config: Dict[str, Any] | None = None,
961 use_cache: bool = True,
962 use_default: bool = True,
963 ) -> T:
964 """Get a plugin instance, supporting async factories.
966 Like get() but awaits the factory if it's a coroutine function.
968 Args:
969 key: Plugin identifier
970 config: Configuration dictionary
971 use_cache: Return cached instance if available
972 use_default: Use default factory if key not registered
974 Returns:
975 Plugin instance
977 Example:
978 ```python
979 handler = await registry.get_async("async-handler", config={"url": "..."})
980 ```
981 """
982 with self._lock:
983 # Check cache
984 if use_cache and key in self._instances:
985 return self._instances[key]
987 # Get factory
988 if key in self._factories:
989 factory = self._factories[key]
990 elif use_default and self._default_factory:
991 factory = self._default_factory
992 else:
993 raise NotFoundError(
994 f"Plugin '{key}' not registered and no default available",
995 context={
996 "key": key,
997 "registry": self._name,
998 "available": list(self._factories.keys()),
999 },
1000 )
1002 # Create instance (outside lock for async)
1003 try:
1004 if isinstance(factory, type):
1005 instance = factory(key, config or {})
1006 else:
1007 result = factory(key, config or {})
1008 # Await if coroutine
1009 if asyncio.iscoroutine(result):
1010 instance = await result
1011 else:
1012 instance = result
1014 # Validate instance type
1015 if self._validate_type and not isinstance(instance, self._validate_type):
1016 raise TypeError(
1017 f"Factory must return a {self._validate_type.__name__} instance, "
1018 f"got {type(instance).__name__}"
1019 )
1021 except Exception as e:
1022 raise OperationError(
1023 f"Failed to create plugin '{key}': {e}",
1024 context={"key": key, "registry": self._name},
1025 ) from e
1027 # Cache instance
1028 with self._lock:
1029 if use_cache:
1030 self._instances[key] = instance
1032 return instance
1034 def list_keys(self) -> List[str]:
1035 """List all registered plugin keys.
1037 Returns:
1038 List of registered keys
1039 """
1040 with self._lock:
1041 return list(self._factories.keys())
1043 def clear_cache(self, key: str | None = None) -> None:
1044 """Clear cached instances.
1046 Args:
1047 key: Specific key to clear, or None for all
1048 """
1049 with self._lock:
1050 if key:
1051 if key in self._instances:
1052 del self._instances[key]
1053 else:
1054 self._instances.clear()
1056 def get_factory(self, key: str) -> type[T] | Callable[..., T] | None:
1057 """Get the registered factory for a key.
1059 Args:
1060 key: Plugin identifier
1062 Returns:
1063 Factory class or function, or None if not registered
1064 """
1065 with self._lock:
1066 return self._factories.get(key)
1068 @property
1069 def cached_instances(self) -> Dict[str, T]:
1070 """Get the dictionary of cached instances.
1072 Returns:
1073 Dictionary mapping keys to cached instances
1075 Note:
1076 This returns the internal cache dictionary. Modifications
1077 will affect the cache directly.
1078 """
1079 return self._instances
1081 def set_default_factory(self, factory: type[T] | Callable[..., T]) -> None:
1082 """Set the default factory.
1084 Args:
1085 factory: New default factory
1087 Raises:
1088 TypeError: If factory doesn't match validate_type
1089 """
1090 if self._validate_type and isinstance(factory, type):
1091 if not issubclass(factory, self._validate_type):
1092 raise TypeError(
1093 f"Default factory must be a subclass of {self._validate_type.__name__}"
1094 )
1096 self._default_factory = factory
1098 def bulk_register(
1099 self,
1100 factories: Dict[str, type[T] | Callable[..., T]],
1101 override: bool = False,
1102 ) -> None:
1103 """Register multiple plugins at once.
1105 Args:
1106 factories: Dictionary mapping keys to factories
1107 override: Allow overriding existing registrations
1109 Example:
1110 ```python
1111 registry.bulk_register({
1112 "handler1": Handler1,
1113 "handler2": Handler2,
1114 })
1115 ```
1116 """
1117 for key, factory in factories.items():
1118 self.register(key, factory, override=override)
1120 def copy(self) -> Dict[str, type[T] | Callable[..., T]]:
1121 """Get a copy of all registered factories.
1123 Returns:
1124 Dictionary of key to factory mappings
1125 """
1126 with self._lock:
1127 return dict(self._factories)
1129 def __len__(self) -> int:
1130 """Get number of registered plugins."""
1131 return len(self._factories)
1133 def __contains__(self, key: str) -> bool:
1134 """Check if plugin is registered using 'in' operator."""
1135 return self.is_registered(key)
1137 def __repr__(self) -> str:
1138 """Get string representation."""
1139 return (
1140 f"PluginRegistry("
1141 f"name='{self._name}', "
1142 f"plugins={len(self._factories)}, "
1143 f"cached={len(self._instances)}"
1144 f")"
1145 )
1148__all__ = [
1149 "Registry",
1150 "CachedRegistry",
1151 "AsyncRegistry",
1152 "PluginRegistry",
1153]