Coverage for src/symphra_modules/abc.py: 96.77%
48 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
1"""模块接口和基类定义."""
3import asyncio
4import inspect
5from abc import ABC, abstractmethod
6from typing import Any, Protocol, runtime_checkable
8from symphra_modules.config import ModuleMetadata
11@runtime_checkable
12class ModuleInterface(Protocol):
13 """模块接口协议."""
15 @property
16 def metadata(self) -> ModuleMetadata:
17 """获取模块元数据."""
18 ...
20 def bootstrap(self) -> None:
21 """模块引导,注册依赖等操作."""
22 ...
24 def install(self, config: dict[str, Any] | None = None) -> None:
25 """安装模块."""
26 ...
28 def uninstall(self) -> None:
29 """卸载模块."""
30 ...
32 def configure(self, config: dict[str, Any] | None = None) -> None:
33 """配置模块."""
34 ...
36 def start(self) -> None:
37 """启动模块."""
38 ...
40 def stop(self) -> None:
41 """停止模块."""
42 ...
44 def reload(self) -> None:
45 """重载模块."""
46 ...
48 def get_config(self) -> dict[str, Any]:
49 """获取当前配置."""
50 ...
52 def validate_config(self, config: dict[str, Any] | None = None) -> bool:
53 """验证配置."""
54 ...
57class BaseModule(ABC):
58 """模块基类,提供默认实现."""
60 def __init__(self, config: dict[str, Any] | None = None) -> None:
61 """初始化模块.
63 Args:
64 config: 初始配置(可选)
65 """
66 self._config = config or {}
68 # 使用 __slots__ 降低每个模块实例的内存占用
69 __slots__ = ("_config",)
71 @property
72 @abstractmethod
73 def metadata(self) -> ModuleMetadata:
74 """获取模块元数据 - 子类必须实现."""
76 def bootstrap(self) -> None:
77 """模块引导,注册依赖等操作.
79 默认实现为空,子类可覆盖此方法执行初始化逻辑。
80 """
81 pass
83 def install(self, config: dict[str, Any] | None = None) -> None:
84 """安装模块的默认实现.
86 Args:
87 config: 安装配置(可选)
88 """
89 if config:
90 self.configure(config)
92 def uninstall(self) -> None:
93 """卸载模块的默认实现.
95 默认行为是先停止模块。
96 """
97 self.stop()
99 def configure(self, config: dict[str, Any] | None = None) -> None:
100 """配置模块.
102 Args:
103 config: 配置字典(可选)
105 Raises:
106 ModuleConfigError: 配置验证失败时抛出
107 """
108 if config and self.validate_config(config):
109 self._config.update(config)
110 elif config: 110 ↛ exitline 110 didn't return from function 'configure' because the condition on line 110 was always true
111 from symphra_modules.exceptions import ModuleConfigError
113 raise ModuleConfigError(
114 f"配置验证失败: {self.metadata.name}",
115 module_name=self.metadata.name,
116 )
118 def start(self) -> None:
119 """启动模块的默认实现.
121 子类应覆盖此方法实现实际的启动逻辑。
122 """
123 pass
125 def stop(self) -> None:
126 """停止模块的默认实现.
128 子类应覆盖此方法实现实际的停止逻辑。
129 """
130 pass
132 def reload(self) -> None:
133 """重载模块的默认实现.
135 默认行为是先停止再启动。
136 """
137 self.stop()
138 self.start()
140 def get_config(self) -> dict[str, Any]:
141 """获取当前配置.
143 Returns:
144 配置字典的副本
145 """
146 return self._config.copy() if self._config else {}
148 def validate_config(self, config: dict[str, Any] | None = None) -> bool:
149 """验证配置 - 默认实现.
151 默认总是返回 True,子类应覆盖此方法实现实际的验证逻辑。
153 Args:
154 config: 待验证的配置(可选)
156 Returns:
157 验证是否通过
158 """
159 return True
162def is_async_module(module: object) -> bool:
163 """检测模块是否实现了异步方法.
165 Args:
166 module: 要检测的模块实例
168 Returns:
169 如果模块的核心方法是协程函数则返回 True
170 """
171 # 检查关键方法是否为协程函数
172 key_methods = ["install", "uninstall", "start", "stop", "configure"]
173 for method_name in key_methods:
174 if hasattr(module, method_name): 174 ↛ 173line 174 didn't jump to line 173 because the condition on line 174 was always true
175 method = getattr(module, method_name)
176 if inspect.iscoroutinefunction(method):
177 return True
178 return False
181async def call_module_method(
182 module: object,
183 method_name: str,
184 *args: Any,
185 **kwargs: Any,
186) -> Any:
187 """统一调用模块方法(自动检测同步/异步).
189 Args:
190 module: 模块实例
191 method_name: 方法名称
192 *args: 位置参数
193 **kwargs: 关键字参数
195 Returns:
196 方法返回值
198 Raises:
199 AttributeError: 方法不存在时抛出
200 """
201 if not hasattr(module, method_name):
202 raise AttributeError(f"模块 {type(module).__name__} 没有方法 {method_name}")
204 method = getattr(module, method_name)
206 # 如果是协程函数,直接 await
207 if inspect.iscoroutinefunction(method):
208 return await method(*args, **kwargs)
210 # 如果是同步函数,在执行器中运行
211 loop = asyncio.get_running_loop()
212 return await loop.run_in_executor(None, lambda: method(*args, **kwargs))