Coverage for src / pipeline / gate_runner.py: 58%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""GateRunner: Quality gate checking pipeline stage. 

2 

3Extracted from MalaOrchestrator to separate gate/fixer policy from orchestration. 

4This module handles: 

5- Per-issue quality gate checks with retry state management 

6- No-progress detection for retry termination 

7- Async gate running for non-blocking orchestration 

8 

9The GateRunner receives explicit inputs and returns explicit outputs, 

10making it testable without SDK or subprocess dependencies. 

11 

12Design principles: 

13- Pure functions where possible (gate checking is stateless) 

14- Protocol-based dependencies for testability 

15- Explicit input/output types for clarity 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import logging 

22from dataclasses import dataclass, field 

23from typing import TYPE_CHECKING, cast 

24 

25from src.domain.quality_gate import GateResult 

26from src.domain.validation.spec import ( 

27 ValidationScope, 

28 build_validation_spec, 

29) 

30 

31logger = logging.getLogger(__name__) 

32 

33if TYPE_CHECKING: 

34 from pathlib import Path 

35 

36 from src.domain.lifecycle import RetryState 

37 from src.core.protocols import ( 

38 GateChecker, 

39 GateResultProtocol, 

40 ValidationSpecProtocol, 

41 ) 

42 from src.domain.validation.spec import ValidationSpec 

43 

44 

@dataclass
class GateRunnerConfig:
    """Settings that tune GateRunner behavior.

    Attributes:
        max_gate_retries: Maximum number of gate retry attempts (default 3).
            GateRunner itself never reads this value; presumably the
            orchestrator enforces it — TODO confirm.
        disable_validations: Validation names to disable. Forwarded as-is to
            build_validation_spec when GateRunner builds its per-issue spec.
        coverage_threshold: Optional coverage threshold override. Not read
            anywhere in this module — TODO confirm where it is consumed.
    """

    max_gate_retries: int = field(default=3)
    disable_validations: set[str] | None = field(default=None)
    coverage_threshold: float | None = field(default=None)

58 

59 

@dataclass
class PerIssueGateInput:
    """Everything a single per-issue quality gate check needs.

    Attributes:
        issue_id: Identifier of the issue under check.
        log_path: Location of the JSONL session log file.
        retry_state: Retry bookkeeping for the issue. GateRunner reads its
            gate_attempt, log_offset, baseline_timestamp and
            previous_commit_hash attributes.
        spec: ValidationSpec to check against. When left as None, GateRunner
            builds (and caches) a per-issue spec instead.
    """

    issue_id: str
    log_path: Path
    retry_state: RetryState
    spec: ValidationSpec | None = field(default=None)

77 

78 

@dataclass
class PerIssueGateOutput:
    """Result bundle produced by GateRunner.run_per_issue_gate.

    Attributes:
        gate_result: Outcome of the quality gate. On a failed retry where no
            progress was detected, GateRunner replaces the checker's result
            with one marked no_progress=True.
        new_log_offset: Log offset the next retry attempt should start from.
    """

    gate_result: GateResultProtocol
    new_log_offset: int

90 

91 

@dataclass
class GateRunner:
    """Runs per-issue quality gate checks through an injected GateChecker.

    Moved out of MalaOrchestrator._run_quality_gate_sync so that gate policy
    lives apart from orchestration. The actual validation work is delegated
    to ``gate_checker``; on top of that this class:

    - picks the ValidationSpec (caller-supplied, or a lazily built and
      cached per-issue spec)
    - invokes the checker
    - flags "no progress" when a retry fails without progress
    - reports the log offset the next attempt should start from

    Usage:
        runner = GateRunner(
            gate_checker=quality_gate,
            repo_path=repo_path,
            config=GateRunnerConfig(max_gate_retries=3),
        )
        output = runner.run_per_issue_gate(input)

    Attributes:
        gate_checker: GateChecker that performs the checks.
        repo_path: Repository root handed to build_validation_spec.
        config: Settings for gate behavior.
        per_issue_spec: Lazily built per-issue ValidationSpec cache; not a
            constructor argument.
    """

    gate_checker: GateChecker
    repo_path: Path
    config: GateRunnerConfig = field(default_factory=GateRunnerConfig)
    per_issue_spec: ValidationSpec | None = field(default=None, init=False)

    def _get_or_build_spec(
        self, provided_spec: ValidationSpec | None
    ) -> ValidationSpec:
        """Return the caller's spec, falling back to the cached per-issue spec.

        The per-issue spec is built on first demand and reused afterwards.

        Args:
            provided_spec: Spec supplied with the gate input, if any.

        Returns:
            The ValidationSpec the gate check should use.
        """
        if provided_spec is None:
            if self.per_issue_spec is None:
                # First request without an explicit spec: build and cache it.
                self.per_issue_spec = build_validation_spec(
                    self.repo_path,
                    scope=ValidationScope.PER_ISSUE,
                    disable_validations=self.config.disable_validations,
                )
            return self.per_issue_spec
        return provided_spec

    def run_per_issue_gate(self, input: PerIssueGateInput) -> PerIssueGateOutput:
        """Check the quality gate for one issue.

        The call blocks on the checker's I/O, so async callers are expected
        to run it via asyncio.to_thread().

        Args:
            input: Issue id, log path, retry state and optional spec.

        Returns:
            PerIssueGateOutput with the gate result (replaced by a
            no-progress failure when a failed retry shows no progress) and
            the log offset for the next attempt.
        """
        state = input.retry_state
        spec_arg = cast(
            "ValidationSpecProtocol | None", self._get_or_build_spec(input.spec)
        )
        logger.debug(
            "Gate check: issue_id=%s attempt=%d",
            input.issue_id,
            state.gate_attempt,
        )

        checked = self.gate_checker.check_with_resolution(
            issue_id=input.issue_id,
            log_path=input.log_path,
            baseline_timestamp=state.baseline_timestamp,
            log_offset=state.log_offset,
            spec=spec_arg,
        )
        # Offset from which the next retry should scan the log.
        next_offset = self.gate_checker.get_log_end_offset(
            input.log_path, start_offset=state.log_offset
        )

        # No-progress detection only applies to failed retries (attempt > 1).
        if state.gate_attempt <= 1 or checked.passed:
            return PerIssueGateOutput(
                gate_result=checked, new_log_offset=next_offset
            )

        stalled = self.gate_checker.check_no_progress(
            input.log_path,
            state.log_offset,
            state.previous_commit_hash,
            checked.commit_hash,
            spec=spec_arg,
        )
        if not stalled:
            return PerIssueGateOutput(
                gate_result=checked, new_log_offset=next_offset
            )

        logger.warning("No progress detected: issue_id=%s", input.issue_id)
        # Rebuild the result so it carries the no-progress flag and reason.
        flagged = GateResult(
            passed=False,
            failure_reasons=[
                *checked.failure_reasons,
                "No progress: commit unchanged and no new validation evidence",
            ],
            commit_hash=checked.commit_hash,
            validation_evidence=checked.validation_evidence,
            no_progress=True,
            resolution=checked.resolution,
        )
        return PerIssueGateOutput(gate_result=flagged, new_log_offset=next_offset)

    def get_cached_spec(self) -> ValidationSpec | None:
        """Return the cached per-issue spec, or None if none exists yet.

        Lets the orchestrator reuse the spec elsewhere (e.g. for evidence
        parsing) without rebuilding it.
        """
        cached = self.per_issue_spec
        return cached

    def set_cached_spec(self, spec: ValidationSpec) -> None:
        """Store ``spec`` as the per-issue spec cache.

        Lets the orchestrator pre-populate the cache with a spec built at run
        start; later checks that supply no explicit spec will use it.

        Args:
            spec: ValidationSpec to cache.
        """
        self.per_issue_spec = spec
        logger.debug("Validation spec cached: issue_id=*")

234 

235 

@dataclass
class AsyncGateRunner:
    """Non-blocking facade over GateRunner (GateAsyncRunner protocol).

    SessionCallbackFactory uses this class. Each gate check runs in a worker
    thread via asyncio.to_thread, so the event loop is not blocked while a
    gate runs.

    State held here:
    - per_issue_spec: cached spec, copied into and back out of the wrapped
      GateRunner around each check
    - last_gate_results: the most recent gate result recorded per issue id

    Usage:
        gate_runner = GateRunner(gate_checker=..., repo_path=..., config=...)
        async_runner = AsyncGateRunner(gate_runner=gate_runner)
        result, offset = await async_runner.run_gate_async(issue_id, log_path, retry_state)
    """

    gate_runner: GateRunner
    per_issue_spec: ValidationSpec | None = field(default=None)
    last_gate_results: dict[str, GateResult | GateResultProtocol] = field(
        default_factory=dict
    )

    def _run_gate_sync(
        self,
        issue_id: str,
        log_path: Path,
        retry_state: RetryState,
    ) -> tuple[GateResult | GateResultProtocol, int]:
        """Blocking gate check, meant to run off the event loop.

        Pushes any locally cached spec into the GateRunner, runs the check,
        pulls back a spec the GateRunner may have built, and records the
        result under ``issue_id``.
        """
        if self.per_issue_spec is not None:
            self.gate_runner.set_cached_spec(self.per_issue_spec)

        output = self.gate_runner.run_per_issue_gate(
            PerIssueGateInput(
                issue_id=issue_id,
                log_path=log_path,
                retry_state=retry_state,
                spec=self.per_issue_spec,
            )
        )

        if self.per_issue_spec is None:
            # The GateRunner may have built the spec lazily; keep a copy.
            self.per_issue_spec = self.gate_runner.get_cached_spec()

        result = output.gate_result
        self.last_gate_results[issue_id] = result
        return result, output.new_log_offset

    async def run_gate_async(
        self,
        issue_id: str,
        log_path: Path,
        retry_state: RetryState,
    ) -> tuple[GateResult | GateResultProtocol, int]:
        """Run the quality gate for one issue without blocking the loop.

        The blocking check is moved to a worker thread so the orchestrator
        can keep servicing other agents in the meantime.

        Args:
            issue_id: The issue being checked.
            log_path: Path to the session log file.
            retry_state: Current retry state for this issue.

        Returns:
            Tuple of (gate result, new log offset).
        """
        blocking_check = self._run_gate_sync
        return await asyncio.to_thread(blocking_check, issue_id, log_path, retry_state)

    def get_last_gate_result(
        self, issue_id: str
    ) -> GateResult | GateResultProtocol | None:
        """Look up the most recently recorded gate result for an issue.

        Args:
            issue_id: The issue ID.

        Returns:
            The stored result, or None when nothing is recorded for the issue.
        """
        return self.last_gate_results.get(issue_id, None)

    def clear_gate_result(self, issue_id: str) -> None:
        """Forget the stored gate result for an issue; no-op if absent.

        Args:
            issue_id: The issue ID.
        """
        self.last_gate_results.pop(issue_id, None)