Coverage for src / domain / validation / lint_cache.py: 26%
129 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Disk-persisted lint cache for SpecValidationRunner.
3This module provides LintCache which tracks the last successful run for each
4lint command and skips the command if no files have changed since. The cache
5is persisted to disk (lint_cache.json) so it survives across validation runs.
7The cache is based on:
81. The current git HEAD commit SHA
92. Whether there are uncommitted changes (git status)
103. Hash of uncommitted changes (if any)
12If all match the cached state, the lint command can be skipped.
14Note:
15 This module is distinct from the in-memory LintCache in src/hooks.py,
16 which is designed for Claude agent hooks (make_lint_cache_hook). The two
17 have different APIs and persistence models suited to their use cases.
18"""
20from __future__ import annotations
22import hashlib
23import json
24import os
25from dataclasses import dataclass
26from typing import TYPE_CHECKING
28if TYPE_CHECKING:
29 from pathlib import Path
31 from src.core.protocols import CommandRunnerPort
34def _run_git_command(
35 args: list[str], cwd: Path, command_runner: CommandRunnerPort
36) -> str | None:
37 """Run a git command and return stdout, or None on failure.
39 This is a separate function to make it easy to mock in tests without
40 affecting other subprocess usage.
42 Args:
43 args: Git command arguments (without 'git' prefix).
44 cwd: Working directory.
45 command_runner: Command runner for executing the git command.
47 Returns:
48 stdout as a string, or None if the command failed.
49 """
50 result = command_runner.run(["git", *args], cwd=cwd, timeout=5.0)
51 if result.ok:
52 return result.stdout.strip()
53 return None
@dataclass(frozen=True)
class LintCacheKey:
    """Immutable identifier of one lint command within the cache.

    Attributes:
        command_name: Name of the command (e.g., "ruff check", "ty check").
        working_dir: Path where the command runs, already rendered as a string.
    """

    command_name: str
    working_dir: str

    def to_dict(self) -> dict[str, str]:
        """Return the key as a plain dict suitable for JSON encoding."""
        return dict(command_name=self.command_name, working_dir=self.working_dir)

    @classmethod
    def from_dict(cls, data: dict[str, str]) -> LintCacheKey:
        """Rebuild a key from the dict form produced by ``to_dict``.

        Raises:
            KeyError: If "command_name" or "working_dir" is absent.
        """
        name = data["command_name"]
        directory = data["working_dir"]
        return cls(command_name=name, working_dir=directory)
@dataclass(frozen=True)
class LintCacheEntry:
    """Immutable snapshot of the git state recorded for a lint command.

    Attributes:
        head_sha: Git HEAD commit SHA when the command was run.
        has_uncommitted: Whether there were uncommitted changes.
        files_hash: Hash of uncommitted file contents (None if clean).
    """

    head_sha: str
    has_uncommitted: bool
    files_hash: str | None

    def to_dict(self) -> dict[str, str | bool | None]:
        """Return the entry as a plain dict suitable for JSON encoding."""
        payload: dict[str, str | bool | None] = {}
        payload["head_sha"] = self.head_sha
        payload["has_uncommitted"] = self.has_uncommitted
        payload["files_hash"] = self.files_hash
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, str | bool | None]) -> LintCacheEntry:
        """Rebuild an entry from its dict form.

        "head_sha" is coerced with ``str`` and "has_uncommitted" with
        ``bool``; "files_hash" is optional and defaults to None.

        Raises:
            KeyError: If "head_sha" or "has_uncommitted" is absent.
        """
        sha = str(data["head_sha"])
        dirty = bool(data["has_uncommitted"])
        return cls(
            head_sha=sha,
            has_uncommitted=dirty,
            files_hash=data.get("files_hash"),  # type: ignore[arg-type]
        )
class LintCache:
    """Disk-persisted cache for SpecValidationRunner to skip redundant lint runs.

    The cache stores the git state when each lint command last passed:
    - HEAD commit SHA
    - Whether there were uncommitted changes
    - Hash of uncommitted changes (if any)

    If the current state matches the cached state, the lint command can be
    skipped since it would produce the same result.

    A cache file that is unreadable, not valid JSON, or not shaped like a
    mapping of entries is treated as empty rather than raising.

    Note:
        This is a DISK-PERSISTED cache designed for batch validation runs
        in SpecValidationRunner. For an in-memory cache used with Claude
        agent hooks, see src/hooks.py LintCache which has a different API
        (check_and_update with two-phase pending/confirmed) suited for
        make_lint_cache_hook.

    Example:
        cache = LintCache(Path("/tmp/lint-cache"), repo_path, command_runner)
        if cache.should_skip("ruff check"):
            print("Skipping ruff check - no changes since last run")
        else:
            # run ruff check...
            if success:
                cache.mark_passed("ruff check")
    """

    def __init__(
        self,
        cache_dir: Path,
        repo_path: Path,
        command_runner: CommandRunnerPort,
        git_cwd: Path | None = None,
    ) -> None:
        """Initialize the lint cache and load any existing cache file.

        Args:
            cache_dir: Directory to store the cache file.
            repo_path: Path to the git repository (used for cache key stability).
            command_runner: Command runner for executing git commands.
            git_cwd: Working directory for git commands. If None, uses repo_path.
                This allows using a stable repo_path for cache keys while running
                git commands in a per-run worktree.
        """
        self.cache_dir = cache_dir
        self.repo_path = repo_path
        self._command_runner = command_runner
        self._git_cwd = git_cwd if git_cwd is not None else repo_path
        self._cache_file = cache_dir / "lint_cache.json"
        self._entries: dict[str, dict[str, str | bool | None]] = {}
        self._load()

    def _load(self) -> None:
        """Load the cache from disk, tolerating a missing or corrupt file.

        A missing file leaves the in-memory entries untouched. Any other
        read problem (I/O error, invalid UTF-8, invalid JSON) or a top-level
        JSON value that is not an object resets the cache to empty. Entries
        whose value is not a JSON object are dropped.
        """
        try:
            loaded = json.loads(self._cache_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (a file containing undecodable bytes).
            loaded = None

        if not isinstance(loaded, dict):
            self._entries = {}
            return
        # Keep only well-shaped entries so later lookups can rely on
        # every value being a dict.
        self._entries = {
            key: value for key, value in loaded.items() if isinstance(value, dict)
        }

    def _save(self) -> None:
        """Save the cache to disk atomically with immediate flush.

        The data is written to a temporary sibling file, flushed and
        fsync'ed, and then moved over the real cache file with
        ``os.replace``. An interruption during the write therefore cannot
        leave a truncated lint_cache.json behind.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # Include the PID so concurrent processes never share a temp file.
        tmp_path = self._cache_file.with_name(
            f"{self._cache_file.name}.{os.getpid()}.tmp"
        )
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self._entries, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self._cache_file)
        except BaseException:
            # Do not leave a stray temp file behind; then re-raise unchanged.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def _get_key_str(self, command_name: str) -> str:
        """Get the cache key string for a command.

        The key is the JSON encoding (sorted keys) of a LintCacheKey built
        from the command name and ``repo_path``.
        """
        key = LintCacheKey(
            command_name=command_name,
            working_dir=str(self.repo_path),
        )
        return json.dumps(key.to_dict(), sort_keys=True)

    def _get_current_state(self) -> LintCacheEntry | None:
        """Get the current git state.

        Returns:
            LintCacheEntry with current HEAD SHA and uncommitted status,
            or None if git commands fail (not a git repo or mocked environment).
        """
        # Get HEAD SHA. Any failure means the state cannot be cached.
        try:
            head_sha = _run_git_command(
                ["rev-parse", "HEAD"], self._git_cwd, self._command_runner
            )
        except Exception:
            # Not a git repo, no commits, or subprocess mocked - can't cache
            return None
        if head_sha is None:
            return None

        # Check for uncommitted changes
        try:
            status_output = _run_git_command(
                ["status", "--porcelain"], self._git_cwd, self._command_runner
            )
        except Exception:
            # Can't determine state - don't cache
            return None
        if status_output is None:
            return None
        has_uncommitted = bool(status_output)

        # Only a dirty tree needs a content hash; a clean tree is fully
        # described by its HEAD SHA.
        files_hash = self._hash_uncommitted() if has_uncommitted else None

        return LintCacheEntry(
            head_sha=head_sha,
            has_uncommitted=has_uncommitted,
            files_hash=files_hash,
        )

    def _hash_uncommitted(self) -> str:
        """Hash the uncommitted changes including untracked files.

        Hashing is best-effort: a failing git command contributes nothing
        to the hash instead of raising.

        Returns:
            First 16 hex characters of the SHA256 over the ``git diff HEAD``
            output plus the paths and contents of untracked files.
        """
        hasher = hashlib.sha256()

        # Get staged + unstaged diff (captures tracked file changes)
        try:
            diff = _run_git_command(
                ["diff", "HEAD"], self._git_cwd, self._command_runner
            )
            if diff is None:
                diff = ""
        except Exception:
            diff = ""
        hasher.update(diff.encode())

        # Get untracked files and include their contents in the hash
        try:
            untracked_output = _run_git_command(
                ["ls-files", "--others", "--exclude-standard"],
                self._git_cwd,
                self._command_runner,
            )
            if untracked_output:
                # Sort for deterministic ordering
                for filepath in sorted(untracked_output.split("\n")):
                    if not filepath:
                        continue
                    # Include the path in the hash
                    hasher.update(f"\n--- untracked: {filepath}\n".encode())
                    # Read in chunks to avoid loading large files into memory
                    full_path = self._git_cwd / filepath
                    try:
                        with open(full_path, "rb") as f:
                            for chunk in iter(lambda: f.read(65536), b""):
                                hasher.update(chunk)
                    except OSError:
                        # File may have been deleted or be unreadable
                        hasher.update(b"<unreadable>")
        except Exception:
            # Deliberate best-effort: keep whatever was hashed so far.
            pass

        return hasher.hexdigest()[:16]

    def should_skip(self, command_name: str) -> bool:
        """Check if a lint command can be skipped.

        Args:
            command_name: Name of the command (e.g., "ruff check").

        Returns:
            True if the command can be skipped (no changes since last run).
            False when there is no usable cache entry or the git state
            cannot be determined.
        """
        stored = self._entries.get(self._get_key_str(command_name))
        if stored is None:
            return False

        try:
            cached = LintCacheEntry.from_dict(stored)
        except (KeyError, TypeError, AttributeError):
            # Malformed entry (e.g. hand-edited file) - treat as a miss
            return False

        current = self._get_current_state()
        if current is None:
            # Can't determine state - don't skip
            return False

        # Must match on all dimensions
        return (
            cached.head_sha == current.head_sha
            and cached.has_uncommitted == current.has_uncommitted
            and cached.files_hash == current.files_hash
        )

    def mark_passed(self, command_name: str) -> None:
        """Mark a lint command as having passed and persist the cache.

        Nothing is recorded when the current git state cannot be determined.

        Args:
            command_name: Name of the command that passed.
        """
        current = self._get_current_state()
        if current is None:
            # Can't determine state - don't cache
            return

        self._entries[self._get_key_str(command_name)] = current.to_dict()
        self._save()

    def invalidate(self, command_name: str) -> None:
        """Invalidate the cache for a command.

        The cache file is rewritten only if an entry was actually removed.

        Args:
            command_name: Name of the command to invalidate.
        """
        key_str = self._get_key_str(command_name)
        if key_str in self._entries:
            del self._entries[key_str]
            self._save()

    def invalidate_all(self) -> int:
        """Invalidate all cache entries.

        Unlike clear(), this removes entries but keeps the cache file,
        ensuring consistent behavior with per-command invalidate().

        Returns:
            Number of entries invalidated.
        """
        count = len(self._entries)
        if count > 0:
            self._entries = {}
            self._save()
        return count

    def clear(self) -> None:
        """Clear the entire cache and delete the cache file if present."""
        self._entries = {}
        # Unlink directly so there is no gap between an existence check
        # and the removal.
        try:
            self._cache_file.unlink()
        except FileNotFoundError:
            pass