Coverage for metrics / duplication / core.py: 99%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-08 15:04 -0800

1"""Core algorithm for AST-based code duplication detection (Type 1 & Type 2).""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6from collections import defaultdict 

7 

8from metrics.duplication.config import LanguageConfig 

9from models import DuplicateCluster, FileComplexity, FunctionComplexity 

10 

11MIN_NODE_COUNT = 5 

12 

13 

14def _is_docstring(node) -> bool: 

15 """Check if a node is a Python-style docstring (expression_statement containing only a string).""" 

16 if node.type != "expression_statement": 

17 return False 

18 named = [c for c in node.children if c.is_named] 

19 return len(named) == 1 and named[0].type == "string" 

20 

21 

22def _collect_named_leaves(node, config: LanguageConfig, acc: list) -> None: 

23 """DFS traversal collecting (node_type, node_text) for named leaf nodes, skipping comments and docstrings.""" 

24 if node.type in config.comment_types: 

25 return 

26 if _is_docstring(node): 

27 return 

28 if node.child_count == 0 and node.is_named: 

29 acc.append((node.type, node.text.decode())) 

30 return 

31 for child in node.children: 

32 _collect_named_leaves(child, config, acc) 

33 

34 

35_FUNC_NAME_TYPES = {"identifier", "property_identifier", "field_identifier", "name"} 

36 

37 

38def _collect_body_leaves(func_node, config: LanguageConfig) -> list[tuple[str, str]]: 

39 """Collect named leaves from a function node, excluding the function's own name.""" 

40 leaves: list[tuple[str, str]] = [] 

41 skipped_name = False 

42 for child in func_node.children: 

43 if ( 

44 not skipped_name 

45 and child.type in _FUNC_NAME_TYPES 

46 and child.child_count == 0 

47 ): 

48 skipped_name = True 

49 continue 

50 _collect_named_leaves(child, config, leaves) 

51 return leaves 

52 

53 

54def compute_type1_hash(func_node, config: LanguageConfig) -> str | None: 

55 """Hash a function's named-leaf sequence with exact text values (Type 1 / exact clone).""" 

56 leaves = _collect_body_leaves(func_node, config) 

57 if len(leaves) < MIN_NODE_COUNT: 

58 return None 

59 raw = "|".join(f"{ntype}:{text}" for ntype, text in leaves) 

60 return hashlib.sha256(raw.encode()).hexdigest() 

61 

62 

63def compute_type2_hash(func_node, config: LanguageConfig) -> str | None: 

64 """Hash with identifiers as positional placeholders and literals as LIT (Type 2 / parametric clone).""" 

65 leaves = _collect_body_leaves(func_node, config) 

66 if len(leaves) < MIN_NODE_COUNT: 

67 return None 

68 var_map: dict[str, str] = {} 

69 tokens: list[str] = [] 

70 for ntype, text in leaves: 

71 if ntype in config.identifier_types: 

72 if text not in var_map: 

73 var_map[text] = f"VAR_{len(var_map)}" 

74 tokens.append(f"{ntype}:{var_map[text]}") 

75 elif ntype in config.literal_types: 

76 tokens.append(f"{ntype}:LIT") 

77 else: 

78 tokens.append(f"{ntype}:{text}") 

79 raw = "|".join(tokens) 

80 return hashlib.sha256(raw.encode()).hexdigest() 

81 

82 

83def count_statements(func_node) -> int: 

84 """Count top-level executable statements in a function body for Type 2 filtering.""" 

85 block_types = {"block", "function_body", "statement_block"} 

86 for child in func_node.children: 

87 if child.type in block_types: 

88 stmt_count = 0 

89 for stmt in child.children: 

90 if stmt.is_named and stmt.type.endswith("_statement"): 

91 if stmt.type == "expression_statement" and _is_docstring(stmt): 

92 continue 

93 stmt_count += 1 

94 elif stmt.type == "statement_list": 

95 for sub_stmt in stmt.children: 

96 if sub_stmt.is_named and sub_stmt.type.endswith("_statement"): 

97 if ( 

98 sub_stmt.type == "expression_statement" 

99 and _is_docstring(sub_stmt) 

100 ): 

101 continue 

102 stmt_count += 1 

103 return stmt_count 

104 return 0 

105 

106 

107def find_duplicates(results: list[FileComplexity]) -> list[DuplicateCluster]: 

108 """Group functions by hash across files; return clusters with 2+ members.""" 

109 type1_groups: dict[str, list[tuple[str, FunctionComplexity]]] = defaultdict(list) 

110 type2_groups: dict[str, list[tuple[str, FunctionComplexity]]] = defaultdict(list) 

111 

112 for result in results: 

113 for func in result.functions: 

114 if func.type1_hash: 

115 type1_groups[func.type1_hash].append((result.path, func)) 

116 if func.type2_hash: 

117 type2_groups[func.type2_hash].append((result.path, func)) 

118 

119 clusters: list[DuplicateCluster] = [] 

120 type1_hashes = set() 

121 

122 for h, members in type1_groups.items(): 

123 if len(members) >= 2: 

124 clusters.append(DuplicateCluster(dup_type=1, hash_value=h, members=members)) 

125 type1_hashes.add(h) 

126 

127 # Type 2 clusters exclude groups already fully caught as Type 1 

128 type1_member_sets: dict[str, frozenset] = {} 

129 for c in clusters: 

130 keys = frozenset((p, f.name, f.line) for p, f in c.members) 

131 type1_member_sets[c.hash_value] = keys 

132 

133 for h, members in type2_groups.items(): 

134 if len(members) < 2: 

135 continue 

136 member_keys = frozenset((p, f.name, f.line) for p, f in members) 

137 already_covered = any( 

138 member_keys == t1_keys for t1_keys in type1_member_sets.values() 

139 ) 

140 if not already_covered: 

141 filtered = [m for m in members if m[1].stmt_count >= 2] 

142 if len(filtered) >= 2: 

143 clusters.append( 

144 DuplicateCluster(dup_type=2, hash_value=h, members=filtered) 

145 ) 

146 

147 return clusters