Coverage for src/orchestration/review_tracking.py: 9%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Review tracking issue creation for MalaOrchestrator. 

2 

3This module handles creating beads issues from low-priority (P2/P3) 

4review findings that didn't block the review but should be tracked. 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10import re 

11from typing import TYPE_CHECKING 

12 

13if TYPE_CHECKING: 

14 from src.core.protocols import IssueProvider, MalaEventSink, ReviewIssueProtocol 

15 

16 

def _get_finding_fingerprint(issue: ReviewIssueProtocol) -> str:
    """Compute the dedup fingerprint that identifies a single finding.

    The fingerprint is derived from the finding's file, line range and title.
    It is returned as 16 hex characters so it can be embedded in a description
    and matched with a plain regex (no special characters to escape).
    """
    span = f"{issue.file}:{issue.line_start}:{issue.line_end}"
    identity = f"{span}:{issue.title}"
    digest = hashlib.sha256(identity.encode())
    return digest.hexdigest()[:16]

24 

25 

def _build_findings_section(
    review_issues: list[ReviewIssueProtocol],
    start_idx: int = 1,
) -> tuple[str, str, list[str]]:
    """Render review findings as markdown and compute their dedup identifiers.

    Args:
        review_issues: Findings to render, in display order.
        start_idx: Number given to the first rendered finding; later findings
            count up from it.

    Returns:
        A 3-tuple of:
        - the markdown for all findings, joined with newlines;
        - the batch dedup tag ``review_finding:<12 hex chars>``, built from the
          sorted per-finding fingerprints so it does not depend on order;
        - the per-finding fingerprints, in the same order as ``review_issues``.
    """
    fingerprints = [_get_finding_fingerprint(finding) for finding in review_issues]
    batch_digest = hashlib.sha256("|".join(sorted(fingerprints)).encode())
    batch_tag = f"review_finding:{batch_digest.hexdigest()[:12]}"

    lines: list[str] = []
    for number, finding in enumerate(review_issues, start_idx):
        # Findings without an explicit priority are reported as P3.
        if finding.priority is None:
            priority_label = "P3"
        else:
            priority_label = f"P{finding.priority}"

        # A location is shown only when the finding names a file. A range is
        # shown only when start and end differ and line_end is non-zero.
        if not finding.file:
            where = ""
        elif finding.line_start == finding.line_end or finding.line_end == 0:
            where = f"{finding.file}:{finding.line_start}"
        else:
            where = f"{finding.file}:{finding.line_start}-{finding.line_end}"

        lines += [
            f"### Finding {number}: {finding.title}",
            "",
            f"**Priority:** {priority_label}",
            f"**Reviewer:** {finding.reviewer}",
        ]
        if where:
            lines.append(f"**Location:** {where}")
        if finding.body:
            lines += ["", finding.body]
        # Horizontal rule separating this finding from the next one.
        lines += ["", "---", ""]

    return "\n".join(lines), batch_tag, fingerprints

76 

77 

def _extract_existing_fingerprints(description: str) -> set[str]:
    """Collect the finding fingerprints already recorded in a description.

    Fingerprints are stored as HTML comments. Two marker formats are read:

    - current: ``<!-- fp:<16 hex chars> -->``; the hex value is used as-is.
    - legacy: ``<!-- fp:file:line:line:title -->``; the raw text is hashed
      with SHA-256 and truncated to 16 hex characters, the same shape that
      _get_finding_fingerprint produces, so both formats can be compared.

    Returns:
        The set of 16-character hex fingerprints found in ``description``.
    """
    found: set[str] = set()

    # Current format: exactly 16 lowercase hex characters.
    for marker in re.finditer(r"<!-- fp:([a-f0-9]{16}) -->", description):
        found.add(marker.group(1))

    # Legacy format: requires a colon, which a hex-only fingerprint never has.
    for marker in re.finditer(r"<!-- fp:([^>]+:[^>]+) -->", description):
        raw = marker.group(1)
        found.add(hashlib.sha256(raw.encode()).hexdigest()[:16])

    return found

102 

103 

def _update_header_count(description: str, new_count: int) -> str:
    """Rewrite the finding count stated in the description header.

    The header sentence contains "consolidates N non-blocking finding(s)".
    Only the first occurrence is rewritten: the header is written before any
    finding, so identical wording quoted inside a finding body stays untouched.

    Args:
        description: Full issue description text.
        new_count: Total number of findings to state in the header.

    Returns:
        The description with the header count and singular/plural form
        updated, or the description unchanged if no such sentence is present.
    """
    noun = "finding" if new_count == 1 else "findings"
    return re.sub(
        r"consolidates \d+ non-blocking findings?",
        f"consolidates {new_count} non-blocking {noun}",
        description,
        count=1,  # header only; never rewrite matching text in finding bodies
    )

115 

116 

async def create_review_tracking_issues(
    beads: IssueProvider,
    event_sink: MalaEventSink,
    source_issue_id: str,
    review_issues: list[ReviewIssueProtocol],
    parent_epic_id: str | None = None,
) -> None:
    """Create or update the tracking issue for non-blocking review findings.

    All findings for one source issue are consolidated into a single tracking
    issue, located through the tag ``source:<source_issue_id>``. If such an
    issue exists, findings whose fingerprints are not yet recorded in its
    description are appended, and the header count, highest priority and
    title are refreshed. Otherwise a new issue is created.

    Outcomes, including failed provider calls, are reported through
    ``event_sink.on_warning``. The function returns nothing.

    Args:
        beads: Issue provider used to look up, create and update issues.
        event_sink: Event sink that receives the outcome messages.
        source_issue_id: ID of the issue whose review produced the findings.
        review_issues: Findings from the review; an empty list is a no-op.
        parent_epic_id: Optional parent epic for a newly created tracking issue.
    """
    if not review_issues:
        return

    # Render every finding up front; the batch tag also drives the fast-path
    # duplicate check on the update path.
    new_findings_section, new_dedup_tag, new_fingerprints = _build_findings_section(
        review_issues
    )

    source_tag = f"source:{source_issue_id}"
    existing_id = await beads.find_issue_by_tag_async(source_tag)

    if existing_id:
        existing_desc = await beads.get_issue_description_async(existing_id)
        if existing_desc is None:
            # Without the current description we can neither dedup nor append.
            event_sink.on_warning(
                f"Failed to fetch description for {existing_id}, skipping update",
                agent_id=source_issue_id,
            )
            return

        # Fast path: this exact batch of findings was already recorded.
        if new_dedup_tag in existing_desc:
            return

        # Slow path: drop findings that were recorded individually before.
        existing_fingerprints = _extract_existing_fingerprints(existing_desc)
        unique_issues = [
            issue
            for issue in review_issues
            if _get_finding_fingerprint(issue) not in existing_fingerprints
        ]
        if not unique_issues:
            return

        # Count rendered finding headings only. Anchoring to the start of a
        # line keeps "#### Finding ..." sub-headings or inline quotes inside
        # finding bodies from inflating the numbering and the total.
        existing_finding_count = len(
            re.findall(r"^### Finding \d+", existing_desc, flags=re.MULTILINE)
        )
        new_findings_section, new_dedup_tag, unique_fingerprints = (
            _build_findings_section(unique_issues, start_idx=existing_finding_count + 1)
        )

        appended_count = len(unique_issues)
        total_count = existing_finding_count + appended_count
        updated_desc = _update_header_count(existing_desc, total_count)

        # Per-finding markers allow individual dedup on later runs.
        fingerprint_comments = "\n".join(
            f"<!-- fp:{fp} -->" for fp in unique_fingerprints
        )
        updated_desc = (
            updated_desc.rstrip()
            + f"\n\n{new_findings_section}\n{fingerprint_comments}\n<!-- {new_dedup_tag} -->\n"
        )

        # Lower number means higher priority; missing priorities count as P3.
        new_priorities = [i.priority for i in unique_issues if i.priority is not None]
        new_highest = min(new_priorities) if new_priorities else 3

        priority_match = re.search(r"\*\*Highest priority:\*\* P(\d+)", existing_desc)
        current_highest = int(priority_match.group(1)) if priority_match else 3

        final_highest = min(current_highest, new_highest)
        if final_highest != current_highest:
            updated_desc = re.sub(
                r"\*\*Highest priority:\*\* P\d+",
                f"**Highest priority:** P{final_highest}",
                updated_desc,
                count=1,  # header line only; leave finding bodies untouched
            )

        total_plural = "" if total_count == 1 else "s"
        new_title = f"[Review] {total_count} non-blocking finding{total_plural} from {source_issue_id}"

        update_success = await beads.update_issue_description_async(
            existing_id, updated_desc
        )
        if not update_success:
            event_sink.on_warning(
                f"Failed to update tracking issue {existing_id}",
                agent_id=source_issue_id,
            )
            return

        # The description is already saved; a failed title/priority update is
        # reported but does not undo the append.
        title_update_success = await beads.update_issue_async(
            existing_id,
            title=new_title,
            priority=f"P{final_highest}",
        )
        if not title_update_success:
            event_sink.on_warning(
                f"Failed to update title/priority for tracking issue {existing_id}",
                agent_id=source_issue_id,
            )

        appended_plural = "" if appended_count == 1 else "s"
        event_sink.on_warning(
            f"Appended {appended_count} finding{appended_plural} to tracking issue {existing_id}",
            agent_id=source_issue_id,
        )
        return

    # No tracking issue yet for this source: create one.
    priorities = [i.priority for i in review_issues if i.priority is not None]
    highest_priority = min(priorities) if priorities else 3
    priority_str = f"P{highest_priority}"

    issue_count = len(review_issues)
    plural_s = "" if issue_count == 1 else "s"
    issue_title = (
        f"[Review] {issue_count} non-blocking finding{plural_s} from {source_issue_id}"
    )

    # Per-finding markers allow individual dedup on later runs.
    fingerprint_comments = "\n".join(f"<!-- fp:{fp} -->" for fp in new_fingerprints)

    description = "\n".join(
        [
            "## Review Findings",
            "",
            f"This issue consolidates {issue_count} non-blocking finding{plural_s} from code review.",
            "",
            f"**Source issue:** {source_issue_id}",
            f"**Highest priority:** {priority_str}",
            "",
            "---",
            "",
            new_findings_section,
            fingerprint_comments,
            f"<!-- {new_dedup_tag} -->",
        ]
    )

    # source_tag is what later runs use to find this issue again.
    tags = [
        "auto_generated",
        "review_finding",
        source_tag,
    ]

    new_issue_id = await beads.create_issue_async(
        title=issue_title,
        description=description,
        priority=priority_str,
        tags=tags,
        parent_id=parent_epic_id,
    )
    if new_issue_id:
        event_sink.on_warning(
            f"Created tracking issue {new_issue_id} for {issue_count} {priority_str}+ review finding{plural_s}",
            agent_id=source_issue_id,
        )
    else:
        # Mirror the update path: a failed write is reported, not dropped.
        event_sink.on_warning(
            f"Failed to create tracking issue for review findings from {source_issue_id}",
            agent_id=source_issue_id,
        )
296 )