Coverage for src / infra / io / session_log_parser.py: 23%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Session log parser for JSONL log files from Claude Agent SDK. 

2 

3This module provides SessionLogParser, a reusable class for parsing JSONL log 

4files produced by Claude Agent SDK. It extracts structured information from 

5logs that can be used for: 

6- Validation evidence detection (did pytest/ruff/ty run?) 

7- Issue resolution marker detection (ISSUE_NO_CHANGE, ISSUE_OBSOLETE, etc.) 

8- Debugging and analytics 

9 

10The parser uses typed log events from log_events.py to ensure type safety 

11and contract adherence with the Claude Agent SDK schema. 

12 

13This module also provides FileSystemLogProvider, the canonical implementation 

14of the LogProvider protocol that reads logs from the Claude SDK's filesystem 

15storage at ~/.claude/projects/{encoded-path}/. 

16""" 

17 

18from __future__ import annotations 

19 

20import json 

21from dataclasses import dataclass 

22from typing import TYPE_CHECKING, Any, cast 

23 

24from src.core.log_events import ( 

25 AssistantLogEntry, 

26 TextBlock, 

27 ToolResultBlock, 

28 ToolUseBlock, 

29 UserLogEntry, 

30 parse_log_entry, 

31) 

32from src.infra.tools.env import encode_repo_path, get_claude_config_dir 

33 

34if TYPE_CHECKING: 

35 from collections.abc import Iterator 

36 from pathlib import Path 

37 

38 from src.core.log_events import LogEntry 

39 from src.core.protocols import JsonlEntryProtocol 

40 

41 

@dataclass
class JsonlEntry:
    """One decoded line of a JSONL session log, plus its position in the file.

    Attributes:
        data: JSON object decoded from the line.
        entry: Typed LogEntry built from ``data``, or None when no typed
            entry could be produced for it.
        line_len: Size of the raw line in bytes; used to advance offsets.
        offset: Byte position in the file at which the line begins.
    """

    data: dict[str, Any]
    entry: LogEntry | None
    line_len: int
    offset: int

57 

58 

class SessionLogParser:
    """Parser for JSONL session logs from Claude Agent SDK.

    This class provides methods to iterate over and extract structured
    information from JSONL log files. It handles:
    - Byte-offset-aware iteration for incremental parsing
    - Extraction of Bash commands from tool_use blocks
    - Extraction of tool results with error status
    - Extraction of text blocks from assistant messages

    Every ``extract_*`` method prefers the typed ``entry.entry`` when it is
    present and otherwise falls back to inspecting the raw ``entry.data``
    dict. The fallback path tolerates malformed shapes (missing or null
    ``message`` / ``content`` / ``input`` fields) and returns an empty or
    partial result instead of raising.

    Example:
        >>> parser = SessionLogParser()
        >>> for entry in parser.iter_jsonl_entries(log_path):
        ...     commands = parser.extract_bash_commands(entry)
        ...     for tool_id, command in commands:
        ...         print(f"Command: {command}")
    """

    def iter_jsonl_entries(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntry]:
        """Iterate over parsed JSONL entries from a log file.

        Reads the file in binary mode for accurate byte offset tracking,
        decodes each line as UTF-8, parses JSON, and yields structured entries.

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Yields:
            JsonlEntry objects for each line that parsed to a JSON object.
            The entry field contains the typed LogEntry if parsing succeeded.

        Raises:
            OSError: If file cannot be read. Callers should handle this.

        Note:
            - A missing file yields nothing
            - Lines that fail UTF-8 decoding are silently skipped
            - Empty lines are silently skipped
            - Lines that fail JSON parsing are silently skipped
            - Lines holding valid JSON that is not an object (number,
              string, array, ...) are silently skipped
        """
        if not log_path.exists():
            return

        with open(log_path, "rb") as f:
            f.seek(offset)
            current_offset = offset

            for line_bytes in f:
                # Account for the raw line before any skip so that offsets
                # stay aligned with the file even when a line is discarded.
                line_len = len(line_bytes)
                line_offset = current_offset
                current_offset += line_len

                try:
                    line = line_bytes.decode("utf-8").strip()
                except UnicodeDecodeError:
                    continue

                if not line:
                    continue

                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # JsonlEntry.data is declared as a dict and every consumer
                # calls .get() on it; a bare scalar or array is not an entry.
                if not isinstance(data, dict):
                    continue

                # Parse into typed LogEntry (returns None for unrecognized entries)
                entry = parse_log_entry(data)

                yield JsonlEntry(
                    data=data, entry=entry, line_len=line_len, offset=line_offset
                )

    def get_log_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Get the byte offset at the end of a log file.

        This is a lightweight method for getting the current file position.
        Use this when you only need the offset for retry scoping, not the
        parsed entries themselves.

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Value returned when the file is unavailable
                (default 0). It does not affect the result otherwise.

        Returns:
            The byte offset at the end of the file, or start_offset if file
            doesn't exist or can't be read.
        """
        # No exists() pre-check: a missing file surfaces as FileNotFoundError,
        # which is an OSError, so one handler covers "missing" and
        # "unreadable" without a check-then-open race.
        try:
            with open(log_path, "rb") as f:
                f.seek(0, 2)  # whence=2: seek relative to end of file
                return f.tell()
        except OSError:
            return start_offset

    @staticmethod
    def _content_blocks(data: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the dict-shaped blocks of ``data["message"]["content"]``.

        Returns an empty list when ``message`` is not a dict or ``content`` is
        not a list/tuple (for example a plain string or null), and drops any
        block that is not a dict.
        """
        message = data.get("message")
        if not isinstance(message, dict):
            return []
        content = message.get("content")
        if not isinstance(content, (list, tuple)):
            return []
        return [block for block in content if isinstance(block, dict)]

    def extract_bash_commands(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[tuple[str, str]]:
        """Extract Bash tool_use commands from an entry.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
            Returns empty list if entry is not an assistant message.
        """
        # Use typed entry if available
        if entry.entry is not None:
            if not isinstance(entry.entry, AssistantLogEntry):
                return []
            return [
                (block.id, block.input.get("command", ""))
                for block in entry.entry.message.content
                if isinstance(block, ToolUseBlock) and block.name.lower() == "bash"
            ]

        # Fallback to raw data parsing for backward compatibility
        return self._extract_bash_commands_from_data(entry.data)

    def _extract_bash_commands_from_data(
        self, data: dict[str, Any]
    ) -> list[tuple[str, str]]:
        """Extract Bash commands from raw entry data (fallback method).

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
            Blocks whose ``name`` is not a string are ignored; a block whose
            ``input`` is not a dict contributes an empty command.
        """
        if data.get("type") != "assistant":
            return []

        commands = []
        for block in self._content_blocks(data):
            if block.get("type") != "tool_use":
                continue
            tool_name = block.get("name")
            # The tool name match is case-insensitive ("Bash" / "bash").
            if not isinstance(tool_name, str) or tool_name.lower() != "bash":
                continue
            tool_input = block.get("input")
            command = (
                tool_input.get("command", "") if isinstance(tool_input, dict) else ""
            )
            commands.append((block.get("id", ""), command))
        return commands

    def extract_tool_results(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[tuple[str, bool]]:
        """Extract tool_result entries from an entry.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
            Returns empty list if entry is not a user message.
        """
        # Use typed entry if available
        if entry.entry is not None:
            if not isinstance(entry.entry, UserLogEntry):
                return []
            return [
                (block.tool_use_id, block.is_error)
                for block in entry.entry.message.content
                if isinstance(block, ToolResultBlock)
            ]

        # Fallback to raw data parsing for backward compatibility
        return self._extract_tool_results_from_data(entry.data)

    def _extract_tool_results_from_data(
        self, data: dict[str, Any]
    ) -> list[tuple[str, bool]]:
        """Extract tool results from raw entry data (fallback method).

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
            ``is_error`` is coerced to bool, so a missing or null flag is
            reported as False.
        """
        if data.get("type") != "user":
            return []

        return [
            (block.get("tool_use_id", ""), bool(block.get("is_error", False)))
            for block in self._content_blocks(data)
            if block.get("type") == "tool_result"
        ]

    def extract_assistant_text_blocks(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[str]:
        """Extract text content from assistant message blocks.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of text strings from text blocks in assistant messages.
            Returns empty list if entry is not an assistant message.
        """
        # Use typed entry if available
        if entry.entry is not None:
            if not isinstance(entry.entry, AssistantLogEntry):
                return []
            return [
                block.text
                for block in entry.entry.message.content
                if isinstance(block, TextBlock)
            ]

        # Fallback to raw data parsing for backward compatibility
        return self._extract_assistant_text_blocks_from_data(entry.data)

    def _extract_assistant_text_blocks_from_data(
        self, data: dict[str, Any]
    ) -> list[str]:
        """Extract text blocks from raw entry data (fallback method).

        An entry counts as an assistant message when either its top-level
        ``type`` or its ``message.role`` equals "assistant".

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of text strings from text blocks in assistant messages.
        """
        message = data.get("message")
        entry_role = message.get("role", "") if isinstance(message, dict) else ""
        if data.get("type", "") != "assistant" and entry_role != "assistant":
            return []

        return [
            block.get("text", "")
            for block in self._content_blocks(data)
            if block.get("type") == "text"
        ]

304 

305 

class FileSystemLogProvider:
    """Filesystem-backed LogProvider for Claude SDK session logs.

    Session logs are looked up at:
        {claude_config_dir}/projects/{encoded-repo-path}/{session_id}.jsonl

    All reading and extraction work is forwarded to an internal
    SessionLogParser; this class only adds path resolution on top of it.

    Example:
        >>> provider = FileSystemLogProvider()
        >>> log_path = provider.get_log_path(repo_path, session_id)
        >>> for entry in provider.iter_events(log_path):
        ...     # Process entry
    """

    def __init__(self) -> None:
        """Create the provider together with the parser it delegates to."""
        self._parser = SessionLogParser()

    def get_log_path(self, repo_path: Path, session_id: str) -> Path:
        """Build the location of a session's JSONL log file.

        The result has the form:
            {claude_config_dir}/projects/{encoded-repo-path}/{session_id}.jsonl

        Args:
            repo_path: Repository path the session was run in.
            session_id: Claude SDK session ID (UUID from ResultMessage).

        Returns:
            Path to the JSONL log file.
        """
        encoded_repo = encode_repo_path(repo_path)
        projects_root = get_claude_config_dir() / "projects"
        return projects_root / encoded_repo / f"{session_id}.jsonl"

    def iter_events(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntryProtocol]:
        """Iterate over the parsed entries of a log file.

        Forwards to SessionLogParser.iter_jsonl_entries().

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Yields:
            JsonlEntryProtocol objects for each successfully parsed JSON line.
        """
        entries = self._parser.iter_jsonl_entries(log_path, offset)
        # The cast only re-labels the iterator for type checkers; it does
        # nothing at runtime.
        return cast("Iterator[JsonlEntryProtocol]", entries)

    def get_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Report the byte offset at the end of a log file.

        Forwards to SessionLogParser.get_log_end_offset().

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Byte offset to start from (default 0).

        Returns:
            The byte offset at the end of the file, or start_offset if file
            doesn't exist or can't be read.
        """
        end_offset = self._parser.get_log_end_offset(log_path, start_offset)
        return end_offset

    def extract_bash_commands(self, entry: JsonlEntryProtocol) -> list[tuple[str, str]]:
        """Collect the Bash tool_use commands found in an entry.

        Forwards to SessionLogParser.extract_bash_commands().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
        """
        bash_commands = self._parser.extract_bash_commands(entry)
        return bash_commands

    def extract_tool_results(self, entry: JsonlEntryProtocol) -> list[tuple[str, bool]]:
        """Collect the tool_result blocks found in an entry.

        Forwards to SessionLogParser.extract_tool_results().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
        """
        tool_results = self._parser.extract_tool_results(entry)
        return tool_results

    def extract_assistant_text_blocks(self, entry: JsonlEntryProtocol) -> list[str]:
        """Collect the text of an assistant message's text blocks.

        Forwards to SessionLogParser.extract_assistant_text_blocks().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of text strings from text blocks in assistant messages.
        """
        text_blocks = self._parser.extract_assistant_text_blocks(entry)
        return text_blocks