Coverage for src/symphra_modules/events/bus.py: 90.48%

45 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-26 18:16 +0800

1"""事件总线实现.""" 

2 

3from collections import defaultdict 

4from collections.abc import Callable 

5 

6from symphra_modules.events.types import Event 

7from symphra_modules.exceptions import EventError 

8from symphra_modules.utils import get_logger 

9 

10logger = get_logger() 

11 

12# 事件处理器类型定义 

13EventHandler = Callable[[Event], None] 

14 

15 

16class EventBus: 

17 """事件总线 - 简单的发布/订阅实现.""" 

18 

19 def __init__(self) -> None: 

20 """初始化事件总线.""" 

21 self._handlers: dict[str, list[EventHandler]] = defaultdict(list) 

22 self._wildcard_handlers: list[EventHandler] = [] 

23 

24 def subscribe(self, event_type: str, handler: EventHandler) -> None: 

25 """订阅事件. 

26 

27 Args: 

28 event_type: 事件类型,使用 "*" 订阅所有事件 

29 handler: 事件处理器函数 

30 

31 Raises: 

32 EventError: 处理器不可调用时抛出 

33 """ 

34 if not callable(handler): 

35 raise EventError("事件处理器必须是可调用对象") 

36 

37 if event_type == "*": 

38 self._wildcard_handlers.append(handler) 

39 else: 

40 self._handlers[event_type].append(handler) 

41 

42 logger.debug(f"订阅事件: {event_type}") 

43 

44 def unsubscribe(self, event_type: str, handler: EventHandler) -> None: 

45 """取消订阅事件. 

46 

47 Args: 

48 event_type: 事件类型 

49 handler: 事件处理器函数 

50 """ 

51 if event_type == "*": 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 if handler in self._wildcard_handlers: 

53 self._wildcard_handlers.remove(handler) 

54 else: 

55 if handler in self._handlers[event_type]: 55 ↛ 58line 55 didn't jump to line 58 because the condition on line 55 was always true

56 self._handlers[event_type].remove(handler) 

57 

58 logger.debug(f"取消订阅事件: {event_type}") 

59 

60 def publish(self, event: Event) -> None: 

61 """发布事件. 

62 

63 Args: 

64 event: 事件对象 

65 """ 

66 logger.debug(f"发布事件: {event.event_type} - {event.module_name}") 

67 

68 # 调用通配符处理器 

69 for handler in self._wildcard_handlers: 

70 self._call_handler(handler, event) 

71 

72 # 调用特定类型的处理器 

73 if event.event_type in self._handlers: 

74 for handler in self._handlers[event.event_type]: 

75 self._call_handler(handler, event) 

76 

77 def _call_handler(self, handler: EventHandler, event: Event) -> None: 

78 """调用事件处理器. 

79 

80 Args: 

81 handler: 事件处理器 

82 event: 事件对象 

83 """ 

84 try: 

85 handler(event) 

86 except Exception as e: 

87 logger.error(f"事件处理器执行失败: {handler.__name__} - {e}") 

88 

89 def clear(self) -> None: 

90 """清除所有订阅.""" 

91 self._handlers.clear() 

92 self._wildcard_handlers.clear() 

93 logger.debug("已清除所有事件订阅") 

94 

95 def get_subscribers(self, event_type: str) -> list[EventHandler]: 

96 """获取指定事件类型的订阅者. 

97 

98 Args: 

99 event_type: 事件类型 

100 

101 Returns: 

102 订阅者列表 

103 """ 

104 if event_type == "*": 

105 return self._wildcard_handlers.copy() 

106 return self._handlers.get(event_type, []).copy()