Coverage for src/pipeline/issue_finalizer.py: 56%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""IssueFinalizer: Finalization pipeline stage for issue results. 

2 

3Extracted from MalaOrchestrator to separate finalization logic from orchestration. 

4This module handles: 

5- Recording issue runs to metadata 

6- Cleaning up session paths 

7- Emitting completion events 

8- Closing issues and triggering epic checks 

9 

10The IssueFinalizer receives explicit inputs and returns explicit outputs, 

11making it testable without orchestrator dependencies. 

12 

13Design principles: 

14- Callback-based dependencies for orchestrator-owned operations 

15- Explicit input/output types for clarity 

16- Reuses existing gate_metadata helpers 

17""" 

18 

19from __future__ import annotations 

20 

21from dataclasses import dataclass 

22from typing import TYPE_CHECKING 

23 

24from src.infra.io.log_output.console import truncate_text 

25from src.infra.io.log_output.run_metadata import IssueRun 

26from src.pipeline.gate_metadata import ( 

27 GateMetadata, 

28 build_gate_metadata, 

29 build_gate_metadata_from_logs, 

30) 

31 

32if TYPE_CHECKING: 

33 from collections.abc import Awaitable, Callable 

34 from pathlib import Path 

35 

36 from src.core.protocols import ( 

37 GateChecker, 

38 GateResultProtocol, 

39 ReviewIssueProtocol, 

40 ) 

41 from src.domain.quality_gate import GateResult 

42 from src.domain.validation.spec import ValidationSpec 

43 from src.infra.io.log_output.run_metadata import RunMetadata 

44 from src.pipeline.issue_result import IssueResult 

45 

46 

@dataclass
class IssueFinalizeConfig:
    """Configuration for IssueFinalizer behavior.

    Attributes:
        track_review_issues: Whether to create tracking issues for P2/P3
            findings. Defaults to False, in which case the finalizer never
            invokes the tracking-issue callback.
    """

    track_review_issues: bool = False

56 

57 

@dataclass
class IssueFinalizeInput:
    """Input for issue finalization.

    Bundles all the data needed to finalize a single issue result.

    Attributes:
        issue_id: The issue being finalized.
        result: The issue result to finalize.
        run_metadata: The run metadata to record to.
        log_path: Path to the session log (if any).
        stored_gate_result: Stored gate result (if available). When present,
            gate metadata is derived from it instead of from the session log.
        review_log_path: Path to the review log (if any).
    """

    issue_id: str
    result: IssueResult
    run_metadata: RunMetadata
    log_path: Path | None = None
    stored_gate_result: GateResult | GateResultProtocol | None = None
    review_log_path: str | None = None

79 

80 

@dataclass
class IssueFinalizeOutput:
    """Output from issue finalization.

    Attributes:
        success: Whether finalization completed successfully.
        closed: Whether the issue was closed.
        gate_metadata: The extracted gate metadata.
    """

    success: bool
    closed: bool
    gate_metadata: GateMetadata

94 

95 

@dataclass
class IssueFinalizeCallbacks:
    """Callbacks for orchestrator-owned operations during finalization.

    These callbacks allow the finalizer to trigger orchestrator operations
    without taking dependencies on orchestrator internals.

    Attributes:
        close_issue: Close an issue in beads. Awaited with the issue id;
            returns True if closed successfully.
        mark_needs_followup: Mark an issue as needing followup in beads.
            Awaited with (issue id, result summary, session log path).
        on_issue_closed: Emit issue closed event. Called synchronously with
            two string arguments (the finalizer passes the issue id for both).
        on_issue_completed: Emit issue completed event. Called synchronously
            with (id, id, success flag, duration in seconds, summary text).
        trigger_epic_closure: Trigger epic closure check. Awaited with
            (issue id, run metadata).
        create_tracking_issues: Create tracking issues for P2/P3 review
            findings. Awaited with (issue id, list of review issues).
    """

    close_issue: Callable[[str], Awaitable[bool]]
    mark_needs_followup: Callable[[str, str, Path | None], Awaitable[None]]
    on_issue_closed: Callable[[str, str], None]
    on_issue_completed: Callable[[str, str, bool, float, str], None]
    trigger_epic_closure: Callable[[str, RunMetadata], Awaitable[None]]
    create_tracking_issues: Callable[[str, list[ReviewIssueProtocol]], Awaitable[None]]

118 

119 

@dataclass
class IssueFinalizer:
    """Issue finalization pipeline stage.

    This class encapsulates the finalization logic that was previously
    inline in MalaOrchestrator._finalize_issue_result. It receives
    callbacks for orchestrator-owned operations.

    The IssueFinalizer is responsible for:
    - Building gate metadata from stored results or logs
    - Closing issues and emitting events
    - Triggering epic closure checks
    - Creating tracking issues for review findings
    - Recording issue runs to metadata
    - Emitting completion events

    Attributes:
        config: Finalization configuration.
        callbacks: Callbacks for orchestrator-owned operations.
        quality_gate: Quality gate checker for fallback metadata extraction.
        per_issue_spec: Validation spec for fallback metadata extraction.
    """

    config: IssueFinalizeConfig
    callbacks: IssueFinalizeCallbacks
    quality_gate: GateChecker | None = None
    per_issue_spec: ValidationSpec | None = None

    async def finalize(self, input: IssueFinalizeInput) -> IssueFinalizeOutput:
        """Finalize an issue result.

        Records the result, updates metadata, and emits logs.
        Uses stored gate result to derive metadata, avoiding duplicate
        validation parsing.

        Steps run in a fixed order: build gate metadata, close the issue
        (successful results only), create tracking issues (if enabled),
        record the run, then emit the completion event. No exceptions are
        caught here, so an error raised by a callback propagates and skips
        the remaining steps.

        Args:
            input: The finalization input containing issue data.

        Returns:
            IssueFinalizeOutput with finalization results. ``success`` is
            always True when this method returns; ``closed`` reflects
            whether the close callback reported success.
        """
        result = input.result
        issue_id = input.issue_id

        # Build gate metadata from stored result or logs
        gate_metadata = self._build_gate_metadata(input)

        # Handle successful issue closure and epic verification.
        # The close callback is only awaited for successful results.
        closed = False
        if result.success and await self.callbacks.close_issue(issue_id):
            closed = True
            self.callbacks.on_issue_closed(issue_id, issue_id)
            await self.callbacks.trigger_epic_closure(issue_id, input.run_metadata)

        # Create tracking issues for P2/P3 review findings (if enabled)
        if self.config.track_review_issues and result.low_priority_review_issues:
            await self.callbacks.create_tracking_issues(
                issue_id,
                result.low_priority_review_issues,
            )

        # Record to run metadata
        self._record_issue_run(input, gate_metadata)

        # Emit completion event and handle failure tracking
        await self._emit_completion(input)

        return IssueFinalizeOutput(
            success=True,
            closed=closed,
            gate_metadata=gate_metadata,
        )

    def _build_gate_metadata(self, input: IssueFinalizeInput) -> GateMetadata:
        """Build gate metadata from stored result or logs.

        A stored gate result always takes precedence. The log-based fallback
        is used only for failed results, and only when the session log exists
        and both ``quality_gate`` and ``per_issue_spec`` are configured.
        Otherwise an empty ``GateMetadata`` is returned.

        Args:
            input: The finalization input.

        Returns:
            GateMetadata extracted from results or logs.
        """
        stored_gate_result = input.stored_gate_result
        log_path = input.log_path
        result = input.result

        if stored_gate_result is not None:
            return build_gate_metadata(stored_gate_result, result.success)
        elif (
            not result.success
            and log_path
            and log_path.exists()
            and self.quality_gate is not None
            and self.per_issue_spec is not None
        ):
            return build_gate_metadata_from_logs(
                log_path,
                result.summary,
                result.success,
                self.quality_gate,
                self.per_issue_spec,
            )
        else:
            return GateMetadata()

    def _record_issue_run(
        self,
        input: IssueFinalizeInput,
        gate_metadata: GateMetadata,
    ) -> None:
        """Record an issue run to the run metadata.

        Builds an ``IssueRun`` from the result and gate metadata and passes
        it to ``input.run_metadata.record_issue``. The result summary is
        stored as the error only for failed results.

        Args:
            input: The finalization input.
            gate_metadata: Extracted gate metadata.
        """
        result = input.result
        log_path = input.log_path

        issue_run = IssueRun(
            issue_id=result.issue_id,
            agent_id=result.agent_id,
            status="success" if result.success else "failed",
            duration_seconds=result.duration_seconds,
            session_id=result.session_id,
            log_path=str(log_path) if log_path else None,
            quality_gate=gate_metadata.quality_gate_result,
            error=result.summary if not result.success else None,
            gate_attempts=result.gate_attempts,
            review_attempts=result.review_attempts,
            validation=gate_metadata.validation_result,
            resolution=result.resolution,
            review_log_path=input.review_log_path,
        )
        input.run_metadata.record_issue(issue_run)

    async def _emit_completion(self, input: IssueFinalizeInput) -> None:
        """Emit completion event and handle failure tracking.

        The summary is truncated to 50 characters for successful results and
        passed through unmodified for failures. Failed results are also
        marked as needing followup.

        Args:
            input: The finalization input.
        """
        result = input.result
        issue_id = input.issue_id
        log_path = input.log_path

        self.callbacks.on_issue_completed(
            issue_id,
            issue_id,
            result.success,
            result.duration_seconds,
            truncate_text(result.summary, 50) if result.success else result.summary,
        )
        if not result.success:
            await self.callbacks.mark_needs_followup(issue_id, result.summary, log_path)