Coverage for src / infra / hooks / lint_cache.py: 46%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Lint command caching hooks for reducing redundant lint runs. 

2 

3Contains the LintCache class and hook factory for blocking redundant 

4lint commands when the git state hasn't changed since the last successful run. 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10import time 

11from dataclasses import dataclass 

12from pathlib import Path 

13from typing import TYPE_CHECKING, Any 

14 

15if TYPE_CHECKING: 

16 from collections.abc import Set as AbstractSet 

17 

18 from .dangerous_commands import PreToolUseHook 

19 

20from src.core.tool_name_extractor import extract_tool_name 

21 

22from ..tools.command_runner import run_command 

23from .dangerous_commands import BASH_TOOL_NAMES 

24from .file_cache import FILE_WRITE_TOOLS 

25 

# Lint tool names used as the fallback when no ValidationSpec is provided
DEFAULT_LINT_TOOLS: frozenset[str] = frozenset(
    ("ruff", "ty", "eslint", "golangci-lint")
)

30 

31 

def _get_git_state(repo_path: Path | None = None) -> str | None:
    """Fingerprint the repository's current state as a short hex digest.

    The fingerprint is derived from three git queries, in this order:
    - ``git rev-parse HEAD`` (current commit SHA)
    - ``git diff HEAD`` (staged and unstaged changes to tracked files)
    - ``git ls-files --others --exclude-standard`` (untracked files list)

    Args:
        repo_path: Directory to run git in. Falls back to the current
            working directory when None.

    Returns:
        The first 16 hex characters of the SHA-256 of the combined output,
        or None when the HEAD or diff query fails or any exception occurs.
    """
    try:
        workdir = repo_path or Path.cwd()

        head = run_command(["git", "rev-parse", "HEAD"], cwd=workdir)
        if not head.ok:
            return None
        commit_sha = head.stdout.strip()

        tracked_changes = run_command(["git", "diff", "HEAD"], cwd=workdir)
        if not tracked_changes.ok:
            return None

        others = run_command(
            ["git", "ls-files", "--others", "--exclude-standard"],
            cwd=workdir,
        )
        # A failed untracked listing is tolerated: it contributes an empty
        # string instead of aborting the whole fingerprint.
        untracked_listing = others.stdout if others.ok else ""

        fingerprint_source = (
            commit_sha + "\n" + tracked_changes.stdout + "\n" + untracked_listing
        )
        digest = hashlib.sha256(fingerprint_source.encode())
        return digest.hexdigest()[:16]
    except Exception:
        # Best effort: any failure means "state unknown".
        return None

85 

86 

def _detect_lint_command(
    command: str, lint_tools_lower: AbstractSet[str]
) -> str | None:
    """Detect which lint command type is being run.

    The tool name is obtained from ``extract_tool_name(command)`` and then
    matched against the known lint tools, first as a whole (e.g. a
    multi-word name such as "cargo clippy") and then by its first
    whitespace-separated word (e.g. "ruff").

    Args:
        command: The bash command string.
        lint_tools_lower: Set of known lint tool names (pre-normalized to
            lowercase by the caller).

    Returns:
        The lowercase tool name that matched (full name preferred over the
        first word), or None when no tool name could be extracted or it
        does not match any known lint tool.
    """
    tool_name = extract_tool_name(command)
    if not tool_name:
        return None

    # Lower-case defensively so matching is case-insensitive regardless of
    # what the extractor returns.
    normalized = tool_name.lower()

    # Prefer a match on the full tool name (e.g. "cargo clippy", "go vet").
    if normalized in lint_tools_lower:
        return normalized

    words = normalized.split()
    if not words:
        # A non-empty but whitespace-only name has no first word; treat it
        # as "not a lint command" rather than raising IndexError.
        return None

    # Fall back to the base tool (e.g. "ruff", "eslint", "golangci-lint").
    base_tool = words[0]
    return base_tool if base_tool in lint_tools_lower else None

122 

123 

@dataclass
class LintCacheEntry:
    """Record of a single lint run that completed successfully.

    Attributes:
        git_state: Hash of the git state at the moment the lint passed.
        timestamp: Unix time at which the lint passed.
        skipped_count: How many later runs were skipped because this
            entry produced a cache hit. Starts at zero.
    """

    git_state: str
    timestamp: float
    skipped_count: int = 0

137 

138 

class LintCache:
    """In-memory cache for tracking successful lint runs during agent sessions.

    This cache tracks the git state when lint commands pass. A lint is only
    cached after an explicit success via `mark_success()`. The cache uses
    commit SHA + working tree diff hash to detect any file changes.

    The cache supports any lint tool (eslint, golangci-lint, cargo clippy, etc.)
    by using extract_tool_name() for dynamic detection. Tool names can be
    configured via the lint_tools parameter or loaded from ValidationSpec.

    Note:
        This is an IN-MEMORY cache designed for use with Claude agent hooks
        (see make_lint_cache_hook). For a disk-persisted cache used in batch
        validation, see src/validation/lint_cache.py which has a different API
        (should_skip/mark_passed) suited for SpecValidationRunner.

    Attributes:
        _cache: Mapping of cache key ("<lint_type>:<command_hash>") to the
            cached entry for that exact command.
        _skipped_count: Total count of lints skipped due to cache hits.
        _repo_path: Path to the repository for git operations.
        _lint_tools: Set of lint tool names to recognize.
        _lint_tools_lower: Lowercase copy of _lint_tools used for matching.
    """

    def __init__(
        self,
        repo_path: Path | None = None,
        lint_tools: AbstractSet[str] | None = None,
    ) -> None:
        """Initialize an empty lint cache.

        Args:
            repo_path: Path to the repository. If None, uses current directory.
            lint_tools: Set of lint tool names to recognize. If None or
                empty, uses DEFAULT_LINT_TOOLS.
        """
        self._cache: dict[str, LintCacheEntry] = {}
        self._skipped_count: int = 0
        self._repo_path = repo_path
        # Truthiness check on purpose: an empty set also selects the defaults.
        self._lint_tools: frozenset[str] = (
            frozenset(lint_tools) if lint_tools else DEFAULT_LINT_TOOLS
        )
        # Pre-compute lowercase version for case-insensitive matching
        self._lint_tools_lower: frozenset[str] = frozenset(
            t.lower() for t in self._lint_tools
        )

    def _make_cache_key(self, lint_type: str, command: str) -> str:
        """Create a cache key combining lint type and command.

        This ensures commands with different arguments (e.g., 'ruff check src/'
        vs 'ruff check .') are cached separately.

        Args:
            lint_type: Type of lint command (e.g., "ruff").
            command: Full command string.

        Returns:
            A key of the form "<lint_type>:<hash>", where <hash> is the first
            12 hex characters of the SHA-256 of the command.
        """
        command_hash = hashlib.sha256(command.encode()).hexdigest()[:12]
        return f"{lint_type}:{command_hash}"

    def check_and_update(self, lint_type: str, command: str = "") -> tuple[bool, str]:
        """Check if a lint run is redundant based on cached success.

        Only skips if there is a confirmed successful lint at the current git
        state (via prior `mark_success()` call). On a hit, both the entry's
        and the cache-wide skip counters are incremented.

        Args:
            lint_type: Type of lint command (e.g., "ruff").
            command: Full command string for cache key differentiation.

        Returns:
            Tuple of (is_redundant, message). If is_redundant is True,
            the message explains why the lint is skipped; otherwise the
            message is empty.
        """
        current_state = _get_git_state(self._repo_path)
        if current_state is None:
            # Can't determine git state, allow the lint
            return (False, "")

        cached = self._cache.get(self._make_cache_key(lint_type, command))
        if cached is None or cached.git_state != current_state:
            # No confirmed success at the current state - allow lint to run
            return (False, "")

        # State unchanged since last confirmed success - skip
        cached.skipped_count += 1
        self._skipped_count += 1
        lint_name = lint_type.replace("_", " ")
        return (
            True,
            f"No changes since last {lint_name} (skipped {cached.skipped_count}x). "
            "Git state unchanged - lint would produce same results.",
        )

    def mark_success(self, lint_type: str, command: str = "") -> None:
        """Explicitly mark a lint as successful at current state.

        Call this after a lint command completes successfully to cache
        the result. Nothing is stored when the git state cannot be
        determined.

        Args:
            lint_type: Type of lint command that succeeded.
            command: Full command string for cache key differentiation.
        """
        current_state = _get_git_state(self._repo_path)
        if current_state is None:
            return
        self._cache[self._make_cache_key(lint_type, command)] = LintCacheEntry(
            git_state=current_state,
            timestamp=time.time(),
            skipped_count=0,
        )

    def invalidate(self, lint_type: str | None = None) -> None:
        """Invalidate cache entries.

        Call this when files are modified to ensure lint runs again.

        Args:
            lint_type: Specific lint type to invalidate. If None, clears all.
                When provided, invalidates all commands for exactly that
                lint type.
        """
        if lint_type is None:
            self._cache.clear()
            return

        # Keys are "<lint_type>:<command_hash>" and the hash is hex, so it
        # never contains ":". Splitting on the LAST colon therefore recovers
        # the lint type exactly, even when the lint type itself contains a
        # colon (a plain prefix test would also match e.g. "npm run:lint"
        # when invalidating "npm run").
        stale_keys = [
            key for key in self._cache if key.rpartition(":")[0] == lint_type
        ]
        for key in stale_keys:
            del self._cache[key]

    @property
    def skipped_count(self) -> int:
        """Return the total number of lints skipped due to cache hits."""
        return self._skipped_count

    @property
    def cache_size(self) -> int:
        """Return the number of entries (lint type + command) currently cached."""
        return len(self._cache)

    @property
    def lint_tools(self) -> frozenset[str]:
        """Return the set of recognized lint tool names."""
        return self._lint_tools

    def detect_lint_command(self, command: str) -> str | None:
        """Detect if a command is a lint command.

        Delegates to the module-level detector with this cache's lowercase
        lint tool set.

        Args:
            command: The bash command string.

        Returns:
            The lint tool name if detected, or None.
        """
        return _detect_lint_command(command, self._lint_tools_lower)

307 

308 

def make_lint_cache_hook(
    cache: LintCache,
) -> PreToolUseHook:
    """Create a PreToolUse hook that blocks redundant lint commands.

    This hook checks Bash tool invocations for lint commands using the
    cache's detect_lint_command(). Which commands count as lint commands
    is determined by the cache's configured lint_tools.

    If the cache reports the command as redundant (a recorded success for
    that exact command at the current git state), the hook blocks it.
    Compound commands (joined by "&&", "||", ";" or an interior newline)
    are never blocked, so the non-lint parts still run.

    The hook also invalidates all cache entries when a file-write tool is
    invoked, ensuring subsequent lints see the updated state.

    Args:
        cache: The LintCache instance to use for tracking lint runs.
            The cache's lint_tools set determines which commands are cached.

    Returns:
        An async hook function for ClaudeAgentOptions.hooks["PreToolUse"].
        It returns {"decision": "block", "reason": ...} to block a command
        and an empty dict otherwise.
    """
    # Textual separators that indicate more than one command is chained.
    compound_separators = ("&&", "||", ";")

    async def lint_cache_hook(
        hook_input: Any,  # noqa: ANN401 - SDK type, avoid import
        stderr: str | None,
        context: Any,  # noqa: ANN401 - SDK type, avoid import
    ) -> dict[str, Any]:
        """PreToolUse hook to block redundant lint commands."""
        tool_name = hook_input["tool_name"]
        tool_input = hook_input["tool_input"]

        # Check for Bash tool with lint command
        if tool_name.lower() in BASH_TOOL_NAMES:
            command = tool_input.get("command", "")
            lint_type = cache.detect_lint_command(command)
            if lint_type:
                # Don't block compound commands - only block simple lint
                # commands. This ensures "ruff check . && pytest" runs the
                # test portion. The shell treats a newline between commands
                # like ";", so an interior newline also counts; leading and
                # trailing newlines are ignored via strip().
                is_compound = (
                    any(sep in command for sep in compound_separators)
                    or "\n" in command.strip()
                )
                if is_compound:
                    return {}  # Allow compound commands to run

                is_redundant, message = cache.check_and_update(lint_type, command)
                if is_redundant:
                    return {
                        "decision": "block",
                        "reason": message,
                    }

        # Invalidate cache on file writes (lint results may change)
        if tool_name in FILE_WRITE_TOOLS:
            cache.invalidate()

        return {}

    return lint_cache_hook