Coverage for src / infra / hooks / file_cache.py: 33%
83 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""File read caching hooks for reducing redundant file reads.
3Contains the FileReadCache class and hook factory for blocking redundant
4file reads when the file hasn't changed since the last read.
5"""
7from __future__ import annotations
9import hashlib
10from dataclasses import dataclass
11from pathlib import Path
12from typing import TYPE_CHECKING, Any
14if TYPE_CHECKING:
15 from .dangerous_commands import PreToolUseHook
# Names of the tools that write to files (and require lock ownership).
FILE_WRITE_TOOLS: frozenset[str] = frozenset(
    {
        "Write",  # Claude Code Write tool: path lives under file_path
        "Edit",  # Claude Code Edit tool: path lives under file_path
        "NotebookEdit",  # Claude Code NotebookEdit: path lives under notebook_path
    }
)

# For each tool name, the tool_input key under which the target path is stored.
FILE_PATH_KEYS: dict[str, str] = dict(
    Write="file_path",
    Edit="file_path",
    NotebookEdit="notebook_path",
)
@dataclass
class CachedFileInfo:
    """Snapshot of a file as it looked when it was last read.

    Attributes:
        mtime_ns: Modification time of the file, in nanoseconds, at read time.
        size: Size of the file, in bytes, at read time.
        content_hash: Hex SHA-256 digest of the file content, or None when
            hashing was deferred because an mtime/size difference had
            already shown that the file was modified.
        read_count: How many times the file has been read so far.
    """

    # Fast change-detection fields, taken from the file's stat result.
    mtime_ns: int
    size: int
    # None means "hash not computed yet" (deferred after a detected change).
    content_hash: str | None
    # Starts at 1 because an entry is created on the first read.
    read_count: int = 1
class FileReadCache:
    """Cache for tracking file reads and detecting redundant re-reads.

    This cache tracks files that have been read during an agent session.
    When a file is re-read without modification, the cache blocks the read
    and informs the agent that the file hasn't changed, saving tokens.

    Change detection is two-staged: a differing mtime or size means the file
    changed; when both match, the SHA-256 of the whole file is compared to
    catch edits that left mtime/size untouched.

    Attributes:
        _cache: Mapping of ``(resolved_path, offset, limit)`` to the cached
            file info for that file/range combination.
        _blocked_count: Count of reads that were blocked due to cache hits.
    """

    # Errors that resolving or touching a bad path can raise. Besides OSError,
    # Path.resolve() raises RuntimeError on symlink loops (Python < 3.13) and
    # ValueError for paths containing an embedded NUL byte.
    _PATH_ERRORS = (OSError, RuntimeError, ValueError)

    def __init__(self) -> None:
        """Initialize an empty file read cache."""
        # Cache key is (resolved_path, offset, limit) to support range-specific caching
        self._cache: dict[tuple[str, int | None, int | None], CachedFileInfo] = {}
        self._blocked_count: int = 0

    def check_and_update(
        self,
        file_path: str,
        offset: int | None = None,
        limit: int | None = None,
    ) -> tuple[bool, str]:
        """Check if a file read is redundant and update the cache.

        Args:
            file_path: Path to the file being read.
            offset: Line offset for partial reads. Part of the cache key, so a
                read with a different offset is treated as a new read.
            limit: Line limit for partial reads. Part of the cache key, so a
                read with a different limit is treated as a new read.

        Returns:
            Tuple of (is_redundant, message). If is_redundant is True,
            the message explains why the read is blocked. Otherwise the
            message is empty. Paths that cannot be resolved or accessed are
            never reported as redundant.
        """
        try:
            path = Path(file_path).resolve()
            if not path.is_file():
                # File doesn't exist or is not a file, allow the read
                return (False, "")

            stat = path.stat()
            mtime_ns = stat.st_mtime_ns
            size = stat.st_size

            # (None, None) for offset/limit represents a full-file read.
            cache_key = (str(path), offset, limit)

            cached = self._cache.get(cache_key)
            if cached is None:
                # First read of this file/range combination - cache it
                self._cache[cache_key] = CachedFileInfo(
                    mtime_ns=mtime_ns,
                    size=size,
                    content_hash=self._compute_hash(path),
                    read_count=1,
                )
                return (False, "")

            if mtime_ns != cached.mtime_ns or size != cached.size:
                # File modified - record the new mtime/size and defer hashing,
                # since we already know the content must be re-read. The
                # deferred (None) hash means the next read of this range is
                # also allowed while the hash is filled in.
                self._cache[cache_key] = CachedFileInfo(
                    mtime_ns=mtime_ns,
                    size=size,
                    content_hash=None,
                    read_count=cached.read_count + 1,
                )
                return (False, "")

            # mtime/size match - always recompute the hash to detect content
            # changes that did not affect mtime/size (e.g., coarse timestamp
            # resolution).
            content_hash = self._compute_hash(path)
            if cached.content_hash is None or content_hash != cached.content_hash:
                # Either no hash was stored yet (deferred) or the content
                # changed despite identical mtime/size - refresh the entry.
                self._cache[cache_key] = CachedFileInfo(
                    mtime_ns=mtime_ns,
                    size=size,
                    content_hash=content_hash,
                    read_count=cached.read_count + 1,
                )
                return (False, "")

            # Hash matches - file truly unchanged, block the redundant read
            cached.read_count += 1
            self._blocked_count += 1
            return (
                True,
                f"File unchanged since last read (read {cached.read_count}x). "
                "Content already in context - use what you have.",
            )

        except self._PATH_ERRORS:
            # Bad path or file access error - allow the read (the tool itself
            # will report the error to the agent).
            return (False, "")

    def _compute_hash(self, path: Path) -> str:
        """Compute SHA-256 hash of file content.

        Args:
            path: Path to the file.

        Returns:
            Hex-encoded SHA-256 hash of the file content.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        hasher = hashlib.sha256()
        with open(path, "rb") as f:
            # Read in 64KB chunks for memory efficiency with large files
            for chunk in iter(lambda: f.read(65536), b""):
                hasher.update(chunk)
        return hasher.hexdigest()

    def invalidate(self, file_path: str) -> None:
        """Invalidate all cache entries for a file (all offset/limit combinations).

        Call this when a file is modified (e.g., after a Write or edit).
        Invalidation is best-effort: if the path cannot be resolved, nothing
        is removed and no error is raised.

        Args:
            file_path: Path to the file to invalidate.
        """
        try:
            path = str(Path(file_path).resolve())
        except self._PATH_ERRORS:
            # Unresolvable path: no entry can exist under a resolved key for
            # it, so there is nothing to remove.
            return

        # Collect first, then delete, to avoid mutating the dict mid-iteration.
        keys_to_remove = [key for key in self._cache if key[0] == path]
        for key in keys_to_remove:
            del self._cache[key]

    @property
    def blocked_count(self) -> int:
        """Return the number of reads blocked due to cache hits."""
        return self._blocked_count

    @property
    def cache_size(self) -> int:
        """Return the number of cached (file, offset, limit) entries."""
        return len(self._cache)
def make_file_read_cache_hook(cache: FileReadCache) -> PreToolUseHook:
    """Build a PreToolUse hook that rejects reads of unchanged files.

    For ``Read`` invocations the hook consults the cache; when the cache
    reports the read as redundant, the hook returns a block decision carrying
    the cache's message. For tools listed in ``FILE_WRITE_TOOLS`` the hook
    drops every cache entry for the target file so later reads are allowed.

    Args:
        cache: The FileReadCache instance used to track reads.

    Returns:
        An async hook function that can be passed to
        ClaudeAgentOptions.hooks["PreToolUse"].
    """

    async def file_read_cache_hook(
        hook_input: Any,  # noqa: ANN401 - SDK type, avoid import
        stderr: str | None,
        context: Any,  # noqa: ANN401 - SDK type, avoid import
    ) -> dict[str, Any]:
        """Block redundant reads; invalidate cache entries on writes."""
        name = hook_input["tool_name"]
        params = hook_input["tool_input"]

        # Read path: ask the cache whether this exact file/range is redundant.
        if name == "Read":
            target = params.get("file_path")
            if target:
                blocked, why = cache.check_and_update(
                    target,
                    offset=params.get("offset"),
                    limit=params.get("limit"),
                )
                if blocked:
                    return {"decision": "block", "reason": why}

        # Write path: forget everything cached for the file being written.
        if name in FILE_WRITE_TOOLS:
            key = FILE_PATH_KEYS.get(name)
            written = params.get(key) if key else None
            if written:
                cache.invalidate(written)

        # Empty dict: no decision, the tool call proceeds.
        return {}

    return file_read_cache_hook