Coverage for src / pipeline / gate_runner.py: 58%
73 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""GateRunner: Quality gate checking pipeline stage.
3Extracted from MalaOrchestrator to separate gate/fixer policy from orchestration.
4This module handles:
5- Per-issue quality gate checks with retry state management
6- No-progress detection for retry termination
7- Async gate running for non-blocking orchestration
9The GateRunner receives explicit inputs and returns explicit outputs,
10making it testable without SDK or subprocess dependencies.
12Design principles:
13- Pure functions where possible (gate checking is stateless)
14- Protocol-based dependencies for testability
15- Explicit input/output types for clarity
16"""
18from __future__ import annotations
20import asyncio
21import logging
22from dataclasses import dataclass, field
23from typing import TYPE_CHECKING, cast
25from src.domain.quality_gate import GateResult
26from src.domain.validation.spec import (
27 ValidationScope,
28 build_validation_spec,
29)
31logger = logging.getLogger(__name__)
33if TYPE_CHECKING:
34 from pathlib import Path
36 from src.domain.lifecycle import RetryState
37 from src.core.protocols import (
38 GateChecker,
39 GateResultProtocol,
40 ValidationSpecProtocol,
41 )
42 from src.domain.validation.spec import ValidationSpec
@dataclass
class GateRunnerConfig:
    """Tunable settings for a GateRunner.

    Attributes:
        max_gate_retries: Upper bound on gate retry attempts (default 3).
        disable_validations: Names of validations to switch off, forwarded
            to build_validation_spec when the runner builds its own spec.
            None means nothing is disabled by this config.
        coverage_threshold: Optional override for the coverage threshold.
    """

    # Declared via field(default=...) so each default is spelled out
    # explicitly; this is equivalent to a plain `= value` default.
    max_gate_retries: int = field(default=3)
    disable_validations: set[str] | None = field(default=None)
    coverage_threshold: float | None = field(default=None)
@dataclass
class PerIssueGateInput:
    """Everything a single per-issue gate check needs, bundled together.

    Attributes:
        issue_id: Identifier of the issue under check.
        log_path: Location of the JSONL session log.
        retry_state: Retry bookkeeping for the issue (attempt number,
            log offset, baseline timestamp, previous commit hash).
        spec: ValidationSpec to check against; when None the runner
            supplies its own (building and caching one if needed).
    """

    issue_id: str
    log_path: Path
    retry_state: RetryState
    # Optional: leaving it unset defers spec selection to the runner.
    spec: ValidationSpec | None = field(default=None)
@dataclass
class PerIssueGateOutput:
    """Result of one per-issue gate check.

    Attributes:
        gate_result: Gate result reported for the check (possibly the
            no-progress variant built by the runner).
        new_log_offset: Log offset the next retry attempt should start from.
    """

    # Both fields are required; there are no defaults.
    gate_result: GateResultProtocol
    new_log_offset: int
@dataclass
class GateRunner:
    """Per-issue quality gate runner.

    Pulled out of MalaOrchestrator._run_quality_gate_sync so that gate and
    retry policy can be exercised on its own. The actual validation work is
    delegated to an injected GateChecker (protocol).

    Responsibilities:
    - Choose the ValidationSpec for a check: the caller's, or a lazily built
      and cached PER_ISSUE spec
    - Invoke the GateChecker for the check itself
    - Flag failed retries that made no progress
    - Report the log offset that scopes the next retry

    Usage:
        runner = GateRunner(
            gate_checker=quality_gate,
            repo_path=repo_path,
            config=GateRunnerConfig(max_gate_retries=3),
        )
        output = runner.run_per_issue_gate(input)

    Attributes:
        gate_checker: GateChecker used to perform the checks.
        repo_path: Repository root passed to build_validation_spec.
        config: Runner configuration; only disable_validations is read here.
        per_issue_spec: Cached PER_ISSUE ValidationSpec (not an __init__
            argument; filled lazily or via set_cached_spec).
    """

    gate_checker: GateChecker
    repo_path: Path
    config: GateRunnerConfig = field(default_factory=GateRunnerConfig)
    per_issue_spec: ValidationSpec | None = field(default=None, init=False)

    def _get_or_build_spec(
        self, provided_spec: ValidationSpec | None
    ) -> ValidationSpec:
        """Pick the spec for a gate check.

        A caller-supplied spec always wins and is NOT cached. Otherwise the
        cached PER_ISSUE spec is returned, building it on first use.

        Args:
            provided_spec: Spec from the gate input, or None.

        Returns:
            The ValidationSpec to check against.
        """
        if provided_spec is None:
            if self.per_issue_spec is None:
                self.per_issue_spec = build_validation_spec(
                    self.repo_path,
                    scope=ValidationScope.PER_ISSUE,
                    disable_validations=self.config.disable_validations,
                )
            return self.per_issue_spec
        return provided_spec

    def run_per_issue_gate(self, input: PerIssueGateInput) -> PerIssueGateOutput:
        """Check the quality gate for one issue.

        Blocking: the checker does synchronous work, so async callers should
        run this off the event loop (e.g. via asyncio.to_thread()).

        Args:
            input: PerIssueGateInput carrying issue_id, log_path, retry_state
                and an optional spec.

        Returns:
            PerIssueGateOutput with the gate result and the log end offset
            reported by the checker.
        """
        checker = self.gate_checker
        spec = self._get_or_build_spec(input.spec)
        # typing.cast is a runtime no-op; it only adapts the concrete spec
        # type to the protocol type the checker is annotated with.
        checker_spec = cast("ValidationSpecProtocol | None", spec)

        retry = input.retry_state
        logger.debug(
            "Gate check: issue_id=%s attempt=%d",
            input.issue_id,
            retry.gate_attempt,
        )

        result = checker.check_with_resolution(
            issue_id=input.issue_id,
            log_path=input.log_path,
            baseline_timestamp=retry.baseline_timestamp,
            log_offset=retry.log_offset,
            spec=checker_spec,
        )

        # Offset at which the next retry should start reading the log.
        end_offset = checker.get_log_end_offset(
            input.log_path, start_offset=retry.log_offset
        )

        # No-progress detection only matters for a failed retry (attempt > 1);
        # the short-circuit keeps check_no_progress from running otherwise.
        stalled = (
            retry.gate_attempt > 1
            and not result.passed
            and checker.check_no_progress(
                input.log_path,
                retry.log_offset,
                retry.previous_commit_hash,
                result.commit_hash,
                spec=checker_spec,
            )
        )
        if stalled:
            logger.warning("No progress detected: issue_id=%s", input.issue_id)
            reasons = [
                *result.failure_reasons,
                "No progress: commit unchanged and no new validation evidence",
            ]
            # Rebuild as a failed GateResult flagged no_progress, carrying
            # over the original commit, evidence and resolution.
            result = GateResult(
                passed=False,
                failure_reasons=reasons,
                commit_hash=result.commit_hash,
                validation_evidence=result.validation_evidence,
                no_progress=True,
                resolution=result.resolution,
            )

        return PerIssueGateOutput(gate_result=result, new_log_offset=end_offset)

    def get_cached_spec(self) -> ValidationSpec | None:
        """Return the cached per-issue spec without building one.

        Lets the orchestrator reuse the spec elsewhere (e.g. evidence
        parsing) without a rebuild.

        Returns:
            The cached ValidationSpec, or None if nothing is cached yet.
        """
        return self.per_issue_spec

    def set_cached_spec(self, spec: ValidationSpec) -> None:
        """Store a per-issue spec in the cache.

        Used by the orchestrator to seed the cache with a spec built at the
        start of a run.

        Args:
            spec: ValidationSpec to cache.
        """
        self.per_issue_spec = spec
        logger.debug("Validation spec cached: issue_id=*")
@dataclass
class AsyncGateRunner:
    """Thread-offloading async facade over a GateRunner.

    Implements the GateAsyncRunner protocol used by SessionCallbackFactory:
    each gate check is handed to asyncio.to_thread so the blocking
    GateRunner work does not stall the event loop.

    State kept on this object:
    - per_issue_spec: spec mirrored to/from the wrapped GateRunner's cache
    - last_gate_results: most recent gate result per issue id

    Usage:
        gate_runner = GateRunner(gate_checker=..., repo_path=..., config=...)
        async_runner = AsyncGateRunner(gate_runner=gate_runner)
        result, offset = await async_runner.run_gate_async(issue_id, log_path, retry_state)
    """

    gate_runner: GateRunner
    per_issue_spec: ValidationSpec | None = field(default=None)
    last_gate_results: dict[str, GateResult | GateResultProtocol] = field(
        default_factory=dict
    )

    def _run_gate_sync(
        self,
        issue_id: str,
        log_path: Path,
        retry_state: RetryState,
    ) -> tuple[GateResult | GateResultProtocol, int]:
        """Blocking gate check delegated to the wrapped GateRunner.

        Pushes a known spec into the runner's cache first, pulls back a
        spec the runner built if this wrapper had none, and records the
        result under issue_id.
        """
        runner = self.gate_runner
        known_spec = self.per_issue_spec
        if known_spec is not None:
            runner.set_cached_spec(known_spec)

        output = runner.run_per_issue_gate(
            PerIssueGateInput(
                issue_id=issue_id,
                log_path=log_path,
                retry_state=retry_state,
                spec=known_spec,
            )
        )

        # Re-read the attribute (not the local) before adopting the
        # runner's spec, so a value set meanwhile is not overwritten.
        if self.per_issue_spec is None:
            self.per_issue_spec = runner.get_cached_spec()

        result = output.gate_result
        self.last_gate_results[issue_id] = result
        return result, output.new_log_offset

    async def run_gate_async(
        self,
        issue_id: str,
        log_path: Path,
        retry_state: RetryState,
    ) -> tuple[GateResult | GateResultProtocol, int]:
        """Run the gate check on a worker thread (GateAsyncRunner protocol).

        Keeps the event loop responsive so the orchestrator can keep
        servicing other agents while a gate runs.

        Args:
            issue_id: Issue being checked.
            log_path: Session log file for the issue.
            retry_state: Current retry state for the issue.

        Returns:
            Tuple of (gate result, new log offset).
        """
        outcome = await asyncio.to_thread(
            self._run_gate_sync, issue_id, log_path, retry_state
        )
        return outcome

    def get_last_gate_result(
        self, issue_id: str
    ) -> GateResult | GateResultProtocol | None:
        """Look up the most recent gate result recorded for an issue.

        Args:
            issue_id: The issue ID.

        Returns:
            The stored gate result, or None when none is recorded.
        """
        return self.last_gate_results.get(issue_id, None)

    def clear_gate_result(self, issue_id: str) -> None:
        """Forget the stored gate result for an issue; no-op if absent.

        Args:
            issue_id: The issue ID.
        """
        results = self.last_gate_results
        if issue_id in results:
            del results[issue_id]