Coverage for src / domain / validation / spec_executor.py: 37%
117 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Command executor for spec-based validation.
3This module provides SpecCommandExecutor which encapsulates command execution
4and lint-cache handling. It is separated from SpecValidationRunner to:
5- Isolate execution logic from orchestration
6- Enable unit testing with fake CommandRunners
7- Keep lint-cache skipping behavior in one place
9The executor receives commands and environment, executes them with lint-cache
10awareness, and returns structured results without knowledge of the overall
11validation pipeline.
12"""
14from __future__ import annotations
16import os
17import shlex
18from dataclasses import dataclass, field
19from typing import TYPE_CHECKING
21from .helpers import format_step_output
22from .lint_cache import LintCache
23from .result import ValidationStepResult
24from .spec import CommandKind
26if TYPE_CHECKING:
27 from collections.abc import Mapping
28 from pathlib import Path
30 from src.core.protocols import CommandRunnerPort, EnvConfigPort, MalaEventSink
32 from .spec import ValidationCommand
35@dataclass(frozen=True, kw_only=True)
36class ExecutorConfig:
37 """Configuration for SpecCommandExecutor.
39 Attributes:
40 enable_lint_cache: Whether to enable lint caching for cacheable commands.
41 repo_path: Path to the main repo (for lint cache storage).
42 step_timeout_seconds: Optional timeout for individual command execution.
43 env_config: Environment configuration for paths (scripts, cache, etc.).
44 command_runner: Command runner for executing commands.
45 event_sink: Event sink for emitting validation step events (optional).
46 """
48 enable_lint_cache: bool = True
49 repo_path: Path | None = None
50 step_timeout_seconds: float | None = None
51 env_config: EnvConfigPort
52 command_runner: CommandRunnerPort
53 event_sink: MalaEventSink | None = None
@dataclass
class ExecutorInput:
    """Everything SpecCommandExecutor needs to run one batch of commands.

    Attributes:
        commands: List of commands to execute.
        cwd: Working directory for command execution.
        env: Environment variables for commands.
        log_dir: Directory for writing step logs.
    """

    # What to run.
    commands: list[ValidationCommand]

    # Where and with which environment to run it.
    cwd: Path
    env: Mapping[str, str]

    # Where per-step log files are written.
    log_dir: Path
@dataclass
class ExecutorOutput:
    """Result of one SpecCommandExecutor batch.

    Attributes:
        steps: List of step results for all commands (including skipped).
        failed: Whether a command failed (and allow_fail was False).
        failure_reason: Human-readable failure reason if failed.
    """

    # A default_factory gives each instance its own list.
    steps: list[ValidationStepResult] = field(default_factory=list)

    # Failure summary; stays at these defaults when nothing failed.
    failed: bool = False
    failure_reason: str | None = None
class SpecCommandExecutor:
    """Executes validation commands with lint-cache awareness.

    This executor handles:
    - Lint cache checking for FORMAT/LINT/TYPECHECK commands
    - Command execution via CommandRunner
    - Test mutex wrapping when requested
    - Step logging to files
    - Failure detection and early exit

    The executor is stateless per execution batch - it does not retain
    state between execute() calls. The lint cache itself persists to disk.

    Example:
        executor = SpecCommandExecutor(config)
        output = executor.execute(input)
        if output.failed:
            print(f"Failed: {output.failure_reason}")
    """

    # Command kinds eligible for lint caching
    CACHEABLE_KINDS = frozenset(
        {CommandKind.LINT, CommandKind.FORMAT, CommandKind.TYPECHECK}
    )

    def __init__(self, config: ExecutorConfig) -> None:
        """Initialize the command executor.

        Args:
            config: Executor configuration.
        """
        self.config = config

    def execute(self, input: ExecutorInput) -> ExecutorOutput:
        """Execute all commands in the input.

        Iterates through commands, checking lint cache for cacheable commands,
        executing non-cached commands, and stopping on first failure (unless
        allow_fail is set).

        Args:
            input: ExecutorInput with commands and context.

        Returns:
            ExecutorOutput with step results and failure info.
        """
        output = ExecutorOutput()

        # Initialize lint cache if enabled (None when disabled/unavailable)
        lint_cache = self._create_lint_cache(input.cwd)

        for i, cmd in enumerate(input.commands):
            # Cache hit: record a synthetic "skipped" step and move on
            if self._should_skip_cached(cmd, lint_cache):
                output.steps.append(self._create_skipped_step(cmd))
                continue

            # Write start marker for debugging
            self._write_start_marker(input.log_dir, i, cmd, input.cwd)

            # Emit event sink notification for command start
            if self.config.event_sink is not None:
                self.config.event_sink.on_validation_step_running(cmd.name)

            # Execute the command
            step = self._run_command(cmd, input.cwd, input.env)
            output.steps.append(step)

            # Write step logs to files
            self._write_step_logs(step, input.log_dir)

            # Report the result and update cache
            if step.ok:
                self._log_success(cmd, step, lint_cache)
            else:
                self._log_failure(cmd, step)
                # A failing command aborts the batch unless it may fail
                if not cmd.allow_fail:
                    output.failed = True
                    output.failure_reason = self._format_failure_reason(cmd, step)
                    break

        return output

    def _create_lint_cache(self, cwd: Path) -> LintCache | None:
        """Create lint cache if enabled and paths available.

        Args:
            cwd: Working directory (passed to LintCache for git operations).

        Returns:
            LintCache if enabled and paths available, None otherwise.

        Note:
            The lint cache is created fresh for each execution batch to ensure
            correct git state detection, since the same batch may be
            validated in different worktrees.
        """
        if not self.config.enable_lint_cache:
            return None
        if self.config.repo_path is None:
            return None
        cache_dir = self.config.env_config.cache_dir
        return LintCache(
            cache_dir=cache_dir,
            repo_path=self.config.repo_path,
            command_runner=self.config.command_runner,
            git_cwd=cwd,
        )

    def _should_skip_cached(
        self,
        cmd: ValidationCommand,
        lint_cache: LintCache | None,
    ) -> bool:
        """Check if a command should be skipped due to cache hit.

        Args:
            cmd: The command to check.
            lint_cache: The lint cache (may be None if disabled).

        Returns:
            True if the command can be skipped.
        """
        if lint_cache is None:
            return False
        if cmd.kind not in self.CACHEABLE_KINDS:
            return False
        return lint_cache.should_skip(cmd.name)

    def _create_skipped_step(self, cmd: ValidationCommand) -> ValidationStepResult:
        """Create a synthetic step result for a skipped command.

        Also notifies the event sink (when configured) that the step was
        skipped.

        Args:
            cmd: The skipped command.

        Returns:
            ValidationStepResult indicating the command was skipped.
        """
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_skipped(
                cmd.name, "no changes since last check"
            )
        return ValidationStepResult(
            name=cmd.name,
            command=cmd.command,
            ok=True,
            returncode=0,
            stdout_tail="Skipped: no changes since last check",
            stderr_tail="",
            duration_seconds=0.0,
        )

    @staticmethod
    def _sanitize_name(name: str) -> str:
        """Turn a command/step name into a single safe filename component.

        Spaces and path separators are replaced with underscores so the
        result never points into a (possibly missing) subdirectory.

        Args:
            name: The raw command or step name.

        Returns:
            The name with " " and "/" replaced by "_".
        """
        return name.replace(" ", "_").replace("/", "_")

    def _write_start_marker(
        self,
        log_dir: Path,
        index: int,
        cmd: ValidationCommand,
        cwd: Path,
    ) -> None:
        """Write a start marker file for debugging.

        Uses explicit flush() and fsync() to ensure the marker is written
        to disk immediately. This provides accurate debugging info if mala
        is interrupted mid-execution.

        Args:
            log_dir: Directory for logs.
            index: Command index in the batch.
            cmd: The command about to run.
            cwd: Working directory.
        """
        # Sanitize exactly like the step logs so a name containing "/"
        # cannot redirect the marker into a nonexistent subdirectory.
        safe_name = self._sanitize_name(cmd.name)
        start_marker = log_dir / f"{index:02d}_{safe_name}.started"
        self._write_file_flushed(start_marker, f"command: {cmd.command}\ncwd: {cwd}\n")

    def _run_command(
        self,
        cmd: ValidationCommand,
        cwd: Path,
        env: Mapping[str, str],
    ) -> ValidationStepResult:
        """Execute a single command.

        Args:
            cmd: The command to run.
            cwd: Working directory.
            env: Environment variables.

        Returns:
            ValidationStepResult with execution details.
        """
        # Use command's timeout if specified, else fall back to config timeout.
        # NOTE: a falsy command timeout (None or 0) also falls back.
        timeout = cmd.timeout or self.config.step_timeout_seconds

        # Use injected runner (required)
        runner = self.config.command_runner

        if cmd.shell:
            # Shell mode: pass command string directly with shell=True
            # For mutex wrapping in shell mode, prepend the script path
            if cmd.use_test_mutex:
                mutex_script = self.config.env_config.scripts_dir / "test-mutex.sh"
                # Quote the script path so directories containing spaces or
                # shell metacharacters survive shell parsing; paths made of
                # safe characters are left unchanged by shlex.quote.
                full_cmd = f"{shlex.quote(str(mutex_script))} {cmd.command}"
            else:
                full_cmd = cmd.command
            result = runner.run(full_cmd, env=env, shell=True, cwd=cwd, timeout=timeout)
        else:
            # Non-shell mode: split command and run as list (legacy behavior)
            cmd_list = shlex.split(cmd.command)
            full_cmd = (
                self._wrap_with_mutex(cmd_list) if cmd.use_test_mutex else cmd_list
            )
            result = runner.run(full_cmd, env=env, cwd=cwd, timeout=timeout)

        return ValidationStepResult(
            name=cmd.name,
            command=cmd.command,
            ok=result.ok,
            returncode=result.returncode,
            stdout_tail=result.stdout_tail(),
            stderr_tail=result.stderr_tail(),
            duration_seconds=result.duration_seconds,
        )

    def _wrap_with_mutex(self, cmd: list[str]) -> list[str]:
        """Wrap a command with the test mutex script.

        Args:
            cmd: The command to wrap as a list.

        Returns:
            Command prefixed with test-mutex.sh.
        """
        scripts_dir = self.config.env_config.scripts_dir
        return [str(scripts_dir / "test-mutex.sh"), *cmd]

    def _write_file_flushed(self, path: Path, content: str) -> None:
        """Write content to a file with immediate flush to disk.

        Uses explicit flush() and fsync() to ensure data is persisted
        before returning. This prevents log data loss if mala is interrupted.

        The file is always written as UTF-8, with unencodable characters
        replaced, so that unusual command output cannot make log writing
        raise UnicodeEncodeError regardless of the platform's locale.

        Args:
            path: Path to write to.
            content: Text content to write.
        """
        with open(path, "w", encoding="utf-8", errors="replace") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())

    def _write_step_logs(self, step: ValidationStepResult, log_dir: Path) -> None:
        """Write step stdout/stderr to log files.

        Uses explicit flush() and fsync() to ensure logs are written to disk
        immediately. This prevents log data loss if mala is interrupted.
        Empty stdout/stderr tails produce no file.

        Args:
            step: The step result to log.
            log_dir: Directory to write logs to.
        """
        # Sanitize step name for filename
        safe_name = self._sanitize_name(step.name)

        if step.stdout_tail:
            stdout_path = log_dir / f"{safe_name}.stdout.log"
            self._write_file_flushed(stdout_path, step.stdout_tail)

        if step.stderr_tail:
            stderr_path = log_dir / f"{safe_name}.stderr.log"
            self._write_file_flushed(stderr_path, step.stderr_tail)

    def _log_success(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
        lint_cache: LintCache | None,
    ) -> None:
        """Log successful step and update lint cache.

        Args:
            cmd: The command that succeeded.
            step: The step result.
            lint_cache: The lint cache to update (may be None).
        """
        # Emit event sink notification
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_passed(
                cmd.name, step.duration_seconds
            )

        # Mark command as passed in cache for cacheable commands
        if lint_cache is not None and cmd.kind in self.CACHEABLE_KINDS:
            lint_cache.mark_passed(cmd.name)

    def _log_failure(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
    ) -> None:
        """Log failed step.

        Args:
            cmd: The command that failed.
            step: The step result.
        """
        # Emit event sink notification
        if self.config.event_sink is not None:
            self.config.event_sink.on_validation_step_failed(cmd.name, step.returncode)

    def _format_failure_reason(
        self,
        cmd: ValidationCommand,
        step: ValidationStepResult,
    ) -> str:
        """Format a human-readable failure reason.

        Args:
            cmd: The command that failed.
            step: The step result.

        Returns:
            Formatted failure reason string.
        """
        reason = f"{cmd.name} failed (exit {step.returncode})"
        details = format_step_output(step.stdout_tail, step.stderr_tail)
        if details:
            reason = f"{reason}: {details}"
        return reason