Coverage for src / infra / epic_verifier.py: 16%
309 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Epic verification for ensuring epic acceptance criteria are met.
3This module provides the EpicVerifier class which verifies that all code changes
4made for issues under an epic collectively meet the epic's acceptance criteria
5before allowing the epic to close.
7Key components:
8- EpicVerifier: Main verification orchestrator
9- ClaudeEpicVerificationModel: Claude-based implementation of EpicVerificationModel protocol
11Design principles:
12- Scoped commits: Only include commits linked to child issues (by bd-<id>: prefix)
13- Agent-driven exploration: Verification agent explores repo using commit list
14- Remediation issues: Auto-create with deduplication for unmet criteria
15"""
17from __future__ import annotations
19import asyncio
20import hashlib
21import json
22import os
23import re
24from contextlib import asynccontextmanager
25from dataclasses import dataclass
26from pathlib import Path
27from typing import TYPE_CHECKING
29from src.core.models import (
30 EpicVerdict,
31 EpicVerificationResult,
32 RetryConfig,
33 UnmetCriterion,
34)
35from src.infra.epic_scope import EpicScopeAnalyzer
37if TYPE_CHECKING:
38 from collections.abc import AsyncIterator
40 from src.core.protocols import EpicVerificationModel, LockManagerPort, MalaEventSink
41 from src.infra.clients.beads_client import BeadsClient
42 from src.infra.tools.command_runner import CommandRunner
# Regexes used by extract_spec_paths() to pull a "specs/<...>.md" path out of
# free text; each pattern has exactly one capture group holding the path.
# Case tolerance is limited to what the patterns spell out: "See"/"see",
# "Spec"/"spec" and the ".md"/".MD" extension. The "specs/" prefix itself must
# be lowercase, and path segments may only contain word characters, "/" and "-".
SPEC_PATH_PATTERNS = [
    r"[Ss]ee\s+(specs/[\w/-]+\.(?:md|MD))",  # "See specs/foo/bar.md"
    r"[Ss]pec:\s*(specs/[\w/-]+\.(?:md|MD))",  # "Spec: specs/foo.md"
    r"\[(specs/[\w/-]+\.(?:md|MD))\]",  # "[specs/foo.md]"
    r"(?:^|[\s(])(specs/[\w/-]+\.(?:md|MD))(?:\s|[.,;:!?)]|$)",  # Bare "specs/foo.md" or "(specs/foo.md)"
]


# Lock timeout for epic verification (5 minutes). Passed as the timeout
# argument to lock_manager.wait_for_lock() in epic_verify_lock().
EPIC_VERIFY_LOCK_TIMEOUT_SECONDS = 300
@asynccontextmanager
async def epic_verify_lock(
    epic_id: str,
    repo_path: Path,
    lock_manager: LockManagerPort | None,
) -> AsyncIterator[bool]:
    """Hold the per-epic verification lock for the duration of the ``async with``.

    The yielded flag tells the caller whether it may proceed:

    * ``True``  - the lock was obtained, or no lock manager was supplied
      (locking is unavailable, so nothing is acquired or released).
    * ``False`` - ``wait_for_lock`` gave up after
      ``EPIC_VERIFY_LOCK_TIMEOUT_SECONDS``; the caller decides how to react
      (skip, return an empty result, ...).

    Args:
        epic_id: Epic whose verification is being serialized.
        repo_path: Repository path, handed to the lock manager as a string.
        lock_manager: Lock backend, or None to run without locking.
    """
    if lock_manager is not None:
        key = f"epic_verify:{epic_id}"
        holder = f"epic_verifier_{os.getpid()}"
        repo = str(repo_path)

        # wait_for_lock is a blocking call, so it runs in a worker thread.
        got_lock = await asyncio.to_thread(
            lock_manager.wait_for_lock,
            key,
            holder,
            repo,
            EPIC_VERIFY_LOCK_TIMEOUT_SECONDS,
        )
        try:
            yield got_lock
        finally:
            # Only release what was actually acquired.
            if got_lock:
                lock_manager.release_lock(key, holder, repo)
    else:
        yield True
90def _compute_criterion_hash(criterion: str) -> str:
91 """Compute SHA256 hash of criterion text for deduplication."""
92 return hashlib.sha256(criterion.encode()).hexdigest()
def extract_spec_paths(text: str) -> list[str]:
    """Collect spec file paths mentioned in ``text``.

    Every regex in ``SPEC_PATH_PATTERNS`` is applied in turn (multiline mode)
    and the captured paths are gathered pattern by pattern.

    Args:
        text: The text to search for spec paths.

    Returns:
        The distinct paths found, in first-seen order.
    """
    hits = [
        found
        for pattern in SPEC_PATH_PATTERNS
        for found in re.findall(pattern, text, re.MULTILINE)
    ]
    # dict keys keep insertion order, which drops repeats but preserves order.
    return list(dict.fromkeys(hits))
def _load_prompt_template() -> str:
    """Load the epic verification prompt as a ``str.format`` template.

    The markdown file is read as UTF-8 (explicitly, so the result does not
    depend on the platform's locale encoding). All braces in the file are
    escaped so that literal ``{``/``}`` in the prompt survive ``str.format``;
    the four known placeholders are then un-escaped again so they remain
    substitutable.

    Returns:
        Template text in which only ``{epic_criteria}``, ``{spec_content}``,
        ``{commit_range}`` and ``{commit_list}`` are live format fields.

    Raises:
        OSError: If the prompt file cannot be read.
        UnicodeDecodeError: If the prompt file is not valid UTF-8.
    """
    # Navigate from src/infra/ to src/prompts/
    prompt_path = Path(__file__).parent.parent / "prompts" / "epic_verification.md"
    template = prompt_path.read_text(encoding="utf-8")

    # Step 1: neutralise every brace. Step 2: revive the known placeholders,
    # i.e. turn "{{key}}" back into "{key}".
    escaped = template.replace("{", "{{").replace("}", "}}")
    for key in ("epic_criteria", "spec_content", "commit_range", "commit_list"):
        escaped = escaped.replace(f"{{{{{key}}}}}", f"{{{key}}}")
    return escaped
@dataclass
class EpicVerificationContext:
    """Context captured during verification for richer remediation issues.

    Built by EpicVerifier._verify_epic_with_context() once commits have been
    scoped, and rendered into issue descriptions by
    EpicVerifier._format_remediation_context().
    """

    # Epic description text; it is what gets sent to the model as the
    # acceptance criteria.
    epic_description: str
    # Text of the first readable spec file referenced by the description,
    # or None when no spec was found/readable.
    spec_content: str | None
    # IDs of the epic's child issues.
    child_ids: set[str]
    # IDs of issues already blocking the epic (empty set when there are none).
    blocker_ids: set[str]
    # Commit SHAs reported by the scope analyzer for the child/blocker issues.
    commit_shas: list[str]
    # Commit range hint from the scope analyzer; may be None.
    commit_range: str | None
    # Commit listing from the scope analyzer; passed to the model as the
    # commit list and embedded in remediation issues.
    commit_summary: str
142def _extract_json_from_code_blocks(text: str) -> str | None:
143 """Extract JSON content from markdown code blocks.
145 This function properly handles responses with multiple code blocks by
146 extracting each code block individually and returning the first one
147 that contains valid JSON (i.e., starts with '{').
149 Args:
150 text: The raw model response text that may contain markdown code blocks.
152 Returns:
153 The JSON string extracted from the first JSON code block, or None if
154 no valid JSON code block is found.
155 """
156 # Pattern to match individual code blocks non-greedily
157 # Matches: ```json ... ``` or ``` ... ```
158 # The .*? is non-greedy and DOTALL is NOT used, so we need to handle newlines
159 # Use a pattern that matches until the closing ```
160 code_block_pattern = r"```(?:json)?\s*\n?([\s\S]*?)```"
162 matches = re.finditer(code_block_pattern, text)
164 for match in matches:
165 content = match.group(1).strip()
166 # Check if this looks like JSON (starts with '{')
167 if content.startswith("{"):
168 return content
170 return None
class ClaudeEpicVerificationModel:
    """Claude-based implementation of EpicVerificationModel protocol.

    Uses the Claude Agent SDK to verify epic acceptance criteria
    against scoped commits, matching the main agent execution path.
    """

    # Default model for epic verification
    DEFAULT_MODEL = "claude-sonnet-4-20250514"

    def __init__(
        self,
        model: str | None = None,
        timeout_ms: int = 300000,
        repo_path: Path | None = None,
        retry_config: RetryConfig | None = None,
    ) -> None:
        """Initialize ClaudeEpicVerificationModel.

        Args:
            model: The Claude model to use for verification. Defaults to
                DEFAULT_MODEL if not specified.
            timeout_ms: Timeout for model calls in milliseconds. Default is
                5 minutes (300000ms) to allow sufficient time for agent-driven
                repository exploration on non-trivial epics.
            repo_path: Repository path for agent execution context. Defaults
                to the current working directory.
            retry_config: Configuration retained for compatibility with prior
                retry behavior. Currently unused but stored for future use.
        """
        self.model = model or self.DEFAULT_MODEL
        self.timeout_ms = timeout_ms
        self.repo_path = repo_path or Path.cwd()
        self.retry_config = retry_config or RetryConfig()
        self._prompt_template = _load_prompt_template()

    async def verify(
        self,
        epic_criteria: str,
        commit_range: str,
        commit_list: str,
        spec_content: str | None,
    ) -> EpicVerdict:
        """Verify if the commit scope satisfies the epic's acceptance criteria.

        Args:
            epic_criteria: The epic's acceptance criteria text.
            commit_range: Commit range hint covering child issue commits.
            commit_list: Authoritative list of commit SHAs to inspect.
            spec_content: Optional content of linked spec file.

        Returns:
            Structured verdict with pass/fail and unmet criteria details.
        """
        # Empty/None inputs are replaced by explicit placeholder sentences so
        # the prompt never contains a blank section.
        prompt = self._prompt_template.format(
            epic_criteria=epic_criteria,
            spec_content=spec_content or "No spec file available.",
            commit_range=commit_range or "Unavailable.",
            commit_list=commit_list or "No commits found.",
        )

        response_text = await self._verify_with_agent_sdk(prompt)
        return self._parse_verdict(response_text)

    async def _verify_with_agent_sdk(self, prompt: str) -> str:
        """Run the prompt through the Claude Agent SDK and return its text reply.

        If the SDK is not importable, a JSON failure verdict is returned
        instead of raising, so the caller's normal parsing path reports it.

        Raises:
            TimeoutError: If the exchange exceeds ``timeout_ms``.
        """
        try:
            from claude_agent_sdk import (
                AssistantMessage,
                ClaudeAgentOptions,
                ClaudeSDKClient,
                TextBlock,
            )
        except ImportError as exc:
            return json.dumps(
                {
                    "passed": False,
                    "confidence": 0.0,
                    "reasoning": (
                        "claude_agent_sdk is not installed; epic verification "
                        f"requires the agent SDK ({exc})"
                    ),
                    "unmet_criteria": [],
                }
            )

        options = ClaudeAgentOptions(
            cwd=str(self.repo_path),
            permission_mode="bypassPermissions",
            model=self.model,
            system_prompt={"type": "preset", "preset": "claude_code"},
            setting_sources=["project", "user"],
            mcp_servers={},
            allowed_tools=[
                # Only these tools are permitted; all others (Edit, Write, etc.)
                # are blocked by omission. Bash is needed for git commands.
                "Bash",
                "Glob",
                "Grep",
                "Read",
                "Task",
            ],
            env=dict(os.environ),
        )

        response_chunks: list[str] = []
        async with ClaudeSDKClient(options=options) as client:
            async with asyncio.timeout(self.timeout_ms / 1000):
                await client.query(prompt)
                async for message in client.receive_response():
                    if isinstance(message, AssistantMessage):
                        for block in message.content:
                            if isinstance(block, TextBlock):
                                response_chunks.append(block.text)
                    # Note: ResultMessage.result (tool outputs) are intentionally
                    # excluded to avoid polluting the response with JSON that may
                    # appear in tool outputs (e.g., file contents). The final
                    # verdict JSON should come from the assistant's text response.

        return "".join(response_chunks).strip()

    @staticmethod
    def _find_raw_json_object(text: str) -> str | None:
        """Return the first substring of ``text`` that parses as a JSON object.

        A real JSON decoder is tried at every ``{`` from left to right, so
        arbitrary nesting depth and braces inside string values are handled.
        Returns None when no position yields a JSON object.
        """
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                parsed, end = decoder.raw_decode(text, start)
            except ValueError:
                pass  # not JSON at this brace; try the next one
            else:
                if isinstance(parsed, dict):
                    return text[start:end]
            start = text.find("{", start + 1)
        return None

    @staticmethod
    def _normalize_priority(value: object) -> int:
        """Coerce a model-supplied priority to an int clamped to 0-3.

        Ints, finite floats (truncated) and plain decimal strings are accepted.
        Anything else falls back to 1: booleans, None, NaN/Infinity, and
        strings that are negative, fractional or otherwise not plain decimal.
        """
        if isinstance(value, bool):
            return 1
        if isinstance(value, str):
            # isdecimal(), unlike isdigit(), only admits characters int() can
            # actually parse (isdigit() also accepts e.g. superscript digits).
            if not value.isdecimal():
                return 1
            value = int(value)
        if not isinstance(value, (int, float)):
            return 1
        try:
            return max(0, min(3, int(value)))
        except (ValueError, OverflowError):
            # int(nan) raises ValueError, int(inf) raises OverflowError.
            return 1

    def _parse_verdict(self, response_text: str) -> EpicVerdict:
        """Parse model response into EpicVerdict.

        JSON is looked for first in markdown code blocks, then as a raw object
        anywhere in the text, and finally the whole response is tried as JSON.
        Malformed payloads never raise: they produce a failed, zero-confidence
        verdict. ``passed`` is only honoured when it is the JSON boolean
        ``true`` so that values such as the string "false" cannot pass an epic.

        Args:
            response_text: The raw model response text.

        Returns:
            Parsed EpicVerdict.
        """
        json_str = _extract_json_from_code_blocks(response_text)
        if json_str is None:
            raw_object = self._find_raw_json_object(response_text)
            # Fallback: assume entire response is JSON
            json_str = raw_object if raw_object is not None else response_text

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError:
            data = None

        if not isinstance(data, dict):
            # Unparseable, or valid JSON that is not an object (list, number...).
            return EpicVerdict(
                passed=False,
                unmet_criteria=[],
                confidence=0.0,
                reasoning=f"Failed to parse model response: {response_text[:500]}",
            )

        raw_items = data.get("unmet_criteria")
        if not isinstance(raw_items, list):
            raw_items = []

        unmet_criteria = []
        for item in raw_items:
            if not isinstance(item, dict):
                continue
            criterion = item.get("criterion", "")
            if not isinstance(criterion, str):
                # The hash below needs text; keep whatever the model sent.
                criterion = "" if criterion is None else str(criterion)
            unmet_criteria.append(
                UnmetCriterion(
                    criterion=criterion,
                    evidence=item.get("evidence", ""),
                    priority=self._normalize_priority(item.get("priority", 1)),
                    criterion_hash=_compute_criterion_hash(criterion),
                )
            )

        return EpicVerdict(
            passed=data.get("passed") is True,
            unmet_criteria=unmet_criteria,
            confidence=data.get("confidence", 0.5),
            reasoning=data.get("reasoning", ""),
        )
class EpicVerifier:
    """Main verification orchestrator for epic acceptance criteria.

    Gathers epic data, computes scoped commits from child issue commits,
    invokes verification model, and creates remediation issues for unmet criteria.
    """

    def __init__(
        self,
        beads: BeadsClient,
        model: EpicVerificationModel,
        repo_path: Path,
        command_runner: CommandRunner,
        retry_config: RetryConfig | None = None,
        lock_manager: LockManagerPort | None = None,
        event_sink: MalaEventSink | None = None,
        scope_analyzer: EpicScopeAnalyzer | None = None,
    ) -> None:
        """Initialize EpicVerifier.

        Args:
            beads: BeadsClient for issue operations.
            model: EpicVerificationModel for verification.
            repo_path: Path to the repository.
            command_runner: Command runner for executing commands.
            retry_config: Configuration for retry behavior.
            lock_manager: Optional lock manager for sequential processing.
            event_sink: Optional event sink for emitting verification lifecycle events.
            scope_analyzer: Optional EpicScopeAnalyzer for computing scoped commits.
                If not provided, a default instance is created.
        """
        self.beads = beads
        self.model = model
        self.repo_path = repo_path
        self.retry_config = retry_config or RetryConfig()
        self.lock_manager = lock_manager
        self.event_sink = event_sink
        self._runner = command_runner
        self.scope_analyzer = scope_analyzer or EpicScopeAnalyzer(
            repo_path, self._runner
        )

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _failed_verdict(reasoning: str) -> EpicVerdict:
        """Build a failed, zero-confidence verdict with no unmet criteria."""
        return EpicVerdict(
            passed=False,
            unmet_criteria=[],
            confidence=0.0,
            reasoning=reasoning,
        )

    @staticmethod
    def _empty_result() -> EpicVerificationResult:
        """Build the result returned when an epic was not verified at all."""
        return EpicVerificationResult(
            verified_count=0,
            passed_count=0,
            failed_count=0,
            verdicts={},
            remediation_issues_created=[],
        )

    def _emit_failed(self, epic_id: str, blocking_ids: list[str]) -> None:
        """Report a failed verification to the event sink, if one is set."""
        if self.event_sink is not None:
            self.event_sink.on_epic_verification_failed(
                epic_id, len(blocking_ids), blocking_ids
            )

    def _emit_passed(self, epic_id: str, confidence: float) -> None:
        """Report a passed verification to the event sink, if one is set."""
        if self.event_sink is not None:
            self.event_sink.on_epic_verification_passed(epic_id, confidence)

    async def _load_spec_content(self, spec_paths: list[str]) -> str | None:
        """Return the text of the first readable spec file, or None.

        Paths are resolved relative to ``repo_path`` and read as UTF-8 in a
        worker thread. Loading is best-effort: a path that is missing,
        unreadable or not valid UTF-8 is skipped and the next one is tried.
        """
        for spec_path in spec_paths:
            full_path = self.repo_path / spec_path
            try:
                return await asyncio.to_thread(full_path.read_text, encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                # UnicodeDecodeError is not an OSError; without catching it a
                # badly encoded spec would abort the whole verification.
                continue
        return None

    # ------------------------------------------------------------------
    # Verification entry points
    # ------------------------------------------------------------------

    async def verify_and_close_eligible(
        self,
        human_override_epic_ids: set[str] | None = None,
    ) -> EpicVerificationResult:
        """Check for epics eligible to close, verify each, and close those that pass.

        This method discovers eligible epics BEFORE attempting to close them,
        ensuring verification happens first.

        Args:
            human_override_epic_ids: Epic IDs to close without verification
                (explicit human override).

        Returns:
            Summary of verification results.
        """
        human_override_epic_ids = human_override_epic_ids or set()

        # Get eligible epics (those with all children closed)
        eligible_epics = await self._get_eligible_epics()

        # Aggregate results from individual verifications
        all_verdicts: dict[str, EpicVerdict] = {}
        all_remediation_issues: list[str] = []
        verified_count = 0
        passed_count = 0
        failed_count = 0

        for epic_id in eligible_epics:
            result = await self.verify_epic_with_options(
                epic_id,
                human_override=epic_id in human_override_epic_ids,
                require_eligible=False,  # Already filtered by _get_eligible_epics
                close_epic=True,
            )
            all_verdicts.update(result.verdicts)
            all_remediation_issues.extend(result.remediation_issues_created)
            verified_count += result.verified_count
            passed_count += result.passed_count
            failed_count += result.failed_count

        return EpicVerificationResult(
            verified_count=verified_count,
            passed_count=passed_count,
            failed_count=failed_count,
            verdicts=all_verdicts,
            remediation_issues_created=all_remediation_issues,
        )

    async def verify_epic(self, epic_id: str) -> EpicVerdict:
        """Verify a single epic against its acceptance criteria.

        Args:
            epic_id: The epic ID to verify.

        Returns:
            EpicVerdict with verification result.
        """
        verdict, _ = await self._verify_epic_with_context(epic_id)
        return verdict

    async def _verify_epic_with_context(
        self, epic_id: str
    ) -> tuple[EpicVerdict, EpicVerificationContext | None]:
        """Verify a single epic and return verdict plus context.

        The context is None whenever verification stops before the model is
        invoked (no description, no children, or no scoped commits).
        """
        # Get epic description (contains acceptance criteria)
        epic_description = await self.beads.get_issue_description_async(epic_id)
        if not epic_description:
            return self._failed_verdict("No acceptance criteria found for epic"), None

        # Get child issue IDs
        child_ids = await self.beads.get_epic_children_async(epic_id)
        if not child_ids:
            return self._failed_verdict("No child issues found for epic"), None

        # Get blocker issue IDs (remediation issues from previous verification runs)
        blocker_ids = await self.beads.get_epic_blockers_async(epic_id) or set()

        # Compute scoped commits using EpicScopeAnalyzer
        scoped = await self.scope_analyzer.compute_scoped_commits(
            child_ids, blocker_ids
        )
        if not scoped.commit_shas:
            return self._failed_verdict("No commits found for child issues"), None

        # Extract and load spec content if referenced (first readable spec wins)
        spec_content = await self._load_spec_content(
            extract_spec_paths(epic_description)
        )

        context = EpicVerificationContext(
            epic_description=epic_description,
            spec_content=spec_content,
            child_ids=child_ids,
            blocker_ids=blocker_ids,
            commit_shas=scoped.commit_shas,
            commit_range=scoped.commit_range,
            commit_summary=scoped.commit_summary,
        )

        # Invoke verification model with error handling
        # Per spec: timeouts/errors should trigger human review, not abort
        try:
            result = await self.model.verify(
                epic_description,
                scoped.commit_range or "",
                scoped.commit_summary,
                spec_content,
            )
            # Copy into fresh objects so the verdict does not alias whatever
            # the model implementation returned.
            verdict = EpicVerdict(
                passed=result.passed,
                unmet_criteria=[
                    UnmetCriterion(
                        criterion=c.criterion,
                        evidence=c.evidence,
                        priority=c.priority,
                        criterion_hash=c.criterion_hash,
                    )
                    for c in result.unmet_criteria
                ],
                confidence=result.confidence,
                reasoning=result.reasoning,
            )
        except Exception as e:
            verdict = self._failed_verdict(f"Model verification failed: {e}")

        return verdict, context

    async def _get_eligible_epics(self) -> list[str]:
        """Get list of epics eligible for closure.

        Returns:
            List of epic IDs that have all children closed. Empty when the
            ``bd`` command fails or its output is not the expected JSON list.
        """
        # Prefer bd epic status --eligible-only when supported; fall back to
        # full status and filter eligible_for_close.
        result = await self._runner.run_async(
            ["bd", "epic", "status", "--eligible-only", "--json"]
        )
        if not result.ok:
            result = await self._runner.run_async(["bd", "epic", "status", "--json"])
            if not result.ok:
                return []

        try:
            rows = json.loads(result.stdout)
        except json.JSONDecodeError:
            return []
        if not isinstance(rows, list):
            return []

        eligible: list[str] = []
        for row in rows:
            if not isinstance(row, dict) or not row.get("eligible_for_close"):
                continue
            epic = row.get("epic") or {}
            if not isinstance(epic, dict):
                continue
            epic_id = epic.get("id")
            if epic_id:
                eligible.append(str(epic_id))
        return eligible

    async def _is_epic_eligible(self, epic_id: str) -> bool:
        """Check if a specific epic is eligible for closure.

        An epic is eligible if all its children are closed.

        Args:
            epic_id: The epic ID to check.

        Returns:
            True if the epic is eligible for closure, False otherwise.
        """
        return epic_id in await self._get_eligible_epics()

    async def verify_and_close_epic(
        self,
        epic_id: str,
        human_override: bool = False,
    ) -> EpicVerificationResult:
        """Verify and close a single specific epic if eligible.

        This method checks if the specified epic is eligible (all children closed),
        then verifies it if eligible, and closes it if verification passes.

        Args:
            epic_id: The epic ID to verify and close.
            human_override: If True, bypass verification and close directly.

        Returns:
            Summary of verification result for this epic.
        """
        return await self.verify_epic_with_options(
            epic_id,
            human_override=human_override,
            require_eligible=True,
            close_epic=True,
        )

    async def verify_epic_with_options(
        self,
        epic_id: str,
        *,
        human_override: bool = False,
        require_eligible: bool = True,
        close_epic: bool = True,
    ) -> EpicVerificationResult:
        """Verify a specific epic with optional eligibility and closing behavior.

        Args:
            epic_id: The epic ID to verify.
            human_override: If True, bypass verification and (optionally) close.
            require_eligible: If True, only run when all children are closed.
            close_epic: If True, close the epic after a passing verification.

        Returns:
            Summary of verification result for this epic. All counts are zero
            when the epic is not eligible or the per-epic lock timed out.
        """
        verdicts: dict[str, EpicVerdict] = {}
        remediation_issues: list[str] = []
        passed_count = 0
        failed_count = 0

        if require_eligible and not await self._is_epic_eligible(epic_id):
            return self._empty_result()

        if human_override:
            # Override path: no model call, no lock, no "passed" event.
            if not close_epic:
                passed_count = 1
                verdicts[epic_id] = EpicVerdict(
                    passed=True,
                    unmet_criteria=[],
                    confidence=1.0,
                    reasoning="Human override - bypassed verification (no close)",
                )
            elif await self.beads.close_async(epic_id):
                passed_count = 1
                verdicts[epic_id] = EpicVerdict(
                    passed=True,
                    unmet_criteria=[],
                    confidence=1.0,
                    reasoning="Human override - bypassed verification",
                )
            else:
                failed_count = 1
                verdicts[epic_id] = self._failed_verdict(
                    "Human override close failed - epic could not be closed"
                )
                self._emit_failed(epic_id, [])

            return EpicVerificationResult(
                verified_count=1,
                passed_count=passed_count,
                failed_count=failed_count,
                verdicts=verdicts,
                remediation_issues_created=remediation_issues,
            )

        async with epic_verify_lock(
            epic_id, self.repo_path, self.lock_manager
        ) as acquired:
            if not acquired:
                return self._empty_result()

            if self.event_sink is not None:
                self.event_sink.on_epic_verification_started(epic_id)

            verdict, context = await self._verify_epic_with_context(epic_id)
            verdicts[epic_id] = verdict

            # Create remediation/advisory issues for any unmet criteria
            blocking_ids: list[str] = []
            informational_ids: list[str] = []
            if verdict.unmet_criteria:
                (
                    blocking_ids,
                    informational_ids,
                ) = await self.create_remediation_issues(epic_id, verdict, context)
                remediation_issues.extend(blocking_ids)
                remediation_issues.extend(informational_ids)

            # Epic fails if: has P0/P1 blocking issues OR model explicitly said passed=false
            if blocking_ids or not verdict.passed:
                if blocking_ids:
                    await self.add_epic_blockers(epic_id, blocking_ids)
                failed_count = 1
                self._emit_failed(epic_id, blocking_ids)
            elif not close_epic or await self.beads.close_async(epic_id):
                # No blocking issues and verdict.passed=True (may still have
                # P2/P3 advisories); the epic was closed if that was requested.
                passed_count = 1
                self._emit_passed(epic_id, verdict.confidence)
            else:
                # Close failed - treat as verification failure
                failed_count = 1
                self._emit_failed(epic_id, [])

            return EpicVerificationResult(
                verified_count=1,
                passed_count=passed_count,
                failed_count=failed_count,
                verdicts=verdicts,
                remediation_issues_created=remediation_issues,
            )

    # ------------------------------------------------------------------
    # Remediation issues
    # ------------------------------------------------------------------

    def _truncate_text(self, text: str, max_chars: int = 4000) -> str:
        """Truncate text for issue descriptions to keep context manageable."""
        if len(text) <= max_chars:
            return text
        return f"{text[:max_chars]}\n\n[truncated]"

    def _format_remediation_context(
        self, context: EpicVerificationContext | None
    ) -> str:
        """Build a rich context block for remediation issue descriptions.

        Returns an empty string when no context is available; otherwise a
        markdown document with one section per available piece of context.
        """
        if context is None:
            return ""

        sections: list[str] = [
            "## Epic Description / Acceptance Criteria\n"
            + self._truncate_text(context.epic_description)
        ]

        if context.spec_content:
            sections.append(
                "## Spec Content\n" + self._truncate_text(context.spec_content)
            )

        commit_range = context.commit_range or "Unavailable"
        commit_list = context.commit_summary or "No commits found."
        sections.append(
            f"## Commit Scope\n- Range hint: {commit_range}\n- Commits:\n{commit_list}"
        )

        if context.child_ids:
            child_list = "\n".join(f"- {cid}" for cid in sorted(context.child_ids))
            sections.append(f"## Child Issues\n{child_list}")

        if context.blocker_ids:
            blocker_list = "\n".join(f"- {bid}" for bid in sorted(context.blocker_ids))
            sections.append(f"## Existing Blockers\n{blocker_list}")

        return "\n\n".join(sections)

    async def create_remediation_issues(
        self,
        epic_id: str,
        verdict: EpicVerdict,
        context: EpicVerificationContext | None = None,
    ) -> tuple[list[str], list[str]]:
        """Create issues for unmet criteria, return blocking and informational IDs.

        Deduplication: Checks for existing issues with matching
        epic_remediation:<epic_id>:<criterion_hash> tag before creating.

        P0/P1 issues are blocking (parented to epic, block closure).
        P2/P3 issues are informational (standalone, don't block closure).

        Args:
            epic_id: The epic ID the issues are for.
            verdict: The verification verdict with unmet criteria.
            context: Optional verification context for richer issue descriptions.

        Returns:
            Tuple of (blocking_issue_ids, informational_issue_ids). Existing
            issues found via the dedup tag are included in the returned lists.
        """
        blocking_ids: list[str] = []
        informational_ids: list[str] = []
        context_block = self._format_remediation_context(context)

        for criterion in verdict.unmet_criteria:
            is_blocking = criterion.priority <= 1  # P0/P1 are blocking
            bucket = blocking_ids if is_blocking else informational_ids

            # Reuse an existing issue carrying this criterion's dedup tag.
            dedup_tag = f"epic_remediation:{epic_id}:{criterion.criterion_hash}"
            existing_id = await self.beads.find_issue_by_tag_async(dedup_tag)
            if existing_id:
                bucket.append(existing_id)
                continue

            # Sanitize criterion text for title: remove newlines, collapse whitespace
            sanitized_criterion = re.sub(r"\s+", " ", criterion.criterion.strip())
            prefix = "[Remediation]" if is_blocking else "[Advisory]"
            title = f"{prefix} {sanitized_criterion[:60]}"
            if len(sanitized_criterion) > 60:
                title = title + "..."

            priority_label = f"P{criterion.priority}"
            kind_label = "blocking" if is_blocking else "informational"
            blocking_note = (
                "Address this criterion to unblock epic closure."
                if is_blocking
                else "This is advisory (P2/P3) and does not block epic closure."
            )

            description = f"""## Context
This issue was auto-created by epic verification for epic `{epic_id}`.

{context_block}

## Unmet Criterion
{criterion.criterion}

## Evidence
{criterion.evidence}

## Priority
{priority_label} ({kind_label})

## Resolution
{blocking_note} When complete, close this issue.
"""

            issue_id = await self.beads.create_issue_async(
                title=title,
                description=description,
                priority=priority_label,
                tags=[dedup_tag, "auto_generated"],
                # Blocking issues are parented to epic; informational are standalone
                parent_id=epic_id if is_blocking else None,
            )
            if issue_id:
                bucket.append(issue_id)
                # Emit remediation created event
                if self.event_sink is not None:
                    self.event_sink.on_epic_remediation_created(
                        epic_id, issue_id, criterion.criterion
                    )

        return blocking_ids, informational_ids

    async def add_epic_blockers(
        self, epic_id: str, blocker_issue_ids: list[str]
    ) -> None:
        """Add issues as blockers of the epic via bd dep add.

        The outcome of each ``bd`` invocation is not inspected.

        Args:
            epic_id: The epic to block.
            blocker_issue_ids: Issues that must be resolved before epic closes.
        """
        for blocker_id in blocker_issue_ids:
            await self._runner.run_async(
                ["bd", "dep", "add", epic_id, "--blocked-by", blocker_id]
            )