Coverage for src / orchestration / review_tracking.py: 9%
100 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Review tracking issue creation for MalaOrchestrator.
3This module handles creating beads issues from low-priority (P2/P3)
4review findings that didn't block the review but should be tracked.
5"""
7from __future__ import annotations
9import hashlib
10import re
11from typing import TYPE_CHECKING
13if TYPE_CHECKING:
14 from src.core.protocols import IssueProvider, MalaEventSink, ReviewIssueProtocol
def _get_finding_fingerprint(issue: ReviewIssueProtocol) -> str:
    """Return a short, stable identifier for a single review finding.

    The identifier is the first 16 hex characters of the SHA-256 digest of
    ``file:line_start:line_end:title``. Because it is hex-only, it can be
    embedded in an HTML comment and matched by a regex without escaping.
    """
    identity = f"{issue.file}:{issue.line_start}:{issue.line_end}:{issue.title}"
    digest = hashlib.sha256(identity.encode()).hexdigest()
    return digest[:16]
def _build_findings_section(
    review_issues: list[ReviewIssueProtocol],
    start_idx: int = 1,
) -> tuple[str, str, list[str]]:
    """Render review findings as markdown and compute their dedup identifiers.

    Args:
        review_issues: Findings to render, in the order they should appear.
        start_idx: Number assigned to the first finding; later ones count up.

    Returns:
        A 3-tuple of the rendered markdown, the batch-level dedup tag
        (``review_finding:<12 hex chars>``), and the per-finding fingerprints
        in the same order as ``review_issues``.
    """
    fingerprints = [_get_finding_fingerprint(finding) for finding in review_issues]

    # Sorting first makes the batch hash independent of the findings' order.
    batch_key = "|".join(sorted(fingerprints))
    batch_digest = hashlib.sha256(batch_key.encode()).hexdigest()
    dedup_tag = f"review_finding:{batch_digest[:12]}"

    lines: list[str] = []
    for number, finding in enumerate(review_issues, start_idx):
        path = finding.file
        first_line = finding.line_start
        last_line = finding.line_end

        # Findings without an explicit priority are rendered as P3.
        if finding.priority is None:
            priority_label = "P3"
        else:
            priority_label = f"P{finding.priority}"

        # Show a single line number when the range is degenerate (start equals
        # end, or end is 0); no location at all when the file is unknown.
        single_line = first_line == last_line or last_line == 0
        if not path:
            location = ""
        elif single_line:
            location = f"{path}:{first_line}"
        else:
            location = f"{path}:{first_line}-{last_line}"

        lines += [
            f"### Finding {number}: {finding.title}",
            "",
            f"**Priority:** {priority_label}",
            f"**Reviewer:** {finding.reviewer}",
        ]
        if location:
            lines.append(f"**Location:** {location}")
        if finding.body:
            lines += ["", finding.body]
        # Horizontal rule separating this finding from the next one.
        lines += ["", "---", ""]

    return "\n".join(lines), dedup_tag, fingerprints
def _extract_existing_fingerprints(description: str) -> set[str]:
    """Collect the finding fingerprints already recorded in a description.

    Fingerprints are stored as HTML comments. Two formats are recognised:

    * current: ``<!-- fp:<16 lowercase hex chars> -->`` - returned as-is;
    * legacy: ``<!-- fp:file:line:line:title -->`` - the raw text is hashed
      (SHA-256, first 16 hex chars) so it is comparable with the values
      produced by ``_get_finding_fingerprint``.

    Returns:
        The set of 16-character hex fingerprints found; empty if there are none.
    """
    found: set[str] = set()

    # Current format: the captured value is already the fingerprint.
    for match in re.finditer(r"<!-- fp:([a-f0-9]{16}) -->", description):
        found.add(match.group(1))

    # Legacy format: always contains at least one colon, which a hex-only
    # fingerprint never does, so the two patterns cannot match the same marker.
    for match in re.finditer(r"<!-- fp:([^>]+:[^>]+) -->", description):
        raw = match.group(1)
        found.add(hashlib.sha256(raw.encode()).hexdigest()[:16])

    return found
def _update_header_count(description: str, new_count: int) -> str:
    """Rewrite the finding count in the description's header sentence.

    The header sentence reads "consolidates N non-blocking finding(s)". Only
    the first occurrence is rewritten: the header sits at the top of the
    description, so the same phrase quoted later inside a finding body is
    left untouched.

    Args:
        description: Full tracking-issue description.
        new_count: Total number of findings the header should report.

    Returns:
        The description with the header count and the singular/plural noun
        updated; returned unchanged if no header sentence is present.
    """
    plural_s = "s" if new_count != 1 else ""
    replacement = f"consolidates {new_count} non-blocking finding{plural_s}"
    return re.sub(
        r"consolidates \d+ non-blocking findings?",
        replacement,
        description,
        count=1,  # header only; never touch matching text in finding bodies
    )
async def create_review_tracking_issues(
    beads: IssueProvider,
    event_sink: MalaEventSink,
    source_issue_id: str,
    review_issues: list[ReviewIssueProtocol],
    parent_epic_id: str | None = None,
) -> None:
    """Create or update a beads issue from P2/P3 review findings.

    All low-priority findings that didn't block the review are consolidated
    into a single tracking issue per source issue, located via the
    ``source:<source_issue_id>`` tag. If such an issue already exists, only
    findings whose fingerprint is not yet recorded in its description are
    appended, and the header count, highest priority, and title are refreshed.
    Otherwise a new issue is created.

    Outcomes (success and failure alike) are reported through
    ``event_sink.on_warning``; nothing is returned.

    Args:
        beads: Issue provider for creating/updating issues.
        event_sink: Event sink for warnings.
        source_issue_id: The issue ID that triggered the review.
        review_issues: List of ReviewIssueProtocol objects from the review.
        parent_epic_id: Optional parent epic ID to attach new tracking issues to.
    """
    if not review_issues:
        return

    # Render the full batch up front: the batch-level dedup tag is needed for
    # the fast path even when the issue already exists.
    new_findings_section, new_dedup_tag, new_fingerprints = _build_findings_section(
        review_issues
    )

    source_tag = f"source:{source_issue_id}"
    existing_id = await beads.find_issue_by_tag_async(source_tag)

    if existing_id:
        # Without the current description we cannot append safely, so bail out.
        existing_desc = await beads.get_issue_description_async(existing_id)
        if existing_desc is None:
            event_sink.on_warning(
                f"Failed to fetch description for {existing_id}, skipping update",
                agent_id=source_issue_id,
            )
            return

        # Fast path: this exact batch was already recorded.
        if new_dedup_tag in existing_desc:
            return

        # Drop findings that are already recorded individually.
        existing_fingerprints = _extract_existing_fingerprints(existing_desc)
        unique_issues = [
            issue
            for issue in review_issues
            if _get_finding_fingerprint(issue) not in existing_fingerprints
        ]
        if not unique_issues:
            return

        # Count only real finding headings (line-anchored "### Finding N:") so
        # that body text mentioning "### Finding " cannot inflate the numbering.
        existing_finding_count = len(
            re.findall(r"^### Finding \d+:", existing_desc, flags=re.MULTILINE)
        )
        new_findings_section, new_dedup_tag, unique_fingerprints = (
            _build_findings_section(unique_issues, start_idx=existing_finding_count + 1)
        )

        total_count = existing_finding_count + len(unique_issues)
        updated_desc = _update_header_count(existing_desc, total_count)

        # Per-finding markers let later runs skip these findings individually.
        fingerprint_comments = "\n".join(
            f"<!-- fp:{fp} -->" for fp in unique_fingerprints
        )
        updated_desc = (
            updated_desc.rstrip()
            + f"\n\n{new_findings_section}\n{fingerprint_comments}\n<!-- {new_dedup_tag} -->\n"
        )

        # Lower number means higher priority; findings without one count as P3.
        new_priorities = [i.priority for i in unique_issues if i.priority is not None]
        new_highest = min(new_priorities, default=3)

        priority_match = re.search(r"\*\*Highest priority:\*\* P(\d+)", existing_desc)
        current_highest = int(priority_match.group(1)) if priority_match else 3

        final_highest = min(current_highest, new_highest)
        if final_highest != current_highest:
            updated_desc = re.sub(
                r"\*\*Highest priority:\*\* P\d+",
                f"**Highest priority:** P{final_highest}",
                updated_desc,
                count=1,  # header line only; leave finding bodies untouched
            )

        plural_s = "s" if total_count != 1 else ""
        new_title = f"[Review] {total_count} non-blocking finding{plural_s} from {source_issue_id}"

        update_success = await beads.update_issue_description_async(
            existing_id, updated_desc
        )
        if not update_success:
            event_sink.on_warning(
                f"Failed to update tracking issue {existing_id}",
                agent_id=source_issue_id,
            )
            return

        # The description is already saved at this point, so a title/priority
        # failure is reported but does not abort.
        title_update_success = await beads.update_issue_async(
            existing_id,
            title=new_title,
            priority=f"P{final_highest}",
        )
        if not title_update_success:
            event_sink.on_warning(
                f"Failed to update title/priority for tracking issue {existing_id}",
                agent_id=source_issue_id,
            )

        event_sink.on_warning(
            f"Appended {len(unique_issues)} finding{'s' if len(unique_issues) > 1 else ''} to tracking issue {existing_id}",
            agent_id=source_issue_id,
        )
        return

    # No existing issue - create a new one.
    # Lowest number = highest priority; default to P3 when none is set.
    priorities = [i.priority for i in review_issues if i.priority is not None]
    highest_priority = min(priorities, default=3)
    priority_str = f"P{highest_priority}"

    issue_count = len(review_issues)
    count_suffix = "s" if issue_count > 1 else ""
    issue_title = (
        f"[Review] {issue_count} non-blocking finding{count_suffix} from {source_issue_id}"
    )

    # Per-finding markers let later runs skip these findings individually.
    fingerprint_comments = "\n".join(f"<!-- fp:{fp} -->" for fp in new_fingerprints)

    description = "\n".join(
        [
            "## Review Findings",
            "",
            f"This issue consolidates {issue_count} non-blocking finding{count_suffix} from code review.",
            "",
            f"**Source issue:** {source_issue_id}",
            f"**Highest priority:** {priority_str}",
            "",
            "---",
            "",
            new_findings_section,
            fingerprint_comments,
            f"<!-- {new_dedup_tag} -->",
        ]
    )

    # source_tag is what find_issue_by_tag_async looks up on the next run.
    tags = [
        "auto_generated",
        "review_finding",
        source_tag,
    ]

    new_issue_id = await beads.create_issue_async(
        title=issue_title,
        description=description,
        priority=priority_str,
        tags=tags,
        parent_id=parent_epic_id,
    )
    if new_issue_id:
        event_sink.on_warning(
            f"Created tracking issue {new_issue_id} for {issue_count} {priority_str}+ review finding{count_suffix}",
            agent_id=source_issue_id,
        )
    else:
        # Mirror the update path: a failed write is reported, not dropped.
        event_sink.on_warning(
            f"Failed to create tracking issue for {source_issue_id}",
            agent_id=source_issue_id,
        )