Coverage for src / infra / hooks / lint_cache.py: 46%
104 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""Lint command caching hooks for reducing redundant lint runs.

Contains the LintCache class and hook factory for blocking redundant
lint commands when the git state hasn't changed since the last successful run.
"""
7from __future__ import annotations
9import hashlib
10import time
11from dataclasses import dataclass
12from pathlib import Path
13from typing import TYPE_CHECKING, Any
15if TYPE_CHECKING:
16 from collections.abc import Set as AbstractSet
18 from .dangerous_commands import PreToolUseHook
20from src.core.tool_name_extractor import extract_tool_name
22from ..tools.command_runner import run_command
23from .dangerous_commands import BASH_TOOL_NAMES
24from .file_cache import FILE_WRITE_TOOLS
# Fallback set of lint tool names, used when the caller does not supply its
# own tool list (for example when no ValidationSpec is provided).
DEFAULT_LINT_TOOLS: frozenset[str] = frozenset(
    {"ruff", "ty", "eslint", "golangci-lint"}
)
def _get_git_state(repo_path: Path | None = None) -> str | None:
    """Return a short fingerprint of the repository's current git state.

    The fingerprint is built from three pieces of git output:
    - the HEAD commit SHA (``git rev-parse HEAD``)
    - the diff of tracked files against HEAD (``git diff HEAD``), which
      covers both staged and unstaged changes
    - the list of untracked, non-ignored files
      (``git ls-files --others --exclude-standard``)

    Only the *names* of untracked files feed the fingerprint, not their
    contents.

    Args:
        repo_path: Directory to run git in. If None (or otherwise falsy),
            the current working directory is used.

    Returns:
        The first 16 hex characters of a SHA-256 digest over the combined
        output, or None if HEAD or the diff cannot be read, or if any
        exception is raised along the way.
    """
    try:
        cwd = repo_path or Path.cwd()

        # HEAD commit SHA; failure here is treated as "state unknown".
        head_result = run_command(["git", "rev-parse", "HEAD"], cwd=cwd)
        if not head_result.ok:
            return None
        head_sha = head_result.stdout.strip()

        # Staged + unstaged changes to tracked files, relative to HEAD.
        diff_result = run_command(["git", "diff", "HEAD"], cwd=cwd)
        if not diff_result.ok:
            return None

        # Untracked files. Unlike the two calls above, a failure here is
        # tolerated and simply contributes an empty listing.
        untracked = run_command(
            ["git", "ls-files", "--others", "--exclude-standard"],
            cwd=cwd,
        )
        untracked_listing = untracked.stdout if untracked.ok else ""

        combined = "\n".join((head_sha, diff_result.stdout, untracked_listing))
        return hashlib.sha256(combined.encode()).hexdigest()[:16]
    except Exception:
        # Best-effort: callers interpret None as "cannot determine state"
        # and let the lint run, so no error is propagated from here.
        return None
def _detect_lint_command(
    command: str, lint_tools_lower: AbstractSet[str]
) -> str | None:
    """Identify which known lint tool, if any, a shell command invokes.

    The tool name is obtained from ``extract_tool_name(command)`` and then
    compared, lowercased, against ``lint_tools_lower``. The full extracted
    name is tried first (so multi-word tools such as "cargo clippy" can
    match), followed by its first whitespace-separated word (e.g. "ruff").

    Args:
        command: The bash command string.
        lint_tools_lower: Known lint tool names, already lowercased by the
            caller.

    Returns:
        The matching lowercase tool name (full name or first word), or None
        when no tool name is extracted or it matches no known lint tool.
    """
    tool_name = extract_tool_name(command)
    if not tool_name:
        return None

    # Lowercase defensively so matching stays case-insensitive regardless
    # of what extract_tool_name returns.
    normalized = tool_name.lower()

    # Full name first (e.g. "cargo clippy", "go vet").
    if normalized in lint_tools_lower:
        return normalized

    # A whitespace-only name splits to an empty list; treat that as "no
    # tool" instead of raising IndexError on words[0].
    words = normalized.split()
    if not words:
        return None

    # Then the base tool (e.g. "ruff", "eslint", "golangci-lint").
    base_tool = words[0]
    if base_tool in lint_tools_lower:
        return base_tool

    return None
@dataclass
class LintCacheEntry:
    """Record of a lint command that passed at a particular git state.

    Attributes:
        git_state: Git state hash at the moment the lint succeeded.
        timestamp: Unix timestamp at which the success was recorded.
        skipped_count: Number of later runs skipped because this entry
            was still valid.
    """

    git_state: str
    timestamp: float
    skipped_count: int = 0
class LintCache:
    """In-memory cache of lint commands that passed at a known git state.

    An entry is recorded only through an explicit `mark_success()` call and
    stores the git state hash (commit SHA + working tree diff + untracked
    file list) seen at that moment. `check_and_update()` reports a lint as
    redundant only when an entry exists for the same tool and command and
    the git state hash is unchanged.

    Any lint tool can be recognized (eslint, golangci-lint, cargo clippy,
    etc.); detection is delegated to `_detect_lint_command`. The recognized
    tool names are configured through the ``lint_tools`` parameter.

    Note:
        This is an IN-MEMORY cache designed for use with Claude agent hooks
        (see make_lint_cache_hook). For a disk-persisted cache used in batch
        validation, see src/validation/lint_cache.py which has a different
        API (should_skip/mark_passed) suited for SpecValidationRunner.

    Attributes:
        _cache: Mapping of "<lint_type>:<command hash>" keys to cached entries.
        _skipped_count: Total count of lints skipped due to cache hits.
        _repo_path: Path to the repository for git operations.
        _lint_tools: Set of lint tool names to recognize.
        _lint_tools_lower: Lowercased copy of ``_lint_tools`` used for
            case-insensitive matching.
    """

    def __init__(
        self,
        repo_path: Path | None = None,
        lint_tools: AbstractSet[str] | None = None,
    ) -> None:
        """Initialize an empty lint cache.

        Args:
            repo_path: Path to the repository. If None, uses current directory.
            lint_tools: Set of lint tool names to recognize. If None or
                empty, DEFAULT_LINT_TOOLS is used instead.
        """
        self._cache: dict[str, LintCacheEntry] = {}
        self._skipped_count: int = 0
        self._repo_path = repo_path
        # Truthiness check on purpose: an empty set also selects the defaults.
        self._lint_tools: frozenset[str] = (
            frozenset(lint_tools) if lint_tools else DEFAULT_LINT_TOOLS
        )
        # Lowercased once here so detection does not redo it on every call.
        self._lint_tools_lower: frozenset[str] = frozenset(
            tool.lower() for tool in self._lint_tools
        )

    def _make_cache_key(self, lint_type: str, command: str) -> str:
        """Build the cache key for a lint type and its exact command line.

        Including a hash of the command keeps invocations with different
        arguments (e.g. 'ruff check src/' vs 'ruff check .') in separate
        entries.

        Args:
            lint_type: Lint tool name (e.g. "ruff").
            command: Full command string.

        Returns:
            ``"<lint_type>:<first 12 hex chars of SHA-256(command)>"``.
        """
        command_hash = hashlib.sha256(command.encode()).hexdigest()[:12]
        return f"{lint_type}:{command_hash}"

    def check_and_update(self, lint_type: str, command: str = "") -> tuple[bool, str]:
        """Check whether a lint run is redundant given a cached success.

        A run is redundant only if `mark_success()` previously recorded the
        same lint type and command at the current git state. On a hit, both
        the entry's and the cache-wide skip counters are incremented.

        Args:
            lint_type: Lint tool name (e.g. "ruff").
            command: Full command string for cache key differentiation.

        Returns:
            Tuple of (is_redundant, message). When is_redundant is True the
            message explains why the lint is skipped; otherwise it is "".
        """
        current_state = _get_git_state(self._repo_path)
        if current_state is None:
            # Git state unknown: never block the lint.
            return (False, "")

        entry = self._cache.get(self._make_cache_key(lint_type, command))
        if entry is None or entry.git_state != current_state:
            # No confirmed success at this exact state: let the lint run.
            return (False, "")

        entry.skipped_count += 1
        self._skipped_count += 1
        lint_name = lint_type.replace("_", " ")
        return (
            True,
            f"No changes since last {lint_name} (skipped {entry.skipped_count}x). "
            "Git state unchanged - lint would produce same results.",
        )

    def mark_success(self, lint_type: str, command: str = "") -> None:
        """Record that a lint command succeeded at the current git state.

        Call this after a lint command completes successfully. Nothing is
        stored when the git state cannot be determined. An existing entry
        for the same lint type and command is replaced, which resets its
        skip counter.

        Args:
            lint_type: Lint tool name of the command that succeeded.
            command: Full command string for cache key differentiation.
        """
        current_state = _get_git_state(self._repo_path)
        if current_state is None:
            return

        self._cache[self._make_cache_key(lint_type, command)] = LintCacheEntry(
            git_state=current_state,
            timestamp=time.time(),
            skipped_count=0,
        )

    def invalidate(self, lint_type: str | None = None) -> None:
        """Drop cached entries so the affected lints run again.

        Args:
            lint_type: Lint type whose entries should be removed, covering
                every cached command for that type. If None, the whole
                cache is cleared.
        """
        if lint_type is None:
            self._cache.clear()
            return

        # Keys look like "<lint_type>:<command_hash>"; match on the prefix.
        prefix = f"{lint_type}:"
        # Collect first: a dict must not change size while being iterated.
        stale_keys = [key for key in self._cache if key.startswith(prefix)]
        for key in stale_keys:
            del self._cache[key]

    @property
    def skipped_count(self) -> int:
        """Return the total number of lints skipped due to cache hits."""
        return self._skipped_count

    @property
    def cache_size(self) -> int:
        """Return the number of cached entries (one per lint type + command)."""
        return len(self._cache)

    @property
    def lint_tools(self) -> frozenset[str]:
        """Return the set of recognized lint tool names."""
        return self._lint_tools

    def detect_lint_command(self, command: str) -> str | None:
        """Detect whether a command invokes one of the configured lint tools.

        Delegates to `_detect_lint_command` with this cache's lowercased
        tool names.

        Args:
            command: The bash command string.

        Returns:
            The lint tool name if detected, or None.
        """
        return _detect_lint_command(command, self._lint_tools_lower)
def make_lint_cache_hook(
    cache: LintCache,
) -> PreToolUseHook:
    """Create a PreToolUse hook that blocks redundant lint commands.

    For Bash tool invocations the hook asks the cache whether the command
    is a recognized lint command (per the cache's configured lint_tools).
    If the cache holds a confirmed success for that exact command at the
    current git state, the command is blocked.

    Commands containing "&&", "||" or ";" are never blocked, so the
    non-lint part of something like "ruff check . && pytest" still runs.

    When the tool name is in FILE_WRITE_TOOLS the entire cache is
    invalidated, so subsequent lints run against the updated files.

    Args:
        cache: The LintCache instance to use for tracking lint runs.
            The cache's lint_tools set determines which commands are cached.

    Returns:
        An async hook function for ClaudeAgentOptions.hooks["PreToolUse"].
    """

    async def lint_cache_hook(
        hook_input: Any,  # noqa: ANN401 - SDK type, avoid import
        stderr: str | None,
        context: Any,  # noqa: ANN401 - SDK type, avoid import
    ) -> dict[str, Any]:
        """PreToolUse hook to block redundant lint commands."""
        tool_name = hook_input["tool_name"]
        tool_input = hook_input["tool_input"]

        # Bash tool names are matched case-insensitively.
        if tool_name.lower() in BASH_TOOL_NAMES:
            command = tool_input.get("command", "")
            lint_type = cache.detect_lint_command(command)
            if lint_type:
                # Only simple lint commands are blocked; a compound command
                # may carry other work (e.g. tests) that must still run.
                if any(sep in command for sep in ("&&", "||", ";")):
                    return {}

                is_redundant, message = cache.check_and_update(lint_type, command)
                if is_redundant:
                    return {
                        "decision": "block",
                        "reason": message,
                    }

        # File writes may change lint results, so drop every cached success.
        # NOTE(review): this comparison is case-sensitive, unlike the Bash
        # check above - confirm that matches how FILE_WRITE_TOOLS is defined.
        if tool_name in FILE_WRITE_TOOLS:
            cache.invalidate()

        return {}

    return lint_cache_hook