Coverage for src/symphra_modules/resolver/dependency.py: 98.59%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-26 18:16 +0800
1"""依赖解析器实现."""
3from collections import defaultdict, deque
5from symphra_modules.config import ModuleMetadata
6from symphra_modules.exceptions import ModuleDependencyError
9class DependencyGraph:
10 """依赖关系图."""
12 def __init__(self) -> None:
13 """初始化依赖图."""
14 self._graph: dict[str, set[str]] = defaultdict(set) # 模块 -> 依赖的模块集合
15 self._reverse_graph: dict[str, set[str]] = defaultdict(set) # 模块 -> 依赖它的模块集合
16 self._modules: dict[str, ModuleMetadata] = {}
18 def add_module(self, metadata: ModuleMetadata) -> None:
19 """添加模块到依赖图.
21 Args:
22 metadata: 模块元数据
23 """
24 name = metadata.name
25 self._modules[name] = metadata
27 # 初始化空依赖集合(即使没有依赖)
28 if name not in self._graph: 28 ↛ 32line 28 didn't jump to line 32 because the condition on line 28 was always true
29 self._graph[name] = set()
31 # 添加依赖关系
32 for dep in metadata.dependencies:
33 self._graph[name].add(dep)
34 self._reverse_graph[dep].add(name)
36 def get_dependencies(self, name: str) -> set[str]:
37 """获取模块的直接依赖.
39 Args:
40 name: 模块名称
42 Returns:
43 依赖的模块名称集合
44 """
45 return self._graph.get(name, set())
47 def get_dependents(self, name: str) -> set[str]:
48 """获取依赖该模块的模块.
50 Args:
51 name: 模块名称
53 Returns:
54 依赖该模块的模块名称集合
55 """
56 return self._reverse_graph.get(name, set())
58 def has_module(self, name: str) -> bool:
59 """检查模块是否存在.
61 Args:
62 name: 模块名称
64 Returns:
65 是否存在
66 """
67 return name in self._modules
69 def get_all_modules(self) -> list[str]:
70 """获取所有模块名称.
72 Returns:
73 模块名称列表
74 """
75 return list(self._modules.keys())
78class DependencyResolver:
79 """依赖解析器 - 使用拓扑排序解析模块加载顺序."""
81 def __init__(self) -> None:
82 """初始化解析器."""
83 self.graph = DependencyGraph()
85 def add_module(self, metadata: ModuleMetadata) -> None:
86 """添加模块.
88 Args:
89 metadata: 模块元数据
90 """
91 self.graph.add_module(metadata)
93 def resolve(self) -> list[str]:
94 """解析模块加载顺序(拓扑排序).
96 使用 Kahn 算法进行拓扑排序。
98 Returns:
99 按依赖顺序排列的模块名称列表
101 Raises:
102 ModuleDependencyError: 存在循环依赖或缺失依赖时抛出
103 """
104 # 检查缺失的依赖
105 self._check_missing_dependencies()
107 # 计算每个模块的入度(被依赖的次数)
108 in_degree: dict[str, int] = defaultdict(int)
109 for module in self.graph.get_all_modules():
110 in_degree[module] = 0
112 for module in self.graph.get_all_modules():
113 for _dep in self.graph.get_dependencies(module):
114 in_degree[module] += 1
116 # 找到所有入度为 0 的模块(没有依赖的模块)
117 queue: deque[str] = deque()
118 for module in self.graph.get_all_modules():
119 if in_degree[module] == 0:
120 queue.append(module)
122 result: list[str] = []
124 while queue:
125 # 取出一个入度为 0 的模块
126 current = queue.popleft()
127 result.append(current)
129 # 对于所有依赖当前模块的模块,减少其入度
130 for dependent in self.graph.get_dependents(current):
131 in_degree[dependent] -= 1
132 if in_degree[dependent] == 0:
133 queue.append(dependent)
135 # 如果结果数量不等于模块总数,说明存在循环依赖
136 if len(result) != len(self.graph.get_all_modules()):
137 circular = self._detect_circular_dependencies(in_degree)
138 raise ModuleDependencyError(
139 "检测到循环依赖",
140 circular_dependencies=circular,
141 )
143 return result
145 def _check_missing_dependencies(self) -> None:
146 """检查缺失的依赖.
148 Raises:
149 ModuleDependencyError: 存在缺失依赖时抛出
150 """
151 missing_deps: dict[str, list[str]] = {}
153 for module in self.graph.get_all_modules():
154 missing: list[str] = []
155 for dep in self.graph.get_dependencies(module):
156 if not self.graph.has_module(dep):
157 missing.append(dep)
159 if missing:
160 missing_deps[module] = missing
162 if missing_deps:
163 # 格式化错误消息
164 error_details = []
165 for module, deps in missing_deps.items():
166 error_details.append(f"{module}: {', '.join(deps)}")
168 raise ModuleDependencyError(
169 f"缺失依赖: {'; '.join(error_details)}",
170 missing_dependencies=list(missing_deps.keys()),
171 )
173 def _detect_circular_dependencies(self, in_degree: dict[str, int]) -> list[str]:
174 """检测循环依赖.
176 Args:
177 in_degree: 模块入度映射
179 Returns:
180 参与循环依赖的模块列表
181 """
182 circular: list[str] = []
183 for module, degree in in_degree.items():
184 if degree > 0: 184 ↛ 183line 184 didn't jump to line 183 because the condition on line 184 was always true
185 circular.append(module)
186 return circular
188 def get_load_order_for_module(self, name: str) -> list[str]:
189 """获取加载特定模块所需的顺序(包括其所有依赖).
191 Args:
192 name: 模块名称
194 Returns:
195 按依赖顺序排列的模块名称列表(包含该模块自身)
197 Raises:
198 ModuleDependencyError: 模块不存在或依赖有问题时抛出
199 """
200 if not self.graph.has_module(name):
201 raise ModuleDependencyError(f"模块不存在: {name}", module_name=name)
203 # 使用 DFS 收集所有依赖
204 visited: set[str] = set()
205 stack: list[str] = []
206 visiting: set[str] = set() # 用于检测循环依赖
208 def dfs(module: str) -> None:
209 if module in visited:
210 return
211 if module in visiting:
212 # 检测到循环依赖
213 raise ModuleDependencyError(
214 f"检测到循环依赖,涉及模块: {module}",
215 module_name=module,
216 circular_dependencies=[module],
217 )
219 visiting.add(module)
221 # 先处理依赖
222 for dep in self.graph.get_dependencies(module):
223 if not self.graph.has_module(dep):
224 raise ModuleDependencyError(
225 f"缺失依赖: {module} 依赖 {dep}",
226 module_name=module,
227 missing_dependencies=[dep],
228 )
229 dfs(dep)
231 visiting.remove(module)
232 visited.add(module)
233 stack.append(module)
235 dfs(name)
236 return stack