Coverage for src / infra / hooks / file_cache.py: 33%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""File read caching hooks for reducing redundant file reads. 

2 

3Contains the FileReadCache class and hook factory for blocking redundant 

4file reads when the file hasn't changed since the last read. 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10from dataclasses import dataclass 

11from pathlib import Path 

12from typing import TYPE_CHECKING, Any 

13 

14if TYPE_CHECKING: 

15 from .dangerous_commands import PreToolUseHook 

16 

# Tools that write to files and require lock ownership.
# The read-cache hook in this module also uses this set to decide when a
# tool call should invalidate cached reads for the targeted file.
FILE_WRITE_TOOLS: frozenset[str] = frozenset(
    [
        "Write",  # Claude Code Write tool: file_path
        "Edit",  # Claude Code Edit tool: file_path
        "NotebookEdit",  # Claude Code NotebookEdit: notebook_path
    ]
)

# Map of tool name to the key in tool_input that contains the file path.
# Every tool listed in FILE_WRITE_TOOLS has an entry here.
FILE_PATH_KEYS: dict[str, str] = {
    "Write": "file_path",
    "Edit": "file_path",
    "NotebookEdit": "notebook_path",
}

32 

33 

34@dataclass 

35class CachedFileInfo: 

36 """Cached information about a previously-read file. 

37 

38 Attributes: 

39 mtime_ns: File modification time in nanoseconds at time of read. 

40 size: File size in bytes at time of read. 

41 content_hash: SHA-256 hash of the file content. None if hash computation 

42 was deferred (when mtime/size change detected file modification). 

43 read_count: Number of times this file was read. 

44 """ 

45 

46 mtime_ns: int 

47 size: int 

48 content_hash: str | None 

49 read_count: int = 1 

50 

51 

52class FileReadCache: 

53 """Cache for tracking file reads and detecting redundant re-reads. 

54 

55 This cache tracks files that have been read during an agent session. 

56 When a file is re-read without modification, the cache blocks the read 

57 and informs the agent that the file hasn't changed, saving tokens. 

58 

59 The cache uses file mtime and size as fast change detection, falling back 

60 to content hash comparison only when mtime/size match. 

61 

62 Attributes: 

63 _cache: Mapping of absolute file paths to cached file info. 

64 _blocked_count: Count of reads that were blocked due to cache hits. 

65 """ 

66 

67 def __init__(self) -> None: 

68 """Initialize an empty file read cache.""" 

69 # Cache key is (resolved_path, offset, limit) to support range-specific caching 

70 self._cache: dict[tuple[str, int | None, int | None], CachedFileInfo] = {} 

71 self._blocked_count: int = 0 

72 

73 def check_and_update( 

74 self, 

75 file_path: str, 

76 offset: int | None = None, 

77 limit: int | None = None, 

78 ) -> tuple[bool, str]: 

79 """Check if a file read is redundant and update the cache. 

80 

81 Args: 

82 file_path: Path to the file being read. 

83 offset: Line offset for partial reads. If provided with different 

84 value than cached read, allows the read. 

85 limit: Line limit for partial reads. If provided with different 

86 value than cached read, allows the read. 

87 

88 Returns: 

89 Tuple of (is_redundant, message). If is_redundant is True, 

90 the message explains why the read is blocked. 

91 """ 

92 try: 

93 path = Path(file_path).resolve() 

94 if not path.is_file(): 

95 # File doesn't exist or is not a file, allow the read 

96 return (False, "") 

97 

98 stat = path.stat() 

99 mtime_ns = stat.st_mtime_ns 

100 size = stat.st_size 

101 

102 # Create cache key that includes offset/limit for range-specific caching 

103 # Use (None, None) as default to represent full file reads 

104 cache_key = (str(path), offset, limit) 

105 

106 # Check if we have a cached entry for this exact file + range 

107 cached = self._cache.get(cache_key) 

108 if cached is None: 

109 # First read of this file/range combination - cache it 

110 content_hash = self._compute_hash(path) 

111 self._cache[cache_key] = CachedFileInfo( 

112 mtime_ns=mtime_ns, 

113 size=size, 

114 content_hash=content_hash, 

115 read_count=1, 

116 ) 

117 return (False, "") 

118 

119 # Check if file has changed based on mtime/size 

120 if mtime_ns != cached.mtime_ns or size != cached.size: 

121 # File modified - update cache with new mtime/size, defer hash 

122 # computation since we already know the file changed 

123 self._cache[cache_key] = CachedFileInfo( 

124 mtime_ns=mtime_ns, 

125 size=size, 

126 content_hash=None, # Defer hash computation 

127 read_count=cached.read_count + 1, 

128 ) 

129 return (False, "") 

130 

131 # mtime/size match - verify with content hash 

132 # Always recompute hash to detect rare cases where content changes 

133 # without affecting mtime/size (e.g., coarse timestamp resolution) 

134 content_hash = self._compute_hash(path) 

135 if cached.content_hash is None or content_hash != cached.content_hash: 

136 # Content changed despite same mtime/size (rare but possible) 

137 # Or no cached hash yet - update cache 

138 self._cache[cache_key] = CachedFileInfo( 

139 mtime_ns=mtime_ns, 

140 size=size, 

141 content_hash=content_hash, 

142 read_count=cached.read_count + 1, 

143 ) 

144 return (False, "") 

145 

146 # Hash matches - file truly unchanged, block the redundant read 

147 cached.read_count += 1 

148 self._blocked_count += 1 

149 return ( 

150 True, 

151 f"File unchanged since last read (read {cached.read_count}x). " 

152 "Content already in context - use what you have.", 

153 ) 

154 

155 except OSError: 

156 # File access error - allow the read (tool will report the error) 

157 return (False, "") 

158 

159 def _compute_hash(self, path: Path) -> str: 

160 """Compute SHA-256 hash of file content. 

161 

162 Args: 

163 path: Path to the file. 

164 

165 Returns: 

166 Hex-encoded SHA-256 hash of the file content. 

167 """ 

168 hasher = hashlib.sha256() 

169 with open(path, "rb") as f: 

170 # Read in 64KB chunks for memory efficiency with large files 

171 for chunk in iter(lambda: f.read(65536), b""): 

172 hasher.update(chunk) 

173 return hasher.hexdigest() 

174 

175 def invalidate(self, file_path: str) -> None: 

176 """Invalidate all cache entries for a file (all offset/limit combinations). 

177 

178 Call this when a file is modified (e.g., after a Write or edit). 

179 

180 Args: 

181 file_path: Path to the file to invalidate. 

182 """ 

183 try: 

184 path = str(Path(file_path).resolve()) 

185 # Remove all entries for this file path (any offset/limit) 

186 keys_to_remove = [key for key in self._cache if key[0] == path] 

187 for key in keys_to_remove: 

188 del self._cache[key] 

189 except OSError: 

190 pass 

191 

192 @property 

193 def blocked_count(self) -> int: 

194 """Return the number of reads blocked due to cache hits.""" 

195 return self._blocked_count 

196 

197 @property 

198 def cache_size(self) -> int: 

199 """Return the number of files currently cached.""" 

200 return len(self._cache) 

201 

202 

def make_file_read_cache_hook(cache: FileReadCache) -> PreToolUseHook:
    """Create a PreToolUse hook that blocks redundant file reads.

    This hook checks Read tool invocations against the cache. If the file
    hasn't changed since the last read, the hook blocks the read and
    informs the agent to use the content already in context.

    The hook also invalidates cache entries when files are written to,
    ensuring subsequent reads see the updated content.

    Args:
        cache: The FileReadCache instance to use for tracking reads.

    Returns:
        An async hook function that can be passed to ClaudeAgentOptions.hooks["PreToolUse"].
    """

    async def file_read_cache_hook(
        hook_input: Any,  # noqa: ANN401 - SDK type, avoid import
        stderr: str | None,
        context: Any,  # noqa: ANN401 - SDK type, avoid import
    ) -> dict[str, Any]:
        """PreToolUse hook to block redundant file reads.

        Returns a ``{"decision": "block", "reason": ...}`` dict when the cache
        reports the read as redundant, and an empty dict otherwise.
        """
        tool_name = hook_input["tool_name"]
        tool_input = hook_input["tool_input"]

        # Check for Read tool
        if tool_name == "Read":
            file_path = tool_input.get("file_path")
            if file_path:
                # Extract offset/limit for range-specific caching
                offset = tool_input.get("offset")
                limit = tool_input.get("limit")
                is_redundant, message = cache.check_and_update(
                    file_path, offset=offset, limit=limit
                )
                if is_redundant:
                    return {
                        "decision": "block",
                        "reason": message,
                    }

        # Invalidate cache on file writes. This runs before the tool executes,
        # so the first read after the write is treated as a fresh read.
        if tool_name in FILE_WRITE_TOOLS:
            path_key = FILE_PATH_KEYS.get(tool_name)
            if path_key:
                file_path = tool_input.get(path_key)
                if file_path:
                    cache.invalidate(file_path)

        return {}

    return file_read_cache_hook