Coverage for src / pipeline / epic_verification_coordinator.py: 34%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""EpicVerificationCoordinator: Epic closure verification pipeline stage. 

2 

3Extracted from MalaOrchestrator to separate epic verification logic from orchestration. 

4This module handles: 

5- Checking if closing an issue should close its parent epic 

6- Retry loop for epic verification 

7- Executing remediation issues when verification fails 

8 

9Design principles: 

10- Protocol-based callbacks for orchestrator-owned operations 

11- State management: verified_epics, epics_being_verified sets 

12- Explicit config for retry behavior 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18from dataclasses import dataclass, field 

19from typing import TYPE_CHECKING 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Awaitable, Callable 

23 

24 from src.core.models import EpicVerificationResult 

25 from src.infra.io.log_output.run_metadata import RunMetadata 

26 from src.pipeline.issue_result import IssueResult 

27 

28 

@dataclass
class EpicVerificationConfig:
    """Retry policy for EpicVerificationCoordinator.

    Attributes:
        max_retries: Number of additional verification attempts allowed after
            the initial one. The coordinator makes ``1 + max_retries`` attempts
            in total, re-verifying only after a failed attempt that produced
            remediation issues. Defaults to 0, i.e. a single attempt.
    """

    max_retries: int = field(default=0)

38 

39 

@dataclass
class EpicVerificationCallbacks:
    """Hooks through which the epic verification coordinator reaches the orchestrator.

    The coordinator never touches orchestrator internals directly. Every
    orchestrator-owned action is injected here as a plain callable, so the
    coordinator can be wired up (or stubbed out) independently.

    Attributes:
        get_parent_epic: Async; resolve an issue ID to its parent epic ID, or None.
        verify_epic: Async; verify ``(epic_id, human_override)`` and return the
            verification result.
        spawn_remediation: Async; start an agent for a remediation issue and
            return its task, or None (in which case the issue is skipped).
        finalize_remediation: Async; finalize a remediation issue's result
            against the run metadata.
        mark_completed: Mark an issue as completed in the coordinator.
        is_issue_failed: Report whether an issue has already failed.
        close_eligible_epics: Async fallback for mock providers that lack an
            EpicVerifier; returns a bool, and a truthy result triggers the
            epic-closed event.
        on_epic_closed: Emit the epic-closed event.
        on_warning: Emit a warning event.
        has_epic_verifier: Report whether an EpicVerifier is available (kept as
            a callable rather than a flag so tests can patch it).
        get_agent_id: Look up the agent ID for an issue (for error attribution).
    """

    # Epic lookup and verification.
    get_parent_epic: Callable[[str], Awaitable[str | None]]
    verify_epic: Callable[[str, bool], Awaitable[EpicVerificationResult]]

    # Remediation issue lifecycle.
    spawn_remediation: Callable[[str], Awaitable[asyncio.Task[IssueResult] | None]]
    finalize_remediation: Callable[[str, IssueResult, RunMetadata], Awaitable[None]]
    mark_completed: Callable[[str], None]
    is_issue_failed: Callable[[str], bool]

    # Fallback closing path and event emission.
    close_eligible_epics: Callable[[], Awaitable[bool]]
    on_epic_closed: Callable[[str], None]
    on_warning: Callable[[str], None]

    # Capability and attribution queries.
    has_epic_verifier: Callable[[], bool]
    get_agent_id: Callable[[str], str]

72 

73 

@dataclass
class EpicVerificationCoordinator:
    """Epic verification pipeline stage.

    This class encapsulates the epic closure verification logic that was previously
    inline in MalaOrchestrator._check_epic_closure. It manages:
    - Tracking verified epics to avoid re-verification
    - Re-entrant guard for epics being verified
    - Retry loop with remediation issue execution
    - Fallback for mock providers

    Attributes:
        config: Verification configuration.
        callbacks: Callbacks for orchestrator-owned operations.
        epic_override_ids: Set of epic IDs to force human override.
        verified_epics: Epics whose verification loop has concluded (passed,
            ran out of attempts, or produced no remediation issues). Later
            check_epic_closure calls for these epics return immediately.
        epics_being_verified: Epics whose verification loop is currently
            running; used to ignore re-entrant check_epic_closure calls.
    """

    config: EpicVerificationConfig
    callbacks: EpicVerificationCallbacks
    epic_override_ids: set[str] = field(default_factory=set)

    # State: Tracked across multiple check_epic_closure calls
    verified_epics: set[str] = field(default_factory=set)
    epics_being_verified: set[str] = field(default_factory=set)

    async def check_epic_closure(
        self, issue_id: str, run_metadata: RunMetadata
    ) -> None:
        """Check if closing this issue should also close its parent epic.

        Implements a retry loop for epic verification:
        1. Run verification
        2. If verification fails and creates remediation issues, execute them
        3. Re-verify the epic
        4. Repeat until verification passes OR max retries reached

        Without an EpicVerifier, falls back to ``close_eligible_epics``.

        Args:
            issue_id: The issue that was just closed.
            run_metadata: Run metadata for recording remediation issue results.
        """
        parent_epic = await self.callbacks.get_parent_epic(issue_id)
        if parent_epic is None or parent_epic in self.verified_epics:
            return

        # Guard against re-entrant verification (e.g., when remediation tasks
        # complete and report back for the same epic). There is no await
        # between this membership test and the add() below, so no other
        # coroutine on the same event loop can interleave between them.
        if parent_epic in self.epics_being_verified:
            return

        if self.callbacks.has_epic_verifier():
            # Mark as being verified to prevent parallel verification loops
            self.epics_being_verified.add(parent_epic)
            try:
                await self._verify_epic_with_retries(parent_epic, run_metadata)
            finally:
                # Always remove from being_verified set when done
                self.epics_being_verified.discard(parent_epic)

        elif await self.callbacks.close_eligible_epics():
            # Fallback for mock providers without EpicVerifier.
            # NOTE(review): this reports the closed child issue_id rather than
            # parent_epic; confirm which ID on_epic_closed expects.
            self.callbacks.on_epic_closed(issue_id)

    async def _verify_epic_with_retries(
        self, epic_id: str, run_metadata: RunMetadata
    ) -> None:
        """Run epic verification with retry loop.

        The epic is added to ``verified_epics`` when it passes, when a failed
        attempt produces no remediation issues, or when attempts run out. If a
        result reports ``verified_count == 0`` (epic not yet eligible), the
        epic is left unmarked so it can be re-checked later.

        Args:
            epic_id: The epic to verify.
            run_metadata: Run metadata for recording remediation issue results.
        """
        # max_retries is the number of retries AFTER the first attempt, so
        # total attempts = 1 (initial) + max_retries. Clamp negatives to 0:
        # otherwise max_attempts <= 0, the loop never runs, and the epic is
        # silently never verified (nor marked verified).
        max_retries = max(0, self.config.max_retries)
        max_attempts = 1 + max_retries
        human_override = epic_id in self.epic_override_ids

        for attempt in range(1, max_attempts + 1):
            # Log attempt if retrying (attempt > 1)
            if attempt > 1:
                self.callbacks.on_warning(
                    f"Epic verification retry {attempt - 1}/{max_retries} for {epic_id}"
                )

            verification_result = await self.callbacks.verify_epic(
                epic_id, human_override
            )

            # If epic wasn't eligible (children still open), don't mark as verified
            # so it can be re-checked when more children close
            if verification_result.verified_count == 0:
                return

            # If epic passed verification, mark as verified and return
            if verification_result.passed_count > 0:
                self.verified_epics.add(epic_id)
                return

            # If no remediation issues were created, or max attempts reached,
            # mark as verified (to prevent infinite loops) and return
            is_last_attempt = attempt >= max_attempts
            if not verification_result.remediation_issues_created or is_last_attempt:
                if is_last_attempt and verification_result.failed_count > 0:
                    self.callbacks.on_warning(
                        f"Epic verification failed after {max_retries} retries for {epic_id}"
                    )
                self.verified_epics.add(epic_id)
                return

            # Execute remediation issues before next verification attempt
            await self._execute_remediation_issues(
                verification_result.remediation_issues_created,
                run_metadata,
            )

    async def _execute_remediation_issues(
        self,
        issue_ids: list[str],
        run_metadata: RunMetadata,
    ) -> None:
        """Execute remediation issues and wait for their completion.

        Spawns agents for remediation issues, waits for completion, and finalizes
        results (closes issues, records metadata). This ensures remediation issues
        are properly tracked even though they bypass the main run_loop.

        Args:
            issue_ids: List of remediation issue IDs to execute.
            run_metadata: Run metadata for recording issue results.

        Raises:
            Exception: Whatever ``is_issue_failed`` or ``spawn_remediation``
                raises. Agents spawned before the failure are still awaited
                and finalized before the error propagates.
        """
        if not issue_ids:
            return

        # Track (issue_id, task) pairs for finalization
        task_pairs: list[tuple[str, asyncio.Task[IssueResult]]] = []

        try:
            for issue_id in issue_ids:
                # Skip if already failed (remediation issues are freshly created,
                # so won't be completed)
                if self.callbacks.is_issue_failed(issue_id):
                    continue

                # Spawn agent for this issue
                task = await self.callbacks.spawn_remediation(issue_id)
                if task:
                    task_pairs.append((issue_id, task))
        except Exception:
            # A later spawn failed: don't orphan the agents already running.
            # Wait for and finalize them so they are still recorded and marked
            # completed, then propagate the original error.
            await self._await_and_finalize(task_pairs, run_metadata)
            raise

        await self._await_and_finalize(task_pairs, run_metadata)

    async def _await_and_finalize(
        self,
        task_pairs: list[tuple[str, asyncio.Task[IssueResult]]],
        run_metadata: RunMetadata,
    ) -> None:
        """Wait for remediation tasks, then finalize and mark each one completed.

        Args:
            task_pairs: ``(issue_id, task)`` pairs of spawned remediation agents.
            run_metadata: Run metadata for recording issue results.
        """
        if not task_pairs:
            return

        # Wait for all remediation tasks; exceptions are captured on the tasks
        # and turned into error results by _extract_task_result below.
        await asyncio.gather(*(task for _, task in task_pairs), return_exceptions=True)

        # Finalize each task result (close issue, record metadata, emit events)
        for issue_id, task in task_pairs:
            result = self._extract_task_result(issue_id, task)

            # Wrap in try/except to ensure all issues are finalized even if one fails
            try:
                await self.callbacks.finalize_remediation(
                    issue_id, result, run_metadata
                )
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.callbacks.on_warning(
                    f"Failed to finalize remediation result for {issue_id} "
                    f"(agent: {result.agent_id}): {e}",
                )

            # Mark as completed in the coordinator
            self.callbacks.mark_completed(issue_id)

    def _extract_task_result(
        self, issue_id: str, task: asyncio.Task[IssueResult]
    ) -> IssueResult:
        """Extract result from a completed task, handling exceptions.

        Args:
            issue_id: The issue ID for error results.
            task: The task to read; expected to be done already, since
                ``task.result()`` is called without awaiting.

        Returns:
            The task's own result, or a failed IssueResult whose summary
            describes the cancellation or exception.
        """
        try:
            return task.result()
        except asyncio.CancelledError:
            summary = "Remediation task was cancelled"
        except Exception as e:
            summary = str(e)

        # Imported lazily, and only on the failure path, to avoid a circular
        # dependency with src.pipeline.issue_result.
        from src.pipeline.issue_result import IssueResult

        return IssueResult(
            issue_id=issue_id,
            agent_id=self.callbacks.get_agent_id(issue_id),
            success=False,
            summary=summary,
        )