Coverage for src/symphra_modules/events/bus.py: 90.48%
45 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
1"""事件总线实现."""
3from collections import defaultdict
4from collections.abc import Callable
6from symphra_modules.events.types import Event
7from symphra_modules.exceptions import EventError
8from symphra_modules.utils import get_logger
10logger = get_logger()
12# 事件处理器类型定义
13EventHandler = Callable[[Event], None]
16class EventBus:
17 """事件总线 - 简单的发布/订阅实现."""
19 def __init__(self) -> None:
20 """初始化事件总线."""
21 self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
22 self._wildcard_handlers: list[EventHandler] = []
24 def subscribe(self, event_type: str, handler: EventHandler) -> None:
25 """订阅事件.
27 Args:
28 event_type: 事件类型,使用 "*" 订阅所有事件
29 handler: 事件处理器函数
31 Raises:
32 EventError: 处理器不可调用时抛出
33 """
34 if not callable(handler):
35 raise EventError("事件处理器必须是可调用对象")
37 if event_type == "*":
38 self._wildcard_handlers.append(handler)
39 else:
40 self._handlers[event_type].append(handler)
42 logger.debug(f"订阅事件: {event_type}")
44 def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
45 """取消订阅事件.
47 Args:
48 event_type: 事件类型
49 handler: 事件处理器函数
50 """
51 if event_type == "*": 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 if handler in self._wildcard_handlers:
53 self._wildcard_handlers.remove(handler)
54 else:
55 if handler in self._handlers[event_type]: 55 ↛ 58line 55 didn't jump to line 58 because the condition on line 55 was always true
56 self._handlers[event_type].remove(handler)
58 logger.debug(f"取消订阅事件: {event_type}")
60 def publish(self, event: Event) -> None:
61 """发布事件.
63 Args:
64 event: 事件对象
65 """
66 logger.debug(f"发布事件: {event.event_type} - {event.module_name}")
68 # 调用通配符处理器
69 for handler in self._wildcard_handlers:
70 self._call_handler(handler, event)
72 # 调用特定类型的处理器
73 if event.event_type in self._handlers:
74 for handler in self._handlers[event.event_type]:
75 self._call_handler(handler, event)
77 def _call_handler(self, handler: EventHandler, event: Event) -> None:
78 """调用事件处理器.
80 Args:
81 handler: 事件处理器
82 event: 事件对象
83 """
84 try:
85 handler(event)
86 except Exception as e:
87 logger.error(f"事件处理器执行失败: {handler.__name__} - {e}")
89 def clear(self) -> None:
90 """清除所有订阅."""
91 self._handlers.clear()
92 self._wildcard_handlers.clear()
93 logger.debug("已清除所有事件订阅")
95 def get_subscribers(self, event_type: str) -> list[EventHandler]:
96 """获取指定事件类型的订阅者.
98 Args:
99 event_type: 事件类型
101 Returns:
102 订阅者列表
103 """
104 if event_type == "*":
105 return self._wildcard_handlers.copy()
106 return self._handlers.get(event_type, []).copy()