Coverage for src/pipeline/epic_verification_coordinator.py: 34%
90 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""EpicVerificationCoordinator: Epic closure verification pipeline stage.

Extracted from MalaOrchestrator to separate epic verification logic from
orchestration. This module handles:
- Checking whether closing an issue should also close its parent epic
- The retry loop for epic verification
- Executing remediation issues when verification fails

Design principles:
- Callback-based hooks (a dataclass of callables) for orchestrator-owned
  operations, so the coordinator does not depend on orchestrator internals
- State management via the verified_epics and epics_being_verified sets
- Explicit config for retry behavior
"""
15from __future__ import annotations
17import asyncio
18from dataclasses import dataclass, field
19from typing import TYPE_CHECKING
21if TYPE_CHECKING:
22 from collections.abc import Awaitable, Callable
24 from src.core.models import EpicVerificationResult
25 from src.infra.io.log_output.run_metadata import RunMetadata
26 from src.pipeline.issue_result import IssueResult
@dataclass
class EpicVerificationConfig:
    """Tunable retry behaviour for EpicVerificationCoordinator.

    Attributes:
        max_retries: How many additional verification attempts are allowed
            after the first one (0 means the epic is verified exactly once).
    """

    # Retries on top of the initial attempt; total attempts = 1 + max_retries.
    max_retries: int = 0
@dataclass
class EpicVerificationCallbacks:
    """Hooks through which the coordinator reaches orchestrator-owned behaviour.

    The coordinator never touches orchestrator internals directly; every
    operation it needs from the orchestrator is routed through one of these
    callables.

    Attributes:
        get_parent_epic: Resolve an issue ID to its parent epic ID, or None.
        verify_epic: Verify an epic given (epic_id, human_override) and return
            the verification result.
        spawn_remediation: Start an agent for a remediation issue; returns the
            agent task, or None when no task was started.
        finalize_remediation: Finalize a finished remediation issue given
            (issue_id, result, run_metadata).
        mark_completed: Mark an issue as completed in the coordinator.
        is_issue_failed: Report whether an issue has already failed.
        close_eligible_epics: Fallback for mock providers without an
            EpicVerifier; returns a bool.
        on_epic_closed: Emit the epic-closed event.
        on_warning: Emit a warning event carrying the given message.
        has_epic_verifier: Report whether an EpicVerifier is available
            (kept as a callable so tests can patch it).
        get_agent_id: Look up the agent ID for an issue (error attribution).
    """

    # --- lookup and verification ---
    get_parent_epic: Callable[[str], Awaitable[str | None]]
    verify_epic: Callable[[str, bool], Awaitable[EpicVerificationResult]]

    # --- remediation issue lifecycle ---
    spawn_remediation: Callable[[str], Awaitable[asyncio.Task[IssueResult] | None]]
    finalize_remediation: Callable[[str, IssueResult, RunMetadata], Awaitable[None]]
    mark_completed: Callable[[str], None]
    is_issue_failed: Callable[[str], bool]

    # --- fallback path and event emission ---
    close_eligible_epics: Callable[[], Awaitable[bool]]
    on_epic_closed: Callable[[str], None]
    on_warning: Callable[[str], None]

    # --- capability / attribution queries ---
    has_epic_verifier: Callable[[], bool]
    get_agent_id: Callable[[str], str]
74@dataclass
75class EpicVerificationCoordinator:
76 """Epic verification pipeline stage.
78 This class encapsulates the epic closure verification logic that was previously
79 inline in MalaOrchestrator._check_epic_closure. It manages:
80 - Tracking verified epics to avoid re-verification
81 - Re-entrant guard for epics being verified
82 - Retry loop with remediation issue execution
83 - Fallback for mock providers
85 Attributes:
86 config: Verification configuration.
87 callbacks: Callbacks for orchestrator-owned operations.
88 epic_override_ids: Set of epic IDs to force human override.
89 """
91 config: EpicVerificationConfig
92 callbacks: EpicVerificationCallbacks
93 epic_override_ids: set[str] = field(default_factory=set)
95 # State: Tracked across multiple check_epic_closure calls
96 verified_epics: set[str] = field(default_factory=set)
97 epics_being_verified: set[str] = field(default_factory=set)
99 async def check_epic_closure(
100 self, issue_id: str, run_metadata: RunMetadata
101 ) -> None:
102 """Check if closing this issue should also close its parent epic.
104 Implements a retry loop for epic verification:
105 1. Run verification
106 2. If verification fails and creates remediation issues, execute them
107 3. Re-verify the epic
108 4. Repeat until verification passes OR max retries reached
110 Args:
111 issue_id: The issue that was just closed.
112 run_metadata: Run metadata for recording remediation issue results.
113 """
114 parent_epic = await self.callbacks.get_parent_epic(issue_id)
115 if parent_epic is None or parent_epic in self.verified_epics:
116 return
118 # Guard against re-entrant verification (e.g., when remediation tasks complete)
119 if parent_epic in self.epics_being_verified:
120 return
122 if self.callbacks.has_epic_verifier():
123 # Mark as being verified to prevent parallel verification loops
124 self.epics_being_verified.add(parent_epic)
125 try:
126 await self._verify_epic_with_retries(parent_epic, run_metadata)
127 finally:
128 # Always remove from being_verified set when done
129 self.epics_being_verified.discard(parent_epic)
131 elif await self.callbacks.close_eligible_epics():
132 # Fallback for mock providers without EpicVerifier
133 self.callbacks.on_epic_closed(issue_id)
135 async def _verify_epic_with_retries(
136 self, epic_id: str, run_metadata: RunMetadata
137 ) -> None:
138 """Run epic verification with retry loop.
140 Args:
141 epic_id: The epic to verify.
142 run_metadata: Run metadata for recording remediation issue results.
143 """
144 # max_retries is the number of retries AFTER the first attempt
145 # So total attempts = 1 (initial) + max_retries
146 max_retries = self.config.max_retries
147 max_attempts = 1 + max_retries
148 human_override = epic_id in self.epic_override_ids
150 for attempt in range(1, max_attempts + 1):
151 # Log attempt if retrying (attempt > 1)
152 if attempt > 1:
153 self.callbacks.on_warning(
154 f"Epic verification retry {attempt - 1}/{max_retries} for {epic_id}"
155 )
157 verification_result = await self.callbacks.verify_epic(
158 epic_id, human_override
159 )
161 # If epic wasn't eligible (children still open), don't mark as verified
162 # so it can be re-checked when more children close
163 if verification_result.verified_count == 0:
164 return
166 # If epic passed verification, mark as verified and return
167 if verification_result.passed_count > 0:
168 self.verified_epics.add(epic_id)
169 return
171 # If no remediation issues were created, or max attempts reached,
172 # mark as verified (to prevent infinite loops) and return
173 if (
174 not verification_result.remediation_issues_created
175 or attempt >= max_attempts
176 ):
177 if attempt >= max_attempts and verification_result.failed_count > 0:
178 self.callbacks.on_warning(
179 f"Epic verification failed after {max_retries} retries for {epic_id}"
180 )
181 self.verified_epics.add(epic_id)
182 return
184 # Execute remediation issues before next verification attempt
185 await self._execute_remediation_issues(
186 verification_result.remediation_issues_created,
187 run_metadata,
188 )
190 async def _execute_remediation_issues(
191 self,
192 issue_ids: list[str],
193 run_metadata: RunMetadata,
194 ) -> None:
195 """Execute remediation issues and wait for their completion.
197 Spawns agents for remediation issues, waits for completion, and finalizes
198 results (closes issues, records metadata). This ensures remediation issues
199 are properly tracked even though they bypass the main run_loop.
201 Args:
202 issue_ids: List of remediation issue IDs to execute.
203 run_metadata: Run metadata for recording issue results.
204 """
205 if not issue_ids:
206 return
208 # Track (issue_id, task) pairs for finalization
209 task_pairs: list[tuple[str, asyncio.Task[IssueResult]]] = []
211 for issue_id in issue_ids:
212 # Skip if already failed (remediation issues are freshly created, so won't be completed)
213 if self.callbacks.is_issue_failed(issue_id):
214 continue
216 # Spawn agent for this issue
217 task = await self.callbacks.spawn_remediation(issue_id)
218 if task:
219 task_pairs.append((issue_id, task))
221 # Wait for all remediation tasks to complete
222 if not task_pairs:
223 return
225 tasks = [pair[1] for pair in task_pairs]
226 await asyncio.gather(*tasks, return_exceptions=True)
228 # Finalize each task result (close issue, record metadata, emit events)
229 for issue_id, task in task_pairs:
230 result = self._extract_task_result(issue_id, task)
232 # Finalize (closes issue, records to run_metadata, emits events)
233 # Wrap in try/except to ensure all issues are finalized even if one fails
234 try:
235 await self.callbacks.finalize_remediation(
236 issue_id, result, run_metadata
237 )
238 except asyncio.CancelledError:
239 raise
240 except Exception as e:
241 self.callbacks.on_warning(
242 f"Failed to finalize remediation result for {issue_id} "
243 f"(agent: {result.agent_id}): {e}",
244 )
246 # Mark as completed in the coordinator
247 self.callbacks.mark_completed(issue_id)
249 def _extract_task_result(
250 self, issue_id: str, task: asyncio.Task[IssueResult]
251 ) -> IssueResult:
252 """Extract result from a completed task, handling exceptions.
254 Args:
255 issue_id: The issue ID for error results.
256 task: The completed task.
258 Returns:
259 The task result or an error IssueResult.
260 """
261 # Import here to avoid circular dependency
262 from src.pipeline.issue_result import IssueResult
264 try:
265 return task.result()
266 except asyncio.CancelledError:
267 return IssueResult(
268 issue_id=issue_id,
269 agent_id=self.callbacks.get_agent_id(issue_id),
270 success=False,
271 summary="Remediation task was cancelled",
272 )
273 except Exception as e:
274 return IssueResult(
275 issue_id=issue_id,
276 agent_id=self.callbacks.get_agent_id(issue_id),
277 success=False,
278 summary=str(e),
279 )