Coverage for src / infra / io / session_log_parser.py: 23%
126 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Session log parser for JSONL log files from Claude Agent SDK.
3This module provides SessionLogParser, a reusable class for parsing JSONL log
4files produced by Claude Agent SDK. It extracts structured information from
5logs that can be used for:
6- Validation evidence detection (did pytest/ruff/ty run?)
7- Issue resolution marker detection (ISSUE_NO_CHANGE, ISSUE_OBSOLETE, etc.)
8- Debugging and analytics
10The parser uses typed log events from log_events.py to ensure type safety
11and contract adherence with the Claude Agent SDK schema.
13This module also provides FileSystemLogProvider, the canonical implementation
14of the LogProvider protocol that reads logs from the Claude SDK's filesystem
15storage at ~/.claude/projects/{encoded-path}/.
16"""
18from __future__ import annotations
20import json
21from dataclasses import dataclass
22from typing import TYPE_CHECKING, Any, cast
24from src.core.log_events import (
25 AssistantLogEntry,
26 TextBlock,
27 ToolResultBlock,
28 ToolUseBlock,
29 UserLogEntry,
30 parse_log_entry,
31)
32from src.infra.tools.env import encode_repo_path, get_claude_config_dir
34if TYPE_CHECKING:
35 from collections.abc import Iterator
36 from pathlib import Path
38 from src.core.log_events import LogEntry
39 from src.core.protocols import JsonlEntryProtocol
@dataclass
class JsonlEntry:
    """One decoded line of a JSONL log, plus where it sat in the file.

    Attributes:
        data: The JSON object parsed from the line.
        entry: The typed LogEntry for the line, or None when the line could
            not be turned into a typed entry.
        line_len: Size of the raw line in bytes (used for offset tracking).
        offset: Byte position in the file at which the line began.
    """

    # Raw payload and its typed view (when one could be built).
    data: dict[str, Any]
    entry: LogEntry | None

    # Position bookkeeping, both measured in bytes.
    line_len: int
    offset: int
class SessionLogParser:
    """Parser for JSONL session logs from Claude Agent SDK.

    This class provides methods to iterate over and extract structured
    information from JSONL log files. It handles:
    - Byte-offset-aware iteration for incremental parsing
    - Extraction of Bash commands from tool_use blocks
    - Extraction of tool results with error status
    - Extraction of text blocks from assistant messages

    Each ``extract_*`` method prefers the typed ``entry.entry`` when it is
    present and otherwise falls back to inspecting the raw ``entry.data``
    dict. The fallback helpers tolerate missing, null, or wrongly-typed
    fields and simply ignore what they cannot interpret.

    Example:
        >>> parser = SessionLogParser()
        >>> for entry in parser.iter_jsonl_entries(log_path):
        ...     commands = parser.extract_bash_commands(entry)
        ...     for tool_id, command in commands:
        ...         print(f"Command: {command}")
    """

    def iter_jsonl_entries(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntry]:
        """Iterate over parsed JSONL entries from a log file.

        Reads the file in binary mode for accurate byte offset tracking,
        decodes each line as UTF-8, parses JSON, and yields structured entries.

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Yields:
            JsonlEntry objects for each line holding a JSON object.
            The entry field contains the typed LogEntry if parsing succeeded.

        Raises:
            OSError: If file cannot be read. Callers should handle this.

        Note:
            - A missing file yields nothing
            - Lines that fail UTF-8 decoding are silently skipped
            - Empty lines are silently skipped
            - Lines that fail JSON parsing are silently skipped
            - Lines holding valid JSON that is not an object (a list, number,
              string, ...) are silently skipped
        """
        if not log_path.exists():
            return

        with open(log_path, "rb") as f:
            f.seek(offset)
            current_offset = offset

            for line_bytes in f:
                # Offsets are tracked on the raw bytes, before any decoding,
                # so skipped lines still advance the position correctly.
                line_len = len(line_bytes)
                line_offset = current_offset
                current_offset += line_len

                try:
                    line = line_bytes.decode("utf-8").strip()
                except UnicodeDecodeError:
                    continue

                if not line:
                    continue

                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # JsonlEntry.data is a dict and every consumer calls .get()
                # on it, so a bare list/number/string line is not an entry.
                if not isinstance(data, dict):
                    continue

                # Parse into typed LogEntry (returns None for unrecognized entries)
                entry = parse_log_entry(data)

                yield JsonlEntry(
                    data=data, entry=entry, line_len=line_len, offset=line_offset
                )

    def get_log_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Get the byte offset at the end of a log file.

        This is a lightweight method for getting the current file position.
        Use this when you only need the offset for retry scoping, not the
        parsed entries themselves.

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Value returned when the file is unavailable
                (default 0).

        Returns:
            The byte offset at the end of the file, or start_offset if file
            doesn't exist or can't be read.
        """
        # A missing file surfaces as FileNotFoundError (an OSError), so a
        # separate existence check is unnecessary and every failure mode
        # takes the same fallback path.
        try:
            with open(log_path, "rb") as f:
                f.seek(0, 2)  # whence=2: seek relative to end of file
                return f.tell()
        except OSError:
            return start_offset

    def extract_bash_commands(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[tuple[str, str]]:
        """Extract Bash tool_use commands from an entry.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
            Returns empty list if entry is not an assistant message.
        """
        typed = entry.entry
        if typed is None:
            # Fallback to raw data parsing for backward compatibility
            return self._extract_bash_commands_from_data(entry.data)

        if not isinstance(typed, AssistantLogEntry):
            return []
        return [
            (block.id, block.input.get("command", ""))
            for block in typed.message.content
            if isinstance(block, ToolUseBlock) and block.name.lower() == "bash"
        ]

    @staticmethod
    def _content_blocks(
        data: dict[str, Any], block_type: str
    ) -> list[dict[str, Any]]:
        """Return the dict blocks of ``data["message"]["content"]`` of one type.

        Args:
            data: Parsed JSONL entry data.
            block_type: Value the block's "type" key must equal.

        Returns:
            The matching blocks in order. Empty when "message" is not a dict,
            when "content" is not a list/tuple (e.g. null or a plain string),
            or when nothing matches.
        """
        message = data.get("message")
        if not isinstance(message, dict):
            return []
        content = message.get("content")
        if not isinstance(content, (list, tuple)):
            return []
        return [
            block
            for block in content
            if isinstance(block, dict) and block.get("type") == block_type
        ]

    def _extract_bash_commands_from_data(
        self, data: dict[str, Any]
    ) -> list[tuple[str, str]]:
        """Extract Bash commands from raw entry data (fallback method).

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
            Blocks whose "name" is not a string are ignored; a block whose
            "input" is not a dict contributes an empty command.
        """
        if data.get("type") != "assistant":
            return []

        commands = []
        for block in self._content_blocks(data, "tool_use"):
            tool_name = block.get("name", "")
            if not isinstance(tool_name, str) or tool_name.lower() != "bash":
                continue
            tool_input = block.get("input")
            command = (
                tool_input.get("command", "") if isinstance(tool_input, dict) else ""
            )
            commands.append((block.get("id", ""), command))
        return commands

    def extract_tool_results(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[tuple[str, bool]]:
        """Extract tool_result entries from an entry.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
            Returns empty list if entry is not a user message.
        """
        typed = entry.entry
        if typed is None:
            # Fallback to raw data parsing for backward compatibility
            return self._extract_tool_results_from_data(entry.data)

        if not isinstance(typed, UserLogEntry):
            return []
        return [
            (block.tool_use_id, block.is_error)
            for block in typed.message.content
            if isinstance(block, ToolResultBlock)
        ]

    def _extract_tool_results_from_data(
        self, data: dict[str, Any]
    ) -> list[tuple[str, bool]]:
        """Extract tool results from raw entry data (fallback method).

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
            is_error is always a real bool: a missing or null flag is False.
        """
        if data.get("type") != "user":
            return []

        return [
            (block.get("tool_use_id", ""), bool(block.get("is_error", False)))
            for block in self._content_blocks(data, "tool_result")
        ]

    def extract_assistant_text_blocks(
        self, entry: JsonlEntry | JsonlEntryProtocol
    ) -> list[str]:
        """Extract text content from assistant message blocks.

        Args:
            entry: A JsonlEntry or JsonlEntryProtocol from iter_jsonl_entries.

        Returns:
            List of text strings from text blocks in assistant messages.
            Returns empty list if entry is not an assistant message.
        """
        typed = entry.entry
        if typed is None:
            # Fallback to raw data parsing for backward compatibility
            return self._extract_assistant_text_blocks_from_data(entry.data)

        if not isinstance(typed, AssistantLogEntry):
            return []
        return [
            block.text
            for block in typed.message.content
            if isinstance(block, TextBlock)
        ]

    def _extract_assistant_text_blocks_from_data(
        self, data: dict[str, Any]
    ) -> list[str]:
        """Extract text blocks from raw entry data (fallback method).

        An entry counts as an assistant message when either its top-level
        "type" or its message "role" is "assistant".

        Args:
            data: Parsed JSONL entry data.

        Returns:
            List of text strings from text blocks in assistant messages.
            A text block whose "text" is missing or not a string contributes
            an empty string, so the result has one item per text block.
        """
        message = data.get("message")
        entry_role = message.get("role", "") if isinstance(message, dict) else ""
        if data.get("type", "") != "assistant" and entry_role != "assistant":
            return []

        texts = []
        for block in self._content_blocks(data, "text"):
            text = block.get("text", "")
            texts.append(text if isinstance(text, str) else "")
        return texts
class FileSystemLogProvider:
    """Filesystem-backed LogProvider for Claude SDK session logs.

    Session logs are JSONL files located at:
        {claude_config_dir}/projects/{encoded-repo-path}/{session_id}.jsonl

    The provider satisfies the LogProvider protocol by holding a
    SessionLogParser and forwarding every parsing request to it.

    Example:
        >>> provider = FileSystemLogProvider()
        >>> log_path = provider.get_log_path(repo_path, session_id)
        >>> for entry in provider.iter_events(log_path):
        ...     # Process entry
    """

    def __init__(self) -> None:
        """Create the provider together with the parser it delegates to."""
        self._parser = SessionLogParser()

    def get_log_path(self, repo_path: Path, session_id: str) -> Path:
        """Build the path of the session log the Claude SDK writes.

        The layout is:
            {claude_config_dir}/projects/{encoded-repo-path}/{session_id}.jsonl

        Args:
            repo_path: Repository path the session was run in.
            session_id: Claude SDK session ID (UUID from ResultMessage).

        Returns:
            Path to the JSONL log file.
        """
        repo_dir_name = encode_repo_path(repo_path)
        projects_root = get_claude_config_dir() / "projects"
        log_file_name = f"{session_id}.jsonl"
        return projects_root / repo_dir_name / log_file_name

    def iter_events(
        self, log_path: Path, offset: int = 0
    ) -> Iterator[JsonlEntryProtocol]:
        """Iterate over the parsed entries of a log file.

        Forwards to SessionLogParser.iter_jsonl_entries().

        Args:
            log_path: Path to the JSONL log file.
            offset: Byte offset to start reading from (default 0).

        Yields:
            JsonlEntryProtocol objects for each successfully parsed JSON line.
        """
        parsed_entries = self._parser.iter_jsonl_entries(log_path, offset)
        # The cast only changes the static type; the iterator is returned as is.
        return cast("Iterator[JsonlEntryProtocol]", parsed_entries)

    def get_end_offset(self, log_path: Path, start_offset: int = 0) -> int:
        """Report the byte offset at the end of a log file.

        Forwards to SessionLogParser.get_log_end_offset().

        Args:
            log_path: Path to the JSONL log file.
            start_offset: Byte offset to start from (default 0).

        Returns:
            The byte offset at the end of the file, or start_offset if file
            doesn't exist or can't be read.
        """
        end_offset = self._parser.get_log_end_offset(log_path, start_offset)
        return end_offset

    def extract_bash_commands(self, entry: JsonlEntryProtocol) -> list[tuple[str, str]]:
        """Collect the Bash tool_use commands found in an entry.

        Forwards to SessionLogParser.extract_bash_commands().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of (tool_id, command) tuples for Bash tool_use blocks.
        """
        bash_commands = self._parser.extract_bash_commands(entry)
        return bash_commands

    def extract_tool_results(self, entry: JsonlEntryProtocol) -> list[tuple[str, bool]]:
        """Collect the tool_result blocks found in an entry.

        Forwards to SessionLogParser.extract_tool_results().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of (tool_use_id, is_error) tuples for tool_result blocks.
        """
        tool_results = self._parser.extract_tool_results(entry)
        return tool_results

    def extract_assistant_text_blocks(self, entry: JsonlEntryProtocol) -> list[str]:
        """Collect the text of assistant message blocks in an entry.

        Forwards to SessionLogParser.extract_assistant_text_blocks().

        Args:
            entry: A JsonlEntryProtocol from iter_events.

        Returns:
            List of text strings from text blocks in assistant messages.
        """
        text_blocks = self._parser.extract_assistant_text_blocks(entry)
        return text_blocks