Coverage for metrics / duplication / core.py: 99%
100 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-08 15:04 -0800
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-08 15:04 -0800
1"""Core algorithm for AST-based code duplication detection (Type 1 & Type 2)."""
3from __future__ import annotations
5import hashlib
6from collections import defaultdict
8from metrics.duplication.config import LanguageConfig
9from models import DuplicateCluster, FileComplexity, FunctionComplexity
# Minimum number of collected leaves a function needs before it is hashed;
# the hash functions in this module return None below this threshold.
MIN_NODE_COUNT = 5
def _is_docstring(node) -> bool:
    """Tell whether *node* looks like a Python-style docstring.

    A node qualifies when its type is ``expression_statement`` and it has
    exactly one named child, whose type is ``string``.
    """
    if node.type == "expression_statement":
        named_children = [child for child in node.children if child.is_named]
        if len(named_children) == 1:
            return named_children[0].type == "string"
    return False
def _collect_named_leaves(node, config: LanguageConfig, acc: list) -> None:
    """Append ``(node_type, node_text)`` for every named leaf under *node* to *acc*.

    Nodes are visited depth-first, left to right. A subtree is skipped
    entirely when its root's type is listed in ``config.comment_types`` or
    when ``_is_docstring`` accepts it. A leaf is a node with no children;
    only leaves flagged ``is_named`` are recorded.

    Args:
        node: Root of the subtree to scan.
        config: Language settings; only ``comment_types`` is read here.
        acc: Output list, extended in place.
    """
    # An explicit stack replaces recursion so that very deeply nested trees
    # (e.g. long chained expressions) cannot raise RecursionError.
    pending = [node]
    while pending:
        current = pending.pop()
        if current.type in config.comment_types or _is_docstring(current):
            continue
        if current.child_count == 0 and current.is_named:
            # Presumably ``text`` is bytes (decoded with the default codec).
            acc.append((current.type, current.text.decode()))
            continue
        # Push children reversed so the leftmost child is popped first,
        # keeping the original pre-order, left-to-right leaf sequence.
        pending.extend(reversed(current.children))
# Node types treated as a candidate for the function's own name when they
# appear as a childless direct child of the function node.
_FUNC_NAME_TYPES = {
    "identifier",
    "property_identifier",
    "field_identifier",
    "name",
}
def _collect_body_leaves(func_node, config: LanguageConfig) -> list[tuple[str, str]]:
    """Gather the named leaves of a function node, leaving out its own name.

    The first direct child that is childless and whose type is in
    ``_FUNC_NAME_TYPES`` is taken to be the function's name and dropped;
    every other direct child is scanned with ``_collect_named_leaves``.
    """
    collected: list[tuple[str, str]] = []
    name_pending = True
    for part in func_node.children:
        if name_pending and part.type in _FUNC_NAME_TYPES and part.child_count == 0:
            # Only the first matching child is dropped; later ones are kept.
            name_pending = False
        else:
            _collect_named_leaves(part, config, collected)
    return collected
def compute_type1_hash(func_node, config: LanguageConfig) -> str | None:
    """Return a SHA-256 hex digest of the function's exact leaf sequence (Type 1 clone key).

    Each leaf contributes ``type:text`` and the entries are joined with ``|``.
    Returns ``None`` when fewer than ``MIN_NODE_COUNT`` leaves are collected.
    """
    leaves = _collect_body_leaves(func_node, config)
    if len(leaves) >= MIN_NODE_COUNT:
        parts = [f"{kind}:{value}" for kind, value in leaves]
        digest = hashlib.sha256("|".join(parts).encode())
        return digest.hexdigest()
    return None
def compute_type2_hash(func_node, config: LanguageConfig) -> str | None:
    """Return a SHA-256 hex digest of the normalised leaf sequence (Type 2 clone key).

    Leaves whose type is in ``config.identifier_types`` are replaced by a
    placeholder ``VAR_<n>`` numbered by first appearance of their text;
    leaves whose type is in ``config.literal_types`` become ``LIT``; all
    other leaves keep their text. Returns ``None`` when fewer than
    ``MIN_NODE_COUNT`` leaves are collected.
    """
    leaves = _collect_body_leaves(func_node, config)
    if len(leaves) < MIN_NODE_COUNT:
        return None

    placeholders: dict[str, str] = {}

    def normalise(kind: str, value: str) -> str:
        if kind in config.identifier_types:
            # setdefault assigns the next index only on first sight of the text.
            slot = placeholders.setdefault(value, f"VAR_{len(placeholders)}")
            return f"{kind}:{slot}"
        if kind in config.literal_types:
            return f"{kind}:LIT"
        return f"{kind}:{value}"

    joined = "|".join([normalise(kind, value) for kind, value in leaves])
    return hashlib.sha256(joined.encode()).hexdigest()
def count_statements(func_node) -> int:
    """Count the statements directly inside a function's body (used for Type 2 filtering).

    The body is the first direct child of type ``block``, ``function_body``
    or ``statement_block``; without one the result is 0. Named children
    whose type ends in ``_statement`` are counted, except those accepted by
    ``_is_docstring``. Children of a ``statement_list`` wrapper are counted
    by the same rule.
    """
    body_types = {"block", "function_body", "statement_block"}
    body = next((c for c in func_node.children if c.type in body_types), None)
    if body is None:
        return 0

    def _counts(candidate) -> bool:
        return (
            candidate.is_named
            and candidate.type.endswith("_statement")
            and not _is_docstring(candidate)
        )

    total = 0
    for entry in body.children:
        # A statement_list is looked through one level; anything else is
        # judged on its own.
        group = entry.children if entry.type == "statement_list" else (entry,)
        total += sum(1 for candidate in group if _counts(candidate))
    return total
def find_duplicates(results: list[FileComplexity]) -> list[DuplicateCluster]:
    """Group functions by hash across files and return clusters with 2+ members.

    Type 1 clusters come first: one per ``type1_hash`` shared by at least
    two functions. A Type 2 group (same ``type2_hash``) is then reported
    only when its member set, identified by ``(path, name, line)``, is not
    identical to the member set of some Type 1 cluster. Within such a group,
    members with ``stmt_count`` below 2 are dropped, and the cluster is
    emitted only if at least two members remain. Functions whose hash is
    falsy are ignored for that hash type.

    Args:
        results: Per-file analysis results; ``path`` and ``functions`` are read.

    Returns:
        Type 1 clusters followed by Type 2 clusters, as ``(path, function)``
        member lists.
    """
    type1_groups: dict[str, list[tuple[str, FunctionComplexity]]] = defaultdict(list)
    type2_groups: dict[str, list[tuple[str, FunctionComplexity]]] = defaultdict(list)

    for result in results:
        for func in result.functions:
            if func.type1_hash:
                type1_groups[func.type1_hash].append((result.path, func))
            if func.type2_hash:
                type2_groups[func.type2_hash].append((result.path, func))

    clusters: list[DuplicateCluster] = [
        DuplicateCluster(dup_type=1, hash_value=h, members=members)
        for h, members in type1_groups.items()
        if len(members) >= 2
    ]

    # Type 2 clusters exclude groups already fully caught as Type 1.
    # A set of frozensets gives O(1) membership instead of scanning every
    # Type 1 cluster for each Type 2 group.
    type1_member_sets = {
        frozenset((p, f.name, f.line) for p, f in cluster.members)
        for cluster in clusters
    }

    for h, members in type2_groups.items():
        if len(members) < 2:
            continue
        member_keys = frozenset((p, f.name, f.line) for p, f in members)
        if member_keys in type1_member_sets:
            continue
        filtered = [m for m in members if m[1].stmt_count >= 2]
        if len(filtered) >= 2:
            clusters.append(
                DuplicateCluster(dup_type=2, hash_value=h, members=filtered)
            )

    return clusters