Coverage for circular_deps / core.py: 57%
188 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-08 15:04 -0800
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-08 15:04 -0800
1from __future__ import annotations
3import os
4from pathlib import Path
5from typing import TYPE_CHECKING
7from circular_deps.models import Cycle, DependencyGraph, ImportInfo
8from circular_deps.parsers.go import GoImportParser
9from circular_deps.parsers.python import PythonImportParser
10from circular_deps.parsers.typescript import TypeScriptImportParser
11from circular_deps.resolvers.go import GoPathResolver
12from circular_deps.resolvers.python import PythonPathResolver
13from circular_deps.resolvers.typescript import TypeScriptPathResolver
15if TYPE_CHECKING:
16 pass
18_PY_EXTENSIONS = {".py"}
19_TS_EXTENSIONS = {".ts", ".tsx", ".js", ".jsx"}
20_GO_EXTENSIONS = {".go"}
21_UNSUPPORTED_EXTENSIONS = {".json", ".md", ".txt"}
def _detect_language(filepath):
    """Map a file path to the name of a supported language.

    Args:
        filepath: String or path-like object; only its extension is inspected,
            compared case-insensitively.

    Returns:
        ``"python"``, ``"typescript"`` or ``"go"`` when the extension belongs
        to the matching extension set, otherwise ``None``.
    """
    suffix = os.path.splitext(filepath)[1].lower()
    language_table = (
        ("python", _PY_EXTENSIONS),
        ("typescript", _TS_EXTENSIONS),
        ("go", _GO_EXTENSIONS),
    )
    for language, known_suffixes in language_table:
        if suffix in known_suffixes:
            return language
    return None
def _get_parser(lang):
    """Create a fresh import parser for the given language name.

    Args:
        lang: Language name as produced by ``_detect_language``.

    Returns:
        A new ``PythonImportParser``, ``TypeScriptImportParser`` or
        ``GoImportParser`` instance, or ``None`` for any other value.
    """
    parser_factories = (
        ("python", PythonImportParser),
        ("typescript", TypeScriptImportParser),
        ("go", GoImportParser),
    )
    for language, factory in parser_factories:
        if lang == language:
            return factory()
    return None
def _get_resolver(lang, config=None):
    """Create a fresh path resolver for the given language name.

    Args:
        lang: Language name as produced by ``_detect_language``.
        config: Optional mapping. When truthy, its ``"ts_module_resolution"``
            entry is passed to the TypeScript resolver.

    Returns:
        A new ``PythonPathResolver``, ``TypeScriptPathResolver`` or
        ``GoPathResolver`` instance, or ``None`` for any other value.
    """
    # Looked up before dispatching on the language, whatever the language is.
    ts_resolution = config.get("ts_module_resolution") if config else None

    if lang == "python":
        return PythonPathResolver()
    if lang == "typescript":
        return TypeScriptPathResolver(
            module_resolution=ts_resolution,
            filepath=None,
        )
    if lang == "go":
        return GoPathResolver()
    return None
61def _find_common_root(files):
62 if not files:
63 return ""
65 abs_files = [os.path.abspath(str(f)) for f in files]
67 if len(abs_files) == 1:
68 return os.path.dirname(abs_files[0])
70 path_parts = [p.split(os.sep) for p in abs_files]
72 common_prefix = []
73 min_len = min(len(p) for p in path_parts)
75 for i in range(min_len):
76 part = path_parts[0][i]
77 if all(p[i] == part for p in path_parts):
78 common_prefix.append(part)
79 else:
80 break
82 if not common_prefix:
83 return os.sep
85 return os.sep.join(common_prefix)
def build_dependency_graph(files, config=None):
    """Parse *files* and build the graph of imports between them.

    Every path is made absolute. For each file whose extension maps to a
    supported language, the language's parser extracts the imports and the
    language's resolver turns each imported module into a path. An import
    becomes an edge only when it resolves to another file in *files*.

    Args:
        files: Iterable of file paths (strings or path-like objects).
        config: Optional mapping forwarded to ``_get_resolver``.

    Returns:
        A ``DependencyGraph`` built from the edge mapping (absolute file path
        -> list of import objects with ``resolved_path`` set), the set of
        absolute file paths, and the common root directory. For empty input
        the graph is built from ``{}``, an empty set and ``""``.
    """
    abs_files = [os.path.abspath(str(f)) for f in files]
    # The dict keys also de-duplicate repeated input paths (keeping first-seen
    # order), so iterating over them parses every file exactly once.
    edges = {f: [] for f in abs_files}
    local_files = set(abs_files)

    # Test the materialised list: *files* itself may be a one-shot iterable.
    if not abs_files:
        return DependencyGraph({}, local_files, "")

    root_dir = _find_common_root(abs_files)

    for abs_file in edges:
        ext = os.path.splitext(abs_file)[1].lower()
        if ext in _UNSUPPORTED_EXTENSIONS:
            continue

        lang = _detect_language(abs_file)
        if lang is None:
            continue

        parser = _get_parser(lang)
        resolver = _get_resolver(lang, config=config)
        if parser is None or resolver is None:
            continue

        try:
            # Decode explicitly as UTF-8 so results do not depend on the
            # platform's locale; undecodable bytes are replaced rather than
            # causing the whole file to be dropped.
            source = Path(abs_file).read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Best effort: unreadable files simply contribute no edges.
            continue

        for imp in parser.extract_imports(source, abs_file):
            imp.resolved_path = resolver.resolve(imp.raw_module, abs_file, root_dir)
            # Only imports that land on another analysed file become edges.
            if imp.resolved_path is not None and imp.resolved_path in local_files:
                edges[abs_file].append(imp)

    return DependencyGraph(edges, local_files, root_dir)
def find_cycles(files, config=None):
    """Detect import cycles among *files* using a three-colour DFS.

    Nodes start ``"WHITE"`` (unvisited), turn ``"GRAY"`` while they are on the
    current DFS path and ``"BLACK"`` once fully explored. Reaching a GRAY node
    closes a cycle made of the path from that node to the current file.

    Args:
        files: Iterable of file paths to analyse.
        config: Optional mapping forwarded to ``build_dependency_graph``.

    Returns:
        A list of ``Cycle`` objects. ``files`` lists the cycle's files with the
        first one repeated at the end, ``edges`` holds one
        ``(source, target, line)`` tuple per hop, and ``depth`` is the number
        of distinct files. Cycles visiting the same set of files are reported
        once (first one found wins).
    """
    graph = build_dependency_graph(files, config=config)

    colors = {f: "WHITE" for f in graph.local_files}
    cycles = []

    def dfs(file, ancestors, edges_in_path):
        # Invariant: ``edges_in_path`` holds exactly one edge per hop of the
        # current DFS path, so after adding ``file`` below,
        # edges_in_path[i] is the edge path[i] -> path[i + 1].
        colors[file] = "GRAY"
        path = ancestors + [file]

        for import_info in graph.edges.get(file, []):
            target = import_info.resolved_path
            if target is None or target not in graph.local_files:
                continue

            edge = (file, target, import_info.line)

            if colors[target] == "GRAY":
                cycle_start = path.index(target)
                # Add the closing edge only for this cycle; edges to finished
                # or unrelated nodes never enter the per-path list.
                cycle_edges = _extract_cycle_edges(
                    edges_in_path + [edge], cycle_start, len(path)
                )
                cycles.append(
                    Cycle(
                        files=path[cycle_start:] + [target],
                        edges=cycle_edges,
                        depth=len(path) - cycle_start,
                    )
                )
            elif colors[target] == "WHITE":
                dfs(target, path, edges_in_path + [edge])

        colors[file] = "BLACK"

    # Sorted start order keeps the result deterministic.
    for file in sorted(graph.local_files):
        if colors[file] == "WHITE":
            dfs(file, [], [])

    # Two cycles over the same set of files count as duplicates.
    unique_cycles = []
    seen = set()
    for cycle in cycles:
        cycle_key = tuple(sorted(cycle.files))
        if cycle_key not in seen:
            seen.add(cycle_key)
            unique_cycles.append(cycle)

    return unique_cycles
176def _extract_cycle_edges(edges_in_path, cycle_start, path_len):
177 return edges_in_path[cycle_start:]
def find_cycles_tarjan(files, config=None):
    """Detect import cycles among *files* via strongly connected components.

    Tarjan's algorithm first partitions the graph into strongly connected
    components (SCCs). Each SCC with more than one file is then searched with
    a three-colour DFS restricted to edges inside that SCC, and every back
    edge found is reported as a cycle.

    Args:
        files: Iterable of file paths to analyse.
        config: Optional mapping forwarded to ``build_dependency_graph``.

    Returns:
        A list of ``Cycle`` objects. ``files`` lists the cycle's files with the
        first one repeated at the end, ``edges`` holds one
        ``(source, target, line)`` tuple per hop, and ``depth`` is the number
        of distinct files. Cycles visiting the same set of files are reported
        once (first one found wins).
    """
    graph = build_dependency_graph(files, config=config)

    index = 0
    indices = {}
    lowlink = {}
    stack = []
    on_stack = set()
    sccs = []

    def strongconnect(node):
        nonlocal index
        indices[node] = index
        lowlink[node] = index
        index += 1
        stack.append(node)
        on_stack.add(node)

        for import_info in graph.edges.get(node, []):
            target = import_info.resolved_path
            if target is None or target not in graph.local_files:
                continue

            if target not in indices:
                # Tree edge: recurse, then inherit the child's low-link.
                strongconnect(target)
                lowlink[node] = min(lowlink[node], lowlink[target])
            elif target in on_stack:
                # Back edge to a node of the SCC currently being built.
                lowlink[node] = min(lowlink[node], indices[target])

        # ``node`` is the root of an SCC: pop the whole component.
        if lowlink[node] == indices[node]:
            scc = []
            while True:
                popped = stack.pop()
                on_stack.remove(popped)
                scc.append(popped)
                if popped == node:
                    break
            sccs.append(scc)

    # Sorted start order keeps the result deterministic.
    for node in sorted(graph.local_files):
        if node not in indices:
            strongconnect(node)

    cycles = []

    def dfs_scc(node, ancestors, edges_in_path, subgraph_edges, colors):
        # Invariant: ``edges_in_path`` holds exactly one edge per hop of the
        # current DFS path, so after adding ``node`` below,
        # edges_in_path[i] is the edge path[i] -> path[i + 1].
        colors[node] = "GRAY"
        path = ancestors + [node]

        for imp in subgraph_edges.get(node, []):
            # subgraph_edges only holds imports resolved inside the SCC, so
            # the target is never None here.
            target = imp.resolved_path
            edge = (node, target, imp.line)

            if colors[target] == "GRAY":
                cycle_start = path.index(target)
                cycles.append(
                    Cycle(
                        files=path[cycle_start:] + [target],
                        edges=edges_in_path[cycle_start:] + [edge],
                        depth=len(path) - cycle_start,
                    )
                )
            elif colors[target] == "WHITE":
                dfs_scc(
                    target, path, edges_in_path + [edge], subgraph_edges, colors
                )

        colors[node] = "BLACK"

    for scc in sccs:
        # NOTE(review): a file that imports itself forms a one-node SCC and is
        # skipped here, whereas find_cycles reports it; confirm which
        # behaviour is intended.
        if len(scc) <= 1:
            continue

        scc_set = set(scc)
        subgraph_edges = {
            node: [
                imp
                for imp in graph.edges.get(node, [])
                if imp.resolved_path in scc_set
            ]
            for node in scc
        }
        colors = {node: "WHITE" for node in scc}

        for node in sorted(scc):
            if colors[node] == "WHITE":
                dfs_scc(node, [], [], subgraph_edges, colors)

    # Two cycles over the same set of files count as duplicates.
    unique_cycles = []
    seen = set()
    for cycle in cycles:
        cycle_key = tuple(sorted(cycle.files))
        if cycle_key not in seen:
            seen.add(cycle_key)
            unique_cycles.append(cycle)

    return unique_cycles