Coverage for circular_deps / core.py: 57%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-08 15:04 -0800

1from __future__ import annotations 

2 

3import os 

4from pathlib import Path 

5from typing import TYPE_CHECKING 

6 

7from circular_deps.models import Cycle, DependencyGraph, ImportInfo 

8from circular_deps.parsers.go import GoImportParser 

9from circular_deps.parsers.python import PythonImportParser 

10from circular_deps.parsers.typescript import TypeScriptImportParser 

11from circular_deps.resolvers.go import GoPathResolver 

12from circular_deps.resolvers.python import PythonPathResolver 

13from circular_deps.resolvers.typescript import TypeScriptPathResolver 

14 

15if TYPE_CHECKING: 

16 pass 

17 

18_PY_EXTENSIONS = {".py"} 

19_TS_EXTENSIONS = {".ts", ".tsx", ".js", ".jsx"} 

20_GO_EXTENSIONS = {".go"} 

21_UNSUPPORTED_EXTENSIONS = {".json", ".md", ".txt"} 

22 

23 

def _detect_language(filepath):
    """Map a file path to the language name implied by its extension.

    The extension is compared case-insensitively against the module-level
    extension sets.

    Returns:
        ``"python"``, ``"typescript"`` or ``"go"``; ``None`` when the
        extension belongs to none of those sets.
    """
    _, suffix = os.path.splitext(filepath)
    suffix = suffix.lower()

    # Checked in this order; the first set containing the suffix wins.
    known_languages = (
        ("python", _PY_EXTENSIONS),
        ("typescript", _TS_EXTENSIONS),
        ("go", _GO_EXTENSIONS),
    )
    for language, suffixes in known_languages:
        if suffix in suffixes:
            return language
    return None

33 

34 

35def _get_parser(lang): 

36 if lang == "python": 

37 return PythonImportParser() 

38 elif lang == "typescript": 

39 return TypeScriptImportParser() 

40 elif lang == "go": 

41 return GoImportParser() 

42 return None 

43 

44 

45def _get_resolver(lang, config=None): 

46 module_resolution = None 

47 if config: 

48 module_resolution = config.get("ts_module_resolution") 

49 

50 if lang == "python": 

51 return PythonPathResolver() 

52 elif lang == "typescript": 

53 return TypeScriptPathResolver( 

54 module_resolution=module_resolution, filepath=None 

55 ) 

56 elif lang == "go": 

57 return GoPathResolver() 

58 return None 

59 

60 

61def _find_common_root(files): 

62 if not files: 

63 return "" 

64 

65 abs_files = [os.path.abspath(str(f)) for f in files] 

66 

67 if len(abs_files) == 1: 

68 return os.path.dirname(abs_files[0]) 

69 

70 path_parts = [p.split(os.sep) for p in abs_files] 

71 

72 common_prefix = [] 

73 min_len = min(len(p) for p in path_parts) 

74 

75 for i in range(min_len): 

76 part = path_parts[0][i] 

77 if all(p[i] == part for p in path_parts): 

78 common_prefix.append(part) 

79 else: 

80 break 

81 

82 if not common_prefix: 

83 return os.sep 

84 

85 return os.sep.join(common_prefix) 

86 

87 

def build_dependency_graph(files, config=None):
    """Parse *files* and build the graph of imports between them.

    Args:
        files: Paths (strings or objects whose ``str()`` is a path) of the
            files to analyse. Each is made absolute; duplicates are analysed
            only once.
        config: Optional configuration forwarded to ``_get_resolver``.

    Returns:
        ``DependencyGraph(edges, local_files, root_dir)`` where ``edges``
        maps every absolute input path to the list of import objects whose
        resolved path is itself one of the input files, ``local_files`` is
        the set of absolute input paths and ``root_dir`` is the result of
        ``_find_common_root``. For empty input the graph is
        ``DependencyGraph({}, set(), "")``.

    Files with an unsupported or unknown extension, and files that cannot be
    read as UTF-8 text, stay in the graph as nodes without outgoing edges.
    """
    # dict.fromkeys removes duplicates while keeping first-seen order, so a
    # file listed twice is parsed once and its edges are not doubled.
    abs_files = list(dict.fromkeys(os.path.abspath(str(f)) for f in files))
    local_files = set(abs_files)

    if not abs_files:
        return DependencyGraph({}, local_files, "")

    edges = {path: [] for path in abs_files}
    root_dir = _find_common_root(abs_files)

    for abs_file in abs_files:
        # Explicit deny-list first, so it wins regardless of language sets.
        if os.path.splitext(abs_file)[1].lower() in _UNSUPPORTED_EXTENSIONS:
            continue

        source_path = Path(abs_file)
        lang = _detect_language(source_path)
        if lang is None:
            continue

        parser = _get_parser(lang)
        resolver = _get_resolver(lang, config=config)
        if parser is None or resolver is None:
            continue

        try:
            # Fixed encoding keeps results independent of the platform locale.
            source = source_path.read_text(encoding="utf-8")
        except (OSError, ValueError):
            # Best effort: an unreadable or undecodable file (decode errors
            # are ValueError subclasses) simply contributes no edges.
            continue

        for imp in parser.extract_imports(source, abs_file):
            imp.resolved_path = resolver.resolve(imp.raw_module, abs_file, root_dir)

            # Only imports that land on another analysed file become edges;
            # an unresolved import (None) is never in local_files.
            if imp.resolved_path in local_files:
                edges[abs_file].append(imp)

    return DependencyGraph(edges, local_files, root_dir)

126 

127 

def find_cycles(files, config=None):
    """Find import cycles among *files* with a colour-marking depth-first search.

    Args:
        files: Paths of the files to analyse (see ``build_dependency_graph``).
        config: Optional configuration forwarded to ``build_dependency_graph``.

    Returns:
        A list of ``Cycle`` objects, one per distinct set of files. For each
        cycle ``files`` lists the files in traversal order with the first
        file repeated at the end, ``edges`` is the list of
        ``(importer, imported, line)`` tuples that make up the cycle, and
        ``depth`` is the number of distinct files on it.
    """
    graph = build_dependency_graph(files, config=config)

    # WHITE = unvisited, GRAY = on the current DFS branch, BLACK = finished.
    colors = {f: "WHITE" for f in graph.local_files}
    cycles = []

    path = []        # files on the current DFS branch, outermost first
    path_edges = []  # path_edges[i] is the import edge path[i] -> path[i + 1]

    def dfs(file):
        colors[file] = "GRAY"
        path.append(file)

        for import_info in graph.edges.get(file, []):
            target = import_info.resolved_path
            if target is None or target not in graph.local_files:
                continue

            edge = (file, target, import_info.line)

            if colors[target] == "GRAY":
                # Back edge: the cycle is the branch from `target` down to
                # `file`, closed by this edge.
                cycle_start = path.index(target)
                cycle_files = path[cycle_start:]
                cycle_edges = _extract_cycle_edges(
                    path_edges + [edge], cycle_start, len(path)
                )
                cycles.append(
                    Cycle(
                        files=cycle_files + [target],
                        edges=cycle_edges,
                        depth=len(cycle_files),
                    )
                )
            elif colors[target] == "WHITE":
                # Record the edge only while the branch runs through it, so
                # path_edges stays aligned with path.
                path_edges.append(edge)
                dfs(target)
                path_edges.pop()

        path.pop()
        colors[file] = "BLACK"

    # Sorted start order makes the reported cycles deterministic.
    for file in sorted(graph.local_files):
        if colors[file] == "WHITE":
            dfs(file)

    # Repeated imports of the same module can report one cycle several times;
    # keep the first occurrence of each.
    unique_cycles = []
    seen = set()
    for cycle in cycles:
        cycle_key = tuple(sorted(cycle.files))
        if cycle_key not in seen:
            seen.add(cycle_key)
            unique_cycles.append(cycle)

    return unique_cycles

174 

175 

176def _extract_cycle_edges(edges_in_path, cycle_start, path_len): 

177 return edges_in_path[cycle_start:] 

178 

179 

def find_cycles_tarjan(files, config=None):
    """Find import cycles by searching inside strongly connected components.

    The dependency graph is first split into strongly connected components;
    a depth-first search then runs inside each component that can contain a
    cycle (more than one file, or a single file that imports itself).

    Args:
        files: Paths of the files to analyse (see ``build_dependency_graph``).
        config: Optional configuration forwarded to ``build_dependency_graph``.

    Returns:
        A list of ``Cycle`` objects, one per distinct set of files. For each
        cycle ``files`` lists the files in traversal order with the first
        file repeated at the end, ``edges`` is the list of
        ``(importer, imported, line)`` tuples that make up the cycle, and
        ``depth`` is the number of distinct files on it.
    """
    graph = build_dependency_graph(files, config=config)

    cycles = []
    for scc in _tarjan_components(graph):
        cycles.extend(_cycles_within_component(graph, scc))

    # Keep the first cycle reported for each distinct collection of files.
    unique_cycles = []
    seen = set()
    for cycle in cycles:
        cycle_key = tuple(sorted(cycle.files))
        if cycle_key not in seen:
            seen.add(cycle_key)
            unique_cycles.append(cycle)

    return unique_cycles


def _tarjan_components(graph):
    """Return the strongly connected components of *graph* as lists of files."""
    next_index = 0
    indices = {}
    lowlink = {}
    stack = []
    on_stack = set()
    sccs = []

    def strongconnect(node):
        nonlocal next_index
        indices[node] = next_index
        lowlink[node] = next_index
        next_index += 1
        stack.append(node)
        on_stack.add(node)

        for import_info in graph.edges.get(node, []):
            target = import_info.resolved_path
            if target is None or target not in graph.local_files:
                continue

            if target not in indices:
                strongconnect(target)
                lowlink[node] = min(lowlink[node], lowlink[target])
            elif target in on_stack:
                lowlink[node] = min(lowlink[node], indices[target])

        # `node` is the root of a component: pop the whole component.
        if lowlink[node] == indices[node]:
            scc = []
            while True:
                popped = stack.pop()
                on_stack.remove(popped)
                scc.append(popped)
                if popped == node:
                    break
            sccs.append(scc)

    # Sorted start order makes the component order deterministic.
    for node in sorted(graph.local_files):
        if node not in indices:
            strongconnect(node)

    return sccs


def _cycles_within_component(graph, scc):
    """Return the cycles a depth-first search finds inside one component."""
    members = set(scc)
    internal_edges = {
        node: [
            imp
            for imp in graph.edges.get(node, [])
            if imp.resolved_path in members
        ]
        for node in scc
    }

    # A lone file is only cyclic when it imports itself.
    if len(scc) == 1 and not internal_edges[scc[0]]:
        return []

    # WHITE = unvisited, GRAY = on the current DFS branch, BLACK = finished.
    colors = {node: "WHITE" for node in scc}
    found = []

    path = []        # files on the current DFS branch, outermost first
    path_edges = []  # path_edges[i] is the import edge path[i] -> path[i + 1]

    def dfs_scc(node):
        colors[node] = "GRAY"
        path.append(node)

        for imp in internal_edges.get(node, []):
            target = imp.resolved_path
            edge = (node, target, imp.line)

            if colors[target] == "GRAY":
                # Back edge: the cycle is the branch from `target` down to
                # `node`, closed by this edge.
                cycle_start = path.index(target)
                cycle_files = path[cycle_start:]
                found.append(
                    Cycle(
                        files=cycle_files + [target],
                        edges=path_edges[cycle_start:] + [edge],
                        depth=len(cycle_files),
                    )
                )
            elif colors[target] == "WHITE":
                # Record the edge only while the branch runs through it, so
                # path_edges stays aligned with path.
                path_edges.append(edge)
                dfs_scc(target)
                path_edges.pop()

        path.pop()
        colors[node] = "BLACK"

    for node in sorted(scc):
        if colors[node] == "WHITE":
            dfs_scc(node)

    return found