Coverage for src / domain / validation / spec_executor.py: 37%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Command executor for spec-based validation. 

2 

3This module provides SpecCommandExecutor which encapsulates command execution 

4and lint-cache handling. It is separated from SpecValidationRunner to: 

5- Isolate execution logic from orchestration 

6- Enable unit testing with fake CommandRunners 

7- Keep lint-cache skipping behavior in one place 

8 

9The executor receives commands and environment, executes them with lint-cache 

10awareness, and returns structured results without knowledge of the overall 

11validation pipeline. 

12""" 

13 

14from __future__ import annotations 

15 

16import os 

17import shlex 

18from dataclasses import dataclass, field 

19from typing import TYPE_CHECKING 

20 

21from .helpers import format_step_output 

22from .lint_cache import LintCache 

23from .result import ValidationStepResult 

24from .spec import CommandKind 

25 

26if TYPE_CHECKING: 

27 from collections.abc import Mapping 

28 from pathlib import Path 

29 

30 from src.core.protocols import CommandRunnerPort, EnvConfigPort, MalaEventSink 

31 

32 from .spec import ValidationCommand 

33 

34 

35@dataclass(frozen=True, kw_only=True) 

36class ExecutorConfig: 

37 """Configuration for SpecCommandExecutor. 

38 

39 Attributes: 

40 enable_lint_cache: Whether to enable lint caching for cacheable commands. 

41 repo_path: Path to the main repo (for lint cache storage). 

42 step_timeout_seconds: Optional timeout for individual command execution. 

43 env_config: Environment configuration for paths (scripts, cache, etc.). 

44 command_runner: Command runner for executing commands. 

45 event_sink: Event sink for emitting validation step events (optional). 

46 """ 

47 

48 enable_lint_cache: bool = True 

49 repo_path: Path | None = None 

50 step_timeout_seconds: float | None = None 

51 env_config: EnvConfigPort 

52 command_runner: CommandRunnerPort 

53 event_sink: MalaEventSink | None = None 

54 

55 

@dataclass
class ExecutorInput:
    """Everything SpecCommandExecutor needs to run one batch of commands."""

    # Commands to execute, in order.
    commands: list[ValidationCommand]

    # Working directory in which the commands are executed.
    cwd: Path

    # Environment variables handed to each command.
    env: Mapping[str, str]

    # Directory that receives the per-step log files.
    log_dir: Path

71 

72 

@dataclass
class ExecutorOutput:
    """Result of one SpecCommandExecutor batch.

    A freshly constructed instance represents "nothing ran, nothing failed":
    an empty step list, ``failed`` False and no failure reason.
    """

    # Step results for all commands, including skipped ones. Built through a
    # factory so every instance owns its own list.
    steps: list[ValidationStepResult] = field(default_factory=list)

    # True when a command failed and its allow_fail flag was False.
    failed: bool = False

    # Human-readable failure reason; set only when the batch failed.
    failure_reason: str | None = None

86 

87 

class SpecCommandExecutor:
    """Executes validation commands with lint-cache awareness.

    This executor handles:
    - Lint cache checking for FORMAT/LINT/TYPECHECK commands
    - Command execution via CommandRunner
    - Test mutex wrapping when requested
    - Step logging to files
    - Failure detection and early exit

    The executor is stateless per execution batch - it does not retain
    state between execute() calls. The lint cache itself persists to disk.

    Example:
        executor = SpecCommandExecutor(config)
        output = executor.execute(input)
        if output.failed:
            print(f"Failed: {output.failure_reason}")
    """

    # Command kinds eligible for lint caching
    CACHEABLE_KINDS = frozenset(
        {CommandKind.LINT, CommandKind.FORMAT, CommandKind.TYPECHECK}
    )

    def __init__(self, config: ExecutorConfig) -> None:
        """Initialize the command executor.

        Args:
            config: Executor configuration.
        """
        self.config = config

    def execute(self, input: ExecutorInput) -> ExecutorOutput:
        """Execute all commands in the input.

        Iterates through commands, checking lint cache for cacheable commands,
        executing non-cached commands, and stopping on first failure (unless
        allow_fail is set). Skipped commands still contribute a synthetic
        step result, so ``steps`` covers every command up to and including
        the one that aborted the batch.

        Args:
            input: ExecutorInput with commands and context.

        Returns:
            ExecutorOutput with step results and failure info.
        """
        output = ExecutorOutput()

        # Initialize lint cache if enabled
        lint_cache = self._create_lint_cache(input.cwd)

        for i, cmd in enumerate(input.commands):
            # Check if this command can be skipped via cache
            if self._should_skip_cached(cmd, lint_cache):
                step = self._create_skipped_step(cmd)
                output.steps.append(step)
                continue

            # Write start marker for debugging
            self._write_start_marker(input.log_dir, i, cmd, input.cwd)

            # Emit event sink notification for command start
            if self.config.event_sink is not None:
                self.config.event_sink.on_validation_step_running(cmd.name)

            # Execute the command
            step = self._run_command(cmd, input.cwd, input.env)
            output.steps.append(step)

            # Write step logs to files
            self._write_step_logs(step, input.log_dir)

            # Log result to terminal and update cache
            if step.ok:
                self._log_success(cmd, step, lint_cache)
            else:
                self._log_failure(cmd, step)
                if not cmd.allow_fail:
                    output.failed = True
                    output.failure_reason = self._format_failure_reason(cmd, step)
                    break

        return output

    def _create_lint_cache(self, cwd: Path) -> LintCache | None:
        """Create lint cache if enabled and paths available.

        Args:
            cwd: Working directory (passed to LintCache for git operations).

        Returns:
            LintCache if enabled and paths available, None otherwise.

        Note:
            The lint cache is created fresh for each execution batch to ensure
            correct git state detection, since the same batch may be
            validated in different worktrees.
        """
        if not self.config.enable_lint_cache:
            return None
        if self.config.repo_path is None:
            return None
        cache_dir = self.config.env_config.cache_dir
        return LintCache(
            cache_dir=cache_dir,
            repo_path=self.config.repo_path,
            command_runner=self.config.command_runner,
            git_cwd=cwd,
        )

    def _should_skip_cached(
        self,
        cmd: ValidationCommand,
        lint_cache: LintCache | None,
    ) -> bool:
        """Check if a command should be skipped due to cache hit.

        Args:
            cmd: The command to check.
            lint_cache: The lint cache (may be None if disabled).

        Returns:
            True if the command can be skipped.
        """
        if lint_cache is None:
            return False
        if cmd.kind not in self.CACHEABLE_KINDS:
            return False
        return lint_cache.should_skip(cmd.name)

    def _create_skipped_step(self, cmd: ValidationCommand) -> ValidationStepResult:
        """Create a synthetic step result for a skipped command.

        Also notifies the event sink (when configured) that the step was
        skipped.

        Args:
            cmd: The skipped command.

        Returns:
            ValidationStepResult indicating the command was skipped.
        """
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_skipped(
                cmd.name, "no changes since last check"
            )
        return ValidationStepResult(
            name=cmd.name,
            command=cmd.command,
            ok=True,
            returncode=0,
            stdout_tail="Skipped: no changes since last check",
            stderr_tail="",
            duration_seconds=0.0,
        )

    @staticmethod
    def _safe_name(name: str) -> str:
        """Turn a step name into a single, flat filename component.

        Spaces and forward slashes are replaced with underscores so the name
        can never introduce an extra directory level below the log directory.
        Shared by the start marker and the step logs so both use the same
        naming scheme.

        Args:
            name: The step/command name.

        Returns:
            The sanitized name.
        """
        return name.replace(" ", "_").replace("/", "_")

    def _write_start_marker(
        self,
        log_dir: Path,
        index: int,
        cmd: ValidationCommand,
        cwd: Path,
    ) -> None:
        """Write a start marker file for debugging.

        Uses explicit flush() and fsync() to ensure the marker is written
        to disk immediately. This provides accurate debugging info if mala
        is interrupted mid-execution.

        Args:
            log_dir: Directory for logs.
            index: Command index in the batch.
            cmd: The command about to run.
            cwd: Working directory.
        """
        # Sanitize exactly like the step logs do; a "/" left in the name
        # would make the marker path point into a non-existent subdirectory.
        start_marker = log_dir / f"{index:02d}_{self._safe_name(cmd.name)}.started"
        self._write_file_flushed(start_marker, f"command: {cmd.command}\ncwd: {cwd}\n")

    def _run_command(
        self,
        cmd: ValidationCommand,
        cwd: Path,
        env: Mapping[str, str],
    ) -> ValidationStepResult:
        """Execute a single command.

        Args:
            cmd: The command to run.
            cwd: Working directory.
            env: Environment variables.

        Returns:
            ValidationStepResult with execution details.
        """
        # Use command's timeout if specified, else fall back to config timeout.
        # A falsy command timeout (None or 0) selects the config value.
        timeout = cmd.timeout or self.config.step_timeout_seconds

        # Use injected runner (required)
        runner = self.config.command_runner

        if cmd.shell:
            # Shell mode: pass command string directly with shell=True
            # For mutex wrapping in shell mode, prepend the script path
            if cmd.use_test_mutex:
                mutex_script = self.config.env_config.scripts_dir / "test-mutex.sh"
                # Quote the script path so a scripts_dir containing spaces or
                # shell metacharacters is still a single word for the shell.
                # The command itself is passed through untouched.
                full_cmd = f"{shlex.quote(str(mutex_script))} {cmd.command}"
            else:
                full_cmd = cmd.command
            result = runner.run(full_cmd, env=env, shell=True, cwd=cwd, timeout=timeout)
        else:
            # Non-shell mode: split command and run as list (legacy behavior)
            cmd_list = shlex.split(cmd.command)
            full_cmd = (
                self._wrap_with_mutex(cmd_list) if cmd.use_test_mutex else cmd_list
            )
            result = runner.run(full_cmd, env=env, cwd=cwd, timeout=timeout)

        return ValidationStepResult(
            name=cmd.name,
            command=cmd.command,
            ok=result.ok,
            returncode=result.returncode,
            stdout_tail=result.stdout_tail(),
            stderr_tail=result.stderr_tail(),
            duration_seconds=result.duration_seconds,
        )

    def _wrap_with_mutex(self, cmd: list[str]) -> list[str]:
        """Wrap a command with the test mutex script.

        Args:
            cmd: The command to wrap as a list.

        Returns:
            Command prefixed with test-mutex.sh.
        """
        scripts_dir = self.config.env_config.scripts_dir
        return [str(scripts_dir / "test-mutex.sh"), *cmd]

    def _write_file_flushed(self, path: Path, content: str) -> None:
        """Write content to a file with immediate flush to disk.

        Uses explicit flush() and fsync() to ensure data is persisted
        before returning. This prevents log data loss if mala is interrupted.

        The file is always written as UTF-8 so the result does not depend on
        the locale's default encoding; characters that cannot be encoded are
        written as backslash escapes instead of raising, so a log write
        cannot fail on unusual command output.

        Args:
            path: Path to write to.
            content: Text content to write.
        """
        with open(path, "w", encoding="utf-8", errors="backslashreplace") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())

    def _write_step_logs(self, step: ValidationStepResult, log_dir: Path) -> None:
        """Write step stdout/stderr to log files.

        Uses explicit flush() and fsync() to ensure logs are written to disk
        immediately. This prevents log data loss if mala is interrupted.
        A stream with empty output produces no file.

        Args:
            step: The step result to log.
            log_dir: Directory to write logs to.
        """
        # Sanitize step name for filename
        safe_name = self._safe_name(step.name)

        if step.stdout_tail:
            stdout_path = log_dir / f"{safe_name}.stdout.log"
            self._write_file_flushed(stdout_path, step.stdout_tail)

        if step.stderr_tail:
            stderr_path = log_dir / f"{safe_name}.stderr.log"
            self._write_file_flushed(stderr_path, step.stderr_tail)

    def _log_success(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
        lint_cache: LintCache | None,
    ) -> None:
        """Log successful step and update lint cache.

        Args:
            cmd: The command that succeeded.
            step: The step result.
            lint_cache: The lint cache to update (may be None).
        """
        # Emit event sink notification
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_passed(
                cmd.name, step.duration_seconds
            )

        # Mark command as passed in cache for cacheable commands
        if lint_cache is not None and cmd.kind in self.CACHEABLE_KINDS:
            lint_cache.mark_passed(cmd.name)

    def _log_failure(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
    ) -> None:
        """Log failed step.

        Args:
            cmd: The command that failed.
            step: The step result.
        """
        # Emit event sink notification
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_failed(cmd.name, step.returncode)

    def _format_failure_reason(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
    ) -> str:
        """Format a human-readable failure reason.

        Args:
            cmd: The command that failed.
            step: The step result.

        Returns:
            Formatted failure reason string: the command name and exit code,
            followed by the formatted step output when there is any.
        """
        reason = f"{cmd.name} failed (exit {step.returncode})"
        details = format_step_output(step.stdout_tail, step.stderr_tail)
        if details:
            reason = f"{reason}: {details}"
        return reason