Coverage for src / domain / validation / spec_runner.py: 27%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Spec-based validation runner for mala. 

2 

3This module provides SpecValidationRunner which runs validation using 

4ValidationSpec + ValidationContext, the modern API for mala validation. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import json 

11import os 

12from typing import TYPE_CHECKING 

13 

14from .lint_cache import LintCache 

15from .spec import ValidationArtifacts 

16from .spec_executor import ( 

17 ExecutorConfig, 

18 ExecutorInput, 

19 SpecCommandExecutor, 

20) 

21from .spec_result_builder import ResultBuilderInput, SpecResultBuilder 

22from .spec_workspace import ( 

23 SetupError, 

24 cleanup_workspace, 

25 setup_workspace, 

26) 

27from .validation_gating import ( 

28 should_invalidate_lint_cache, 

29 should_trigger_validation, 

30) 

31 

32if TYPE_CHECKING: 

33 from pathlib import Path 

34 

35 from src.core.protocols import ( 

36 CommandRunnerPort, 

37 EnvConfigPort, 

38 LockManagerPort, 

39 MalaEventSink, 

40 ) 

41 

42 from .result import ValidationStepResult 

43 from .spec import ( 

44 ValidationContext, 

45 ValidationSpec, 

46 ) 

47 

48from .result import ValidationResult 

49 

50 

class CommandFailure(Exception):
    """Signal that a validation command failed and the run must stop.

    The failure reason doubles as the exception message, so ``str(exc)``
    yields the same text as ``exc.reason``.

    Attributes:
        steps: The steps executed so far (including the failed step).
        reason: Human-readable failure reason.
    """

    def __init__(self, steps: list[ValidationStepResult], reason: str) -> None:
        """Record the executed steps and the failure reason.

        Args:
            steps: Step results gathered before the failure was raised.
            reason: Human-readable failure reason; also the exception message.
        """
        self.steps = steps
        self.reason = reason
        super().__init__(reason)

63 

64 

class SpecValidationRunner:
    """Runs validation according to a ValidationSpec.

    This runner supports:
    - Scope-aware validation (per-issue vs run-level)
    - Per-command mutex settings
    - Integrated worktree, coverage, and E2E handling
    - Artifact tracking
    - Lint caching to skip redundant lint commands
    """

    def __init__(
        self,
        repo_path: Path,
        env_config: EnvConfigPort,
        command_runner: CommandRunnerPort,
        lock_manager: LockManagerPort,
        step_timeout_seconds: float | None = None,
        enable_lint_cache: bool = True,
        event_sink: MalaEventSink | None = None,
    ) -> None:
        """Initialize the spec validation runner.

        Args:
            repo_path: Path to the repository to validate. Stored resolved.
            env_config: Environment configuration for paths.
            command_runner: Command runner for executing commands.
            lock_manager: Lock manager for file locking.
            step_timeout_seconds: Optional timeout for individual steps.
            enable_lint_cache: Whether to enable lint caching. Set to False
                in tests or when caching is not desired.
            event_sink: Event sink for emitting validation step events.
        """
        self.repo_path = repo_path.resolve()
        self.step_timeout_seconds = step_timeout_seconds
        self.enable_lint_cache = enable_lint_cache
        self.env_config = env_config
        self.command_runner = command_runner
        self.lock_manager = lock_manager
        self.event_sink = event_sink

    async def run_spec(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        log_dir: Path | None = None,
    ) -> ValidationResult:
        """Run validation according to a ValidationSpec.

        The synchronous pipeline is executed in a worker thread via
        ``asyncio.to_thread`` so the event loop is not blocked.

        Args:
            spec: What validations to run.
            context: Immutable context for the validation run.
            log_dir: Directory for logs/artifacts. Uses temp dir if None.

        Returns:
            ValidationResult with steps, artifacts, and coverage info.
        """
        return await asyncio.to_thread(self._run_spec_sync, spec, context, log_dir)

    def _run_spec_sync(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        log_dir: Path | None = None,
    ) -> ValidationResult:
        """Synchronous implementation of run_spec.

        Uses a pipeline pattern:
        1. Check validation gating (skip if no code changes match patterns)
        2. Invalidate caches if config/setup files changed
        3. setup_workspace -> run_commands -> check_coverage -> run_e2e -> build_result

        Delegates workspace/baseline/worktree setup to spec_workspace module.

        Args:
            spec: What validations to run.
            context: Immutable context for the validation run.
            log_dir: Directory for logs/artifacts, or None.

        Returns:
            A passing result with no steps when gating skips validation, a
            failing result carrying the setup error when workspace setup
            fails, otherwise the result of the validation pipeline.
        """
        # Step 0: Check validation gating based on changed_files and code_patterns
        # Skip validation if no files match code_patterns (unless patterns empty)
        if context.changed_files and not should_trigger_validation(
            context.changed_files, spec
        ):
            # No matching code changes - skip validation (pass without running)
            artifacts = ValidationArtifacts(log_dir=log_dir) if log_dir else None
            return ValidationResult(
                passed=True,
                steps=[],
                failure_reasons=[],
                artifacts=artifacts,
            )

        # Note: setup commands always run fresh (not cached per CACHEABLE_KINDS in
        # spec_executor.py). The should_invalidate_setup_cache() function in
        # validation_gating.py exists for future setup caching support but is not
        # currently wired up.

        # Delegate workspace setup to spec_workspace module
        runner: CommandRunnerPort = self.command_runner

        try:
            # Step 0b: Invalidate lint cache if config_files changed
            # Done after runner is available so cache invalidation works
            if context.changed_files and should_invalidate_lint_cache(
                context.changed_files, spec
            ):
                self._invalidate_lint_cache_for_config_change(runner)

            workspace = setup_workspace(
                spec=spec,
                context=context,
                log_dir=log_dir,
                step_timeout_seconds=self.step_timeout_seconds,
                command_runner=runner,
                env_config=self.env_config,
                lock_manager=self.lock_manager,
            )
        except SetupError as e:
            # Return early failure for setup errors
            artifacts = ValidationArtifacts(log_dir=log_dir) if log_dir else None
            return ValidationResult(
                passed=False,
                failure_reasons=[e.reason],
                retriable=e.retriable,
                artifacts=artifacts,
            )

        # Execute pipeline and capture result, ensuring worktree cleanup
        result: ValidationResult | None = None
        try:
            result = self._run_validation_pipeline(
                spec,
                context,
                workspace.validation_cwd,
                workspace.artifacts,
                workspace.log_dir,
                workspace.run_id,
                workspace.baseline_percent,
                runner,
            )
            return result
        finally:
            # Clean up workspace with correct pass/fail status
            # On exception, result is None so we treat as failed (validation_passed=False)
            validation_passed = result.passed if result is not None else False
            cleanup_workspace(workspace, validation_passed, runner)

    def _invalidate_lint_cache_for_config_change(
        self, command_runner: CommandRunnerPort
    ) -> None:
        """Invalidate lint cache when config files change.

        Called when files matching config_files patterns are detected in
        the changed files. This ensures lint/format/typecheck commands
        run fresh when their configuration changes.

        Invalidation is best-effort: any failure is logged at debug level
        and otherwise ignored. Does nothing when lint caching is disabled.

        Args:
            command_runner: The command runner to use for cache operations.
        """
        # Function-scope import keeps this block self-contained; the module
        # does not import logging at file level.
        import logging

        if not self.enable_lint_cache:
            return
        try:
            cache = LintCache(
                cache_dir=self.env_config.cache_dir,
                repo_path=self.repo_path,
                command_runner=command_runner,
            )
            cache.invalidate_all()
        except Exception:
            # If cache invalidation fails, continue anyway: the commands will
            # just run without cache benefit. Record the failure so it can be
            # diagnosed instead of vanishing silently.
            logging.getLogger(__name__).debug(
                "Lint cache invalidation failed; continuing without it",
                exc_info=True,
            )

    def _run_validation_pipeline(
        self,
        spec: ValidationSpec,
        context: ValidationContext,
        cwd: Path,
        artifacts: ValidationArtifacts,
        log_dir: Path,
        run_id: str,
        baseline_percent: float | None,
        command_runner: CommandRunnerPort,
    ) -> ValidationResult:
        """Run pipeline: commands -> coverage -> e2e -> result.

        Writes an initial manifest before running commands and a completion
        manifest afterwards (on both command failure and normal completion).

        Args:
            spec: Validation spec with commands.
            context: Immutable context for the validation run.
            cwd: Working directory for commands.
            artifacts: Artifacts object attached to the returned result.
            log_dir: Directory for logs and manifests.
            run_id: Identifier of this validation run.
            baseline_percent: Coverage baseline passed to the result builder.
            command_runner: Command runner for executing commands.

        Returns:
            A failing result if a command raised CommandFailure, otherwise
            the result assembled by SpecResultBuilder.
        """
        env = self._build_spec_env(context, run_id)
        expected = [cmd.name for cmd in spec.commands]
        self._write_initial_manifest(log_dir, expected, cwd, run_id, context, spec)

        # Step 1: Run commands
        try:
            steps = self._run_commands(spec, cwd, env, log_dir, command_runner)
        except CommandFailure as e:
            self._write_completion_manifest(log_dir, expected, e.steps, e.reason)
            return ValidationResult(
                passed=False,
                steps=e.steps,
                failure_reasons=[e.reason],
                artifacts=artifacts,
            )

        # Step 2: Build result (coverage check, E2E, result assembly)
        builder = SpecResultBuilder()
        builder_input = ResultBuilderInput(
            spec=spec,
            context=context,
            steps=steps,
            artifacts=artifacts,
            cwd=cwd,
            log_dir=log_dir,
            env=env,
            baseline_percent=baseline_percent,
            env_config=self.env_config,
            command_runner=command_runner,
            yaml_coverage_config=spec.yaml_coverage_config,
        )
        result = builder.build(builder_input)

        # Write completion manifest (only the first failure reason is recorded)
        failure_reason = result.failure_reasons[0] if result.failure_reasons else None
        self._write_completion_manifest(log_dir, expected, steps, failure_reason)

        return result

    def _write_file_flushed(self, path: Path, content: str) -> None:
        """Write content to a file with immediate flush to disk.

        Uses explicit flush() and fsync() to ensure data is persisted
        before returning. This prevents log data loss if mala is interrupted.
        The file is always written as UTF-8 so the output does not depend on
        the platform's locale encoding.

        Args:
            path: Path to write to.
            content: Text content to write.
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())

    def _write_initial_manifest(
        self,
        log_dir: Path,
        expected_commands: list[str],
        cwd: Path,
        run_id: str,
        context: ValidationContext,
        spec: ValidationSpec,
    ) -> None:
        """Write initial manifest of expected commands for debugging.

        Uses explicit flush() and fsync() to ensure the manifest is written
        to disk immediately. This provides accurate debugging info if mala
        is interrupted mid-validation.

        Args:
            log_dir: Directory that receives ``validation_manifest.json``.
            expected_commands: Names of the commands the spec will run.
            cwd: Working directory for commands (stored as a string).
            run_id: Identifier of this validation run.
            context: Validation context; its ``issue_id`` is recorded.
            spec: Validation spec; its ``scope.value`` is recorded.
        """
        manifest_path = log_dir / "validation_manifest.json"
        self._write_file_flushed(
            manifest_path,
            json.dumps(
                {
                    "expected_commands": expected_commands,
                    "cwd": str(cwd),
                    "run_id": run_id,
                    "issue_id": context.issue_id,
                    "scope": spec.scope.value,
                },
                indent=2,
            ),
        )

    def _run_commands(
        self,
        spec: ValidationSpec,
        cwd: Path,
        env: dict[str, str],
        log_dir: Path,
        command_runner: CommandRunnerPort,
    ) -> list[ValidationStepResult]:
        """Execute all commands in the spec.

        Delegates to SpecCommandExecutor for command execution and lint-cache
        handling. The executor encapsulates all execution logic.

        Args:
            spec: Validation spec with commands.
            cwd: Working directory for commands.
            env: Environment variables.
            log_dir: Directory for logs.
            command_runner: Command runner for executing commands.

        Returns:
            List of step results for all commands.

        Raises:
            CommandFailure: If the executor reports the execution as failed.
        """
        # Configure executor
        executor_config = ExecutorConfig(
            enable_lint_cache=self.enable_lint_cache,
            repo_path=self.repo_path,
            step_timeout_seconds=self.step_timeout_seconds,
            env_config=self.env_config,
            command_runner=command_runner,
            event_sink=self.event_sink,
        )
        executor = SpecCommandExecutor(executor_config)

        # Build executor input
        executor_input = ExecutorInput(
            commands=spec.commands,
            cwd=cwd,
            env=env,
            log_dir=log_dir,
        )

        # Execute commands
        output = executor.execute(executor_input)

        # Raise CommandFailure if execution failed
        if output.failed:
            raise CommandFailure(
                output.steps, output.failure_reason or "Command failed"
            )

        return output.steps

    def _write_completion_manifest(
        self,
        log_dir: Path,
        expected_commands: list[str],
        steps: list[ValidationStepResult],
        failure_reason: str | None,
    ) -> None:
        """Write completion manifest with expected vs actual commands.

        Uses explicit flush() and fsync() to ensure the manifest is written
        to disk immediately. This helps debug cases where commands are
        unexpectedly skipped and prevents data loss if mala is interrupted.

        Args:
            log_dir: Directory that receives ``validation_complete.json``.
            expected_commands: Names of the commands the spec intended to run.
            steps: Step results for the commands that actually ran.
            failure_reason: Failure reason to record, or None.
        """
        actual_commands = [s.name for s in steps]
        # Set built once so the missing-command scan is a constant-time lookup
        # per expected command instead of a list scan.
        executed = set(actual_commands)
        manifest = {
            "expected_commands": expected_commands,
            "actual_commands": actual_commands,
            "commands_executed": len(actual_commands),
            "commands_expected": len(expected_commands),
            # Order-sensitive: true only when the same names ran in the same order.
            "all_executed": expected_commands == actual_commands,
            "missing_commands": [c for c in expected_commands if c not in executed],
            "failure_reason": failure_reason,
            "steps": [
                {
                    "name": s.name,
                    "ok": s.ok,
                    "returncode": s.returncode,
                    "duration_seconds": s.duration_seconds,
                }
                for s in steps
            ],
        }
        manifest_path = log_dir / "validation_complete.json"
        self._write_file_flushed(manifest_path, json.dumps(manifest, indent=2))

    def _build_spec_env(
        self,
        context: ValidationContext,
        run_id: str,
    ) -> dict[str, str]:
        """Build environment for spec-based validation.

        Args:
            context: Validation context; its ``issue_id`` names the agent
                when set.
            run_id: Run identifier, used in the agent id when the context
                has no issue id.

        Returns:
            A copy of the current process environment with ``LOCK_DIR`` and
            ``AGENT_ID`` added (overriding any inherited values).
        """
        lock_dir = str(self.env_config.lock_dir)
        return {
            **os.environ,
            "LOCK_DIR": lock_dir,
            "AGENT_ID": f"validator-{context.issue_id or run_id}",
        }