Coverage for src/domain/validation/spec_runner.py: 27%
94 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Spec-based validation runner for mala.
3This module provides SpecValidationRunner which runs validation using
4ValidationSpec + ValidationContext, the modern API for mala validation.
5"""
7from __future__ import annotations
9import asyncio
10import json
11import os
12from typing import TYPE_CHECKING
14from .lint_cache import LintCache
15from .spec import ValidationArtifacts
16from .spec_executor import (
17 ExecutorConfig,
18 ExecutorInput,
19 SpecCommandExecutor,
20)
21from .spec_result_builder import ResultBuilderInput, SpecResultBuilder
22from .spec_workspace import (
23 SetupError,
24 cleanup_workspace,
25 setup_workspace,
26)
27from .validation_gating import (
28 should_invalidate_lint_cache,
29 should_trigger_validation,
30)
32if TYPE_CHECKING:
33 from pathlib import Path
35 from src.core.protocols import (
36 CommandRunnerPort,
37 EnvConfigPort,
38 LockManagerPort,
39 MalaEventSink,
40 )
42 from .result import ValidationStepResult
43 from .spec import (
44 ValidationContext,
45 ValidationSpec,
46 )
48from .result import ValidationResult
class CommandFailure(Exception):
    """Signal that a validation command failed and the run must stop.

    Attributes:
        steps: Results of the steps executed so far, including the step
            that failed.
        reason: Human-readable failure reason; also passed to
            ``Exception`` as the exception message.
    """

    def __init__(self, steps: list[ValidationStepResult], reason: str) -> None:
        # Attribute assignment and base-class initialisation are independent,
        # so the order between them does not matter.
        self.reason = reason
        self.steps = steps
        super().__init__(reason)
class SpecValidationRunner:
    """Runs validation according to a ValidationSpec.

    This runner supports:
    - Scope-aware validation (per-issue vs run-level)
    - Per-command mutex settings
    - Integrated worktree, coverage, and E2E handling
    - Artifact tracking
    - Lint caching to skip redundant lint commands
    """

    def __init__(
        self,
        repo_path: Path,
        env_config: EnvConfigPort,
        command_runner: CommandRunnerPort,
        lock_manager: LockManagerPort,
        step_timeout_seconds: float | None = None,
        enable_lint_cache: bool = True,
        event_sink: MalaEventSink | None = None,
    ) -> None:
        """Initialize the spec validation runner.

        Args:
            repo_path: Path to the repository to validate. Stored resolved.
            env_config: Environment configuration for paths.
            command_runner: Command runner for executing commands.
            lock_manager: Lock manager for file locking.
            step_timeout_seconds: Optional timeout for individual steps.
            enable_lint_cache: Whether to enable lint caching. Set to False
                in tests or when caching is not desired.
            event_sink: Event sink for emitting validation step events.
        """
        self.repo_path = repo_path.resolve()
        self.step_timeout_seconds = step_timeout_seconds
        self.enable_lint_cache = enable_lint_cache
        self.env_config = env_config
        self.command_runner = command_runner
        self.lock_manager = lock_manager
        self.event_sink = event_sink

    async def run_spec(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        log_dir: Path | None = None,
    ) -> ValidationResult:
        """Run validation according to a ValidationSpec.

        The synchronous pipeline is executed in a worker thread via
        ``asyncio.to_thread`` so the event loop is not blocked.

        Args:
            spec: What validations to run.
            context: Immutable context for the validation run.
            log_dir: Directory for logs/artifacts. Uses temp dir if None.

        Returns:
            ValidationResult with steps, artifacts, and coverage info.
        """
        return await asyncio.to_thread(self._run_spec_sync, spec, context, log_dir)

    def _run_spec_sync(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        log_dir: Path | None = None,
    ) -> ValidationResult:
        """Synchronous implementation of run_spec.

        Uses a pipeline pattern:
        1. Check validation gating (skip if no code changes match patterns)
        2. Invalidate caches if config/setup files changed
        3. setup_workspace -> run_commands -> check_coverage -> run_e2e -> build_result

        Delegates workspace/baseline/worktree setup to spec_workspace module.

        Args:
            spec: What validations to run.
            context: Immutable context for the validation run.
            log_dir: Directory for logs/artifacts, or None.

        Returns:
            A passing result with no steps when gating skips validation, a
            failing result when workspace setup raises SetupError, otherwise
            the result produced by the validation pipeline.
        """
        # Step 0: Check validation gating based on changed_files and code_patterns
        # Skip validation if no files match code_patterns (unless patterns empty)
        if context.changed_files and not should_trigger_validation(
            context.changed_files, spec
        ):
            # No matching code changes - skip validation (pass without running)
            artifacts = ValidationArtifacts(log_dir=log_dir) if log_dir else None
            return ValidationResult(
                passed=True,
                steps=[],
                failure_reasons=[],
                artifacts=artifacts,
            )

        # Note: setup commands always run fresh (not cached per CACHEABLE_KINDS in
        # spec_executor.py). The should_invalidate_setup_cache() function in
        # validation_gating.py exists for future setup caching support but is not
        # currently wired up.

        # Delegate workspace setup to spec_workspace module
        runner: CommandRunnerPort = self.command_runner

        try:
            # Step 0b: Invalidate lint cache if config_files changed
            # Done after runner is available so cache invalidation works
            if context.changed_files and should_invalidate_lint_cache(
                context.changed_files, spec
            ):
                self._invalidate_lint_cache_for_config_change(runner)

            workspace = setup_workspace(
                spec=spec,
                context=context,
                log_dir=log_dir,
                step_timeout_seconds=self.step_timeout_seconds,
                command_runner=runner,
                env_config=self.env_config,
                lock_manager=self.lock_manager,
            )
        except SetupError as e:
            # Return early failure for setup errors
            artifacts = ValidationArtifacts(log_dir=log_dir) if log_dir else None
            return ValidationResult(
                passed=False,
                failure_reasons=[e.reason],
                retriable=e.retriable,
                artifacts=artifacts,
            )

        # Execute pipeline and capture result, ensuring worktree cleanup
        result: ValidationResult | None = None
        try:
            result = self._run_validation_pipeline(
                spec,
                context,
                workspace.validation_cwd,
                workspace.artifacts,
                workspace.log_dir,
                workspace.run_id,
                workspace.baseline_percent,
                runner,
            )
            return result
        finally:
            # Clean up workspace with correct pass/fail status
            # On exception, result is None so we treat as failed (validation_passed=False)
            validation_passed = result.passed if result is not None else False
            cleanup_workspace(workspace, validation_passed, runner)

    def _invalidate_lint_cache_for_config_change(
        self, command_runner: CommandRunnerPort
    ) -> None:
        """Invalidate lint cache when config files change.

        Called when files matching config_files patterns are detected in
        the changed files. This ensures lint/format/typecheck commands
        run fresh when their configuration changes.

        Invalidation is best-effort: any failure is logged and swallowed so
        that validation itself still proceeds. Does nothing when lint caching
        is disabled on this runner.

        Args:
            command_runner: The command runner to use for cache operations.
        """
        if not self.enable_lint_cache:
            return
        try:
            cache_dir = self.env_config.cache_dir
            cache = LintCache(
                cache_dir=cache_dir,
                repo_path=self.repo_path,
                command_runner=command_runner,
            )
            cache.invalidate_all()
        except Exception:
            # Best-effort: never let a cache problem abort validation, but do
            # leave a trace instead of hiding the failure completely.
            # NOTE(review): if invalidation failed, previously cached entries
            # may still be present - confirm against LintCache semantics.
            import logging

            logging.getLogger(__name__).warning(
                "Lint cache invalidation failed; continuing validation",
                exc_info=True,
            )

    def _run_validation_pipeline(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        cwd: Path,
        artifacts: ValidationArtifacts,
        log_dir: Path,
        run_id: str,
        baseline_percent: float | None,
        command_runner: CommandRunnerPort,
    ) -> ValidationResult:
        """Run pipeline: commands -> coverage -> e2e -> result.

        Writes an initial manifest before running commands and a completion
        manifest afterwards (on both the command-failure and the normal path).

        Args:
            spec: Validation spec with commands and coverage config.
            context: Context for the validation run.
            cwd: Working directory for commands.
            artifacts: Artifacts object attached to the returned result.
            log_dir: Directory where manifests and logs are written.
            run_id: Identifier of this validation run.
            baseline_percent: Baseline coverage percentage, if any.
            command_runner: Command runner for executing commands.

        Returns:
            A failing ValidationResult if a command fails, otherwise the
            result assembled by SpecResultBuilder.
        """
        env = self._build_spec_env(context, run_id)
        expected = [cmd.name for cmd in spec.commands]
        self._write_initial_manifest(log_dir, expected, cwd, run_id, context, spec)

        # Step 1: Run commands
        try:
            steps = self._run_commands(spec, cwd, env, log_dir, command_runner)
        except CommandFailure as e:
            self._write_completion_manifest(log_dir, expected, e.steps, e.reason)
            return ValidationResult(
                passed=False,
                steps=e.steps,
                failure_reasons=[e.reason],
                artifacts=artifacts,
            )

        # Step 2: Build result (coverage check, E2E, result assembly)
        builder = SpecResultBuilder()
        builder_input = ResultBuilderInput(
            spec=spec,
            context=context,
            steps=steps,
            artifacts=artifacts,
            cwd=cwd,
            log_dir=log_dir,
            env=env,
            baseline_percent=baseline_percent,
            env_config=self.env_config,
            command_runner=command_runner,
            yaml_coverage_config=spec.yaml_coverage_config,
        )
        result = builder.build(builder_input)

        # Write completion manifest (only the first failure reason is recorded)
        failure_reason = result.failure_reasons[0] if result.failure_reasons else None
        self._write_completion_manifest(log_dir, expected, steps, failure_reason)

        return result

    def _write_file_flushed(self, path: Path, content: str) -> None:
        """Write content to a file with immediate flush to disk.

        Uses explicit flush() and fsync() to ensure data is persisted
        before returning. This prevents log data loss if mala is interrupted.
        The file is always written as UTF-8 so the result does not depend on
        the platform's locale encoding.

        Args:
            path: Path to write to.
            content: Text content to write.
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())

    def _write_initial_manifest(
        self,
        log_dir: Path,
        expected_commands: list[str],
        cwd: Path,
        run_id: str,
        context: ValidationContext,
        spec: ValidationSpec,
    ) -> None:
        """Write initial manifest of expected commands for debugging.

        Uses explicit flush() and fsync() to ensure the manifest is written
        to disk immediately. This provides accurate debugging info if mala
        is interrupted mid-validation.

        Args:
            log_dir: Directory that receives ``validation_manifest.json``.
            expected_commands: Names of the commands the spec will run.
            cwd: Working directory the commands will run in.
            run_id: Identifier of this validation run.
            context: Validation context; its ``issue_id`` is recorded.
            spec: Validation spec; its ``scope`` value is recorded.
        """
        manifest_path = log_dir / "validation_manifest.json"
        self._write_file_flushed(
            manifest_path,
            json.dumps(
                {
                    "expected_commands": expected_commands,
                    "cwd": str(cwd),
                    "run_id": run_id,
                    "issue_id": context.issue_id,
                    "scope": spec.scope.value,
                },
                indent=2,
            ),
        )

    def _run_commands(
        self,
        spec: ValidationSpec,
        cwd: Path,
        env: dict[str, str],
        log_dir: Path,
        command_runner: CommandRunnerPort,
    ) -> list[ValidationStepResult]:
        """Execute all commands in the spec.

        Delegates to SpecCommandExecutor for command execution and lint-cache
        handling. The executor encapsulates all execution logic.

        Args:
            spec: Validation spec with commands.
            cwd: Working directory for commands.
            env: Environment variables.
            log_dir: Directory for logs.
            command_runner: Command runner for executing commands.

        Returns:
            List of step results for all commands.

        Raises:
            CommandFailure: If a command fails (and allow_fail is False).
        """
        # Configure executor
        executor_config = ExecutorConfig(
            enable_lint_cache=self.enable_lint_cache,
            repo_path=self.repo_path,
            step_timeout_seconds=self.step_timeout_seconds,
            env_config=self.env_config,
            command_runner=command_runner,
            event_sink=self.event_sink,
        )
        executor = SpecCommandExecutor(executor_config)

        # Build executor input
        executor_input = ExecutorInput(
            commands=spec.commands,
            cwd=cwd,
            env=env,
            log_dir=log_dir,
        )

        # Execute commands
        output = executor.execute(executor_input)

        # Raise CommandFailure if execution failed
        if output.failed:
            raise CommandFailure(
                output.steps, output.failure_reason or "Command failed"
            )

        return output.steps

    def _write_completion_manifest(
        self,
        log_dir: Path,
        expected_commands: list[str],
        steps: list[ValidationStepResult],
        failure_reason: str | None,
    ) -> None:
        """Write completion manifest with expected vs actual commands.

        Uses explicit flush() and fsync() to ensure the manifest is written
        to disk immediately. This helps debug cases where commands are
        unexpectedly skipped and prevents data loss if mala is interrupted.

        Args:
            log_dir: Directory that receives ``validation_complete.json``.
            expected_commands: Names of the commands the spec intended to run.
            steps: Step results for the commands that actually ran.
            failure_reason: Failure reason to record, or None.
        """
        actual_commands = [s.name for s in steps]
        # Build the lookup set once rather than scanning the list per command.
        executed = set(actual_commands)
        manifest = {
            "expected_commands": expected_commands,
            "actual_commands": actual_commands,
            "commands_executed": len(actual_commands),
            "commands_expected": len(expected_commands),
            # Order-sensitive: true only when the same names ran in the same order.
            "all_executed": expected_commands == actual_commands,
            "missing_commands": [c for c in expected_commands if c not in executed],
            "failure_reason": failure_reason,
            "steps": [
                {
                    "name": s.name,
                    "ok": s.ok,
                    "returncode": s.returncode,
                    "duration_seconds": s.duration_seconds,
                }
                for s in steps
            ],
        }
        manifest_path = log_dir / "validation_complete.json"
        self._write_file_flushed(manifest_path, json.dumps(manifest, indent=2))

    def _build_spec_env(
        self,
        context: ValidationContext,
        run_id: str,
    ) -> dict[str, str]:
        """Build environment for spec-based validation.

        Args:
            context: Validation context; its ``issue_id`` names the agent
                when set.
            run_id: Run identifier, used for the agent name when the context
                has no issue id.

        Returns:
            A copy of the current process environment with ``LOCK_DIR`` and
            ``AGENT_ID`` added (overriding any inherited values).
        """
        lock_dir = str(self.env_config.lock_dir)
        return {
            **os.environ,
            "LOCK_DIR": lock_dir,
            "AGENT_ID": f"validator-{context.issue_id or run_id}",
        }