Coverage for src/symphra_modules/resolver/dependency.py: 98.59%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-26 18:16 +0800

1"""依赖解析器实现.""" 

2 

3from collections import defaultdict, deque 

4 

5from symphra_modules.config import ModuleMetadata 

6from symphra_modules.exceptions import ModuleDependencyError 

7 

8 

9class DependencyGraph: 

10 """依赖关系图.""" 

11 

12 def __init__(self) -> None: 

13 """初始化依赖图.""" 

14 self._graph: dict[str, set[str]] = defaultdict(set) # 模块 -> 依赖的模块集合 

15 self._reverse_graph: dict[str, set[str]] = defaultdict(set) # 模块 -> 依赖它的模块集合 

16 self._modules: dict[str, ModuleMetadata] = {} 

17 

18 def add_module(self, metadata: ModuleMetadata) -> None: 

19 """添加模块到依赖图. 

20 

21 Args: 

22 metadata: 模块元数据 

23 """ 

24 name = metadata.name 

25 self._modules[name] = metadata 

26 

27 # 初始化空依赖集合(即使没有依赖) 

28 if name not in self._graph: 28 ↛ 32line 28 didn't jump to line 32 because the condition on line 28 was always true

29 self._graph[name] = set() 

30 

31 # 添加依赖关系 

32 for dep in metadata.dependencies: 

33 self._graph[name].add(dep) 

34 self._reverse_graph[dep].add(name) 

35 

36 def get_dependencies(self, name: str) -> set[str]: 

37 """获取模块的直接依赖. 

38 

39 Args: 

40 name: 模块名称 

41 

42 Returns: 

43 依赖的模块名称集合 

44 """ 

45 return self._graph.get(name, set()) 

46 

47 def get_dependents(self, name: str) -> set[str]: 

48 """获取依赖该模块的模块. 

49 

50 Args: 

51 name: 模块名称 

52 

53 Returns: 

54 依赖该模块的模块名称集合 

55 """ 

56 return self._reverse_graph.get(name, set()) 

57 

58 def has_module(self, name: str) -> bool: 

59 """检查模块是否存在. 

60 

61 Args: 

62 name: 模块名称 

63 

64 Returns: 

65 是否存在 

66 """ 

67 return name in self._modules 

68 

69 def get_all_modules(self) -> list[str]: 

70 """获取所有模块名称. 

71 

72 Returns: 

73 模块名称列表 

74 """ 

75 return list(self._modules.keys()) 

76 

77 

78class DependencyResolver: 

79 """依赖解析器 - 使用拓扑排序解析模块加载顺序.""" 

80 

81 def __init__(self) -> None: 

82 """初始化解析器.""" 

83 self.graph = DependencyGraph() 

84 

85 def add_module(self, metadata: ModuleMetadata) -> None: 

86 """添加模块. 

87 

88 Args: 

89 metadata: 模块元数据 

90 """ 

91 self.graph.add_module(metadata) 

92 

93 def resolve(self) -> list[str]: 

94 """解析模块加载顺序(拓扑排序). 

95 

96 使用 Kahn 算法进行拓扑排序。 

97 

98 Returns: 

99 按依赖顺序排列的模块名称列表 

100 

101 Raises: 

102 ModuleDependencyError: 存在循环依赖或缺失依赖时抛出 

103 """ 

104 # 检查缺失的依赖 

105 self._check_missing_dependencies() 

106 

107 # 计算每个模块的入度(被依赖的次数) 

108 in_degree: dict[str, int] = defaultdict(int) 

109 for module in self.graph.get_all_modules(): 

110 in_degree[module] = 0 

111 

112 for module in self.graph.get_all_modules(): 

113 for _dep in self.graph.get_dependencies(module): 

114 in_degree[module] += 1 

115 

116 # 找到所有入度为 0 的模块(没有依赖的模块) 

117 queue: deque[str] = deque() 

118 for module in self.graph.get_all_modules(): 

119 if in_degree[module] == 0: 

120 queue.append(module) 

121 

122 result: list[str] = [] 

123 

124 while queue: 

125 # 取出一个入度为 0 的模块 

126 current = queue.popleft() 

127 result.append(current) 

128 

129 # 对于所有依赖当前模块的模块,减少其入度 

130 for dependent in self.graph.get_dependents(current): 

131 in_degree[dependent] -= 1 

132 if in_degree[dependent] == 0: 

133 queue.append(dependent) 

134 

135 # 如果结果数量不等于模块总数,说明存在循环依赖 

136 if len(result) != len(self.graph.get_all_modules()): 

137 circular = self._detect_circular_dependencies(in_degree) 

138 raise ModuleDependencyError( 

139 "检测到循环依赖", 

140 circular_dependencies=circular, 

141 ) 

142 

143 return result 

144 

145 def _check_missing_dependencies(self) -> None: 

146 """检查缺失的依赖. 

147 

148 Raises: 

149 ModuleDependencyError: 存在缺失依赖时抛出 

150 """ 

151 missing_deps: dict[str, list[str]] = {} 

152 

153 for module in self.graph.get_all_modules(): 

154 missing: list[str] = [] 

155 for dep in self.graph.get_dependencies(module): 

156 if not self.graph.has_module(dep): 

157 missing.append(dep) 

158 

159 if missing: 

160 missing_deps[module] = missing 

161 

162 if missing_deps: 

163 # 格式化错误消息 

164 error_details = [] 

165 for module, deps in missing_deps.items(): 

166 error_details.append(f"{module}: {', '.join(deps)}") 

167 

168 raise ModuleDependencyError( 

169 f"缺失依赖: {'; '.join(error_details)}", 

170 missing_dependencies=list(missing_deps.keys()), 

171 ) 

172 

173 def _detect_circular_dependencies(self, in_degree: dict[str, int]) -> list[str]: 

174 """检测循环依赖. 

175 

176 Args: 

177 in_degree: 模块入度映射 

178 

179 Returns: 

180 参与循环依赖的模块列表 

181 """ 

182 circular: list[str] = [] 

183 for module, degree in in_degree.items(): 

184 if degree > 0: 184 ↛ 183line 184 didn't jump to line 183 because the condition on line 184 was always true

185 circular.append(module) 

186 return circular 

187 

188 def get_load_order_for_module(self, name: str) -> list[str]: 

189 """获取加载特定模块所需的顺序(包括其所有依赖). 

190 

191 Args: 

192 name: 模块名称 

193 

194 Returns: 

195 按依赖顺序排列的模块名称列表(包含该模块自身) 

196 

197 Raises: 

198 ModuleDependencyError: 模块不存在或依赖有问题时抛出 

199 """ 

200 if not self.graph.has_module(name): 

201 raise ModuleDependencyError(f"模块不存在: {name}", module_name=name) 

202 

203 # 使用 DFS 收集所有依赖 

204 visited: set[str] = set() 

205 stack: list[str] = [] 

206 visiting: set[str] = set() # 用于检测循环依赖 

207 

208 def dfs(module: str) -> None: 

209 if module in visited: 

210 return 

211 if module in visiting: 

212 # 检测到循环依赖 

213 raise ModuleDependencyError( 

214 f"检测到循环依赖,涉及模块: {module}", 

215 module_name=module, 

216 circular_dependencies=[module], 

217 ) 

218 

219 visiting.add(module) 

220 

221 # 先处理依赖 

222 for dep in self.graph.get_dependencies(module): 

223 if not self.graph.has_module(dep): 

224 raise ModuleDependencyError( 

225 f"缺失依赖: {module} 依赖 {dep}", 

226 module_name=module, 

227 missing_dependencies=[dep], 

228 ) 

229 dfs(dep) 

230 

231 visiting.remove(module) 

232 visited.add(module) 

233 stack.append(module) 

234 

235 dfs(name) 

236 return stack