Coverage for little_loops / user_messages.py: 16%
303 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Extract and analyze user messages from Claude Code logs.
3Provides functionality to extract user messages from Claude Code session
4logs stored in ~/.claude/projects/.
6Usage as CLI:
7 ll-messages # Last 100 messages to file
8 ll-messages -n 50 # Last 50 messages
9 ll-messages --since 2026-01-01 # Since date
10 ll-messages -o output.jsonl # Custom output path
11 ll-messages --stdout # Print to terminal instead of file
13Usage as library:
14 from little_loops.user_messages import extract_user_messages, get_project_folder
16 project_folder = get_project_folder()
17 messages = extract_user_messages(project_folder, limit=50)
18"""
20from __future__ import annotations
22import json
23from dataclasses import dataclass
24from datetime import datetime
25from pathlib import Path
# Public API of this module: the names exported by a star-import.
# ``print_messages_to_stdout`` is a public, documented function of this module
# and is therefore listed alongside ``save_messages``.
__all__ = [
    "UserMessage",
    "ResponseMetadata",
    "CommandRecord",
    "get_project_folder",
    "extract_user_messages",
    "extract_commands",
    "save_messages",
    "print_messages_to_stdout",
]
@dataclass
class UserMessage:
    """A single user-authored message pulled from a Claude Code session log.

    Attributes:
        content: The text content of the user message
        timestamp: When the message was sent
        session_id: Claude Code session identifier
        uuid: Unique message identifier
        cwd: Working directory when message was sent
        git_branch: Git branch active when message was sent
        is_sidechain: Whether this was a sidechain message
        response_metadata: Metadata about the assistant response, if attached
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    cwd: str | None = None
    git_branch: str | None = None
    is_sidechain: bool = False

    response_metadata: ResponseMetadata | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of this message.

        The timestamp is rendered with ``isoformat()``. The
        ``response_metadata`` key is present only when metadata is attached.
        """
        metadata = self.response_metadata
        payload: dict[str, object] = {
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "session_id": self.session_id,
            "uuid": self.uuid,
            "cwd": self.cwd,
            "git_branch": self.git_branch,
            "is_sidechain": self.is_sidechain,
        }
        if metadata is None:
            return payload
        payload["response_metadata"] = metadata.to_dict()
        return payload
@dataclass
class ResponseMetadata:
    """Summary of what the assistant did in response to a user message.

    Attributes:
        tools_used: List of tools and their usage counts
        files_read: Files accessed via Read tool
        files_modified: Files changed via Edit/Write tools
        completion_status: "success", "failure", or "partial"
        error_message: Error text if failure detected
    """

    tools_used: list[dict[str, str | int]]
    files_read: list[str]
    files_modified: list[str]
    completion_status: str
    error_message: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping of every field, in declaration order."""
        field_names = (
            "tools_used",
            "files_read",
            "files_modified",
            "completion_status",
            "error_message",
        )
        return {name: getattr(self, name) for name in field_names}
@dataclass
class CommandRecord:
    """A CLI command taken from an assistant tool_use block.

    Attributes:
        content: The command string that was executed
        timestamp: When the command was issued
        session_id: Claude Code session identifier
        uuid: Unique record identifier
        tool: Tool name (e.g., "Bash")
        cwd: Working directory when command was issued
        git_branch: Git branch active when command was issued
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    tool: str
    cwd: str | None = None
    git_branch: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable mapping tagged with ``"type": "command"``."""
        payload: dict[str, object] = {"type": "command"}
        payload["content"] = self.content
        payload["timestamp"] = self.timestamp.isoformat()
        payload["session_id"] = self.session_id
        payload["uuid"] = self.uuid
        payload["tool"] = self.tool
        payload["cwd"] = self.cwd
        payload["git_branch"] = self.git_branch
        return payload
def _extract_response_metadata(response_record: dict) -> ResponseMetadata | None:
    """Extract metadata from a single assistant response record.

    Counts every ``tool_use`` block by tool name, records the ``file_path`` of
    ``Read`` calls as files read and of ``Edit``/``Write`` calls as files
    modified, and derives a completion status from the text blocks.

    Malformed pieces are tolerated rather than raising: a ``message`` that is
    not a dict or a ``content`` that is not a list yields ``None``; a tool
    block whose ``name`` is not a string is skipped; a tool ``input`` that is
    not a dict still counts the tool but contributes no file path.

    Args:
        response_record: The assistant record from JSONL

    Returns:
        ResponseMetadata if parseable, None otherwise
    """
    message_data = response_record.get("message", {})
    if not isinstance(message_data, dict):
        # e.g. an explicit ``"message": null`` in the log
        return None

    content = message_data.get("content", [])
    if not isinstance(content, list):
        return None

    tools_used: dict[str, int] = {}
    files_read: list[str] = []
    files_modified: list[str] = []

    for block in content:
        if not isinstance(block, dict) or block.get("type") != "tool_use":
            continue

        tool_name = block.get("name", "")
        if not isinstance(tool_name, str):
            continue
        tools_used[tool_name] = tools_used.get(tool_name, 0) + 1

        tool_input = block.get("input", {})
        if not isinstance(tool_input, dict):
            continue
        file_path = tool_input.get("file_path")
        if not isinstance(file_path, str) or not file_path:
            continue

        if tool_name == "Read":
            files_read.append(file_path)
        elif tool_name in ("Edit", "Write"):
            files_modified.append(file_path)

    # Completion status comes from the text blocks; an error message is only
    # looked up when the status is "failure".
    completion_status = _detect_completion_status(content)
    error_message = _detect_error_message(content) if completion_status == "failure" else None

    # Convert the name -> count mapping into the list-of-dicts output format
    tools_list: list[dict[str, str | int]] = [
        {"tool": name, "count": count} for name, count in tools_used.items()
    ]

    return ResponseMetadata(
        tools_used=tools_list,
        files_read=files_read,
        files_modified=files_modified,
        completion_status=completion_status,
        error_message=error_message,
    )
def _aggregate_response_metadata(responses: list[dict]) -> ResponseMetadata | None:
    """Aggregate metadata from multiple assistant response records.

    Tool counts are summed over all records; file paths are de-duplicated and
    returned sorted. The completion status (and error message, on failure) is
    taken from the final record only, and stays "success" when that record has
    no list content.

    Malformed pieces (a record or ``message`` that is not a dict, a tool
    ``input`` that is not a dict, a non-string tool name or file path) are
    skipped instead of raising.

    Args:
        responses: List of assistant records from JSONL

    Returns:
        Aggregated ResponseMetadata, or None if ``responses`` is empty
    """
    if not responses:
        return None

    tools_used: dict[str, int] = {}
    files_read: set[str] = set()
    files_modified: set[str] = set()

    for response_record in responses:
        message_data = (
            response_record.get("message", {}) if isinstance(response_record, dict) else None
        )
        content = message_data.get("content", []) if isinstance(message_data, dict) else None
        if not isinstance(content, list):
            continue

        for block in content:
            if not isinstance(block, dict) or block.get("type") != "tool_use":
                continue

            tool_name = block.get("name", "")
            if not isinstance(tool_name, str):
                continue
            tools_used[tool_name] = tools_used.get(tool_name, 0) + 1

            tool_input = block.get("input", {})
            if not isinstance(tool_input, dict):
                continue
            file_path = tool_input.get("file_path")
            # Only strings are kept so the sets stay hashable and sortable
            if not isinstance(file_path, str) or not file_path:
                continue

            if tool_name == "Read":
                files_read.add(file_path)
            elif tool_name in ("Edit", "Write"):
                files_modified.add(file_path)

    # Use completion status from the final response
    completion_status = "success"
    error_message: str | None = None
    final_record = responses[-1]
    final_message = final_record.get("message", {}) if isinstance(final_record, dict) else None
    final_content = final_message.get("content", []) if isinstance(final_message, dict) else None
    if isinstance(final_content, list):
        completion_status = _detect_completion_status(final_content)
        if completion_status == "failure":
            error_message = _detect_error_message(final_content)

    # Convert to output format
    tools_list: list[dict[str, str | int]] = [
        {"tool": name, "count": count} for name, count in tools_used.items()
    ]

    return ResponseMetadata(
        tools_used=tools_list,
        files_read=sorted(files_read),
        files_modified=sorted(files_modified),
        completion_status=completion_status,
        error_message=error_message,
    )
def _detect_completion_status(content: list) -> str:
    """Detect completion status from response content.

    All string ``text`` values of ``"text"`` blocks are joined and lowercased,
    then searched by plain substring match. Error indicators are checked
    first, so text containing both kinds of phrase is a "failure". Blocks that
    are not dicts, are not text blocks, or carry a non-string ``text`` are
    ignored.

    Args:
        content: List of content blocks from assistant response

    Returns:
        "success", "failure", or "partial"
    """
    error_patterns = ("error", "failed", "couldn't", "unable to", "cannot")
    partial_patterns = ("partially", "some of", "not all", "incomplete")

    text_parts: list[str] = []
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "text":
            continue
        part = block.get("text", "")
        # A null/non-string text would make the join below raise TypeError
        if isinstance(part, str):
            text_parts.append(part)

    text = " ".join(text_parts).lower()

    if any(pattern in text for pattern in error_patterns):
        return "failure"
    if any(pattern in text for pattern in partial_patterns):
        return "partial"
    return "success"
def _detect_error_message(content: list) -> str | None:
    """Extract an error message from response content.

    Scans text blocks in order. The first block whose text contains
    ``"error:"`` or ``"failed:"`` (case-insensitive) is searched line by line,
    and the first line mentioning "error" or "failed" is returned, stripped
    and truncated to 200 characters. Blocks with a non-string ``text`` are
    skipped.

    Args:
        content: List of content blocks from assistant response

    Returns:
        Error message if found, None otherwise
    """
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "text":
            continue

        text = block.get("text", "")
        if not isinstance(text, str):
            # A null/non-string text would crash on .lower()
            continue

        lower_text = text.lower()
        if "error:" not in lower_text and "failed:" not in lower_text:
            continue

        # Extract the first line that mentions the error
        for line in text.split("\n"):
            lowered_line = line.lower()
            if "error" in lowered_line or "failed" in lowered_line:
                return line.strip()[:200]  # Limit length
    return None
def get_project_folder(cwd: Path | None = None) -> Path | None:
    """Locate the Claude Code project folder that corresponds to a directory.

    The resolved directory path is turned into a folder name by replacing
    every "/" with "-", e.g.
    /home/user/foo/bar -> ~/.claude/projects/-home-user-foo-bar

    Args:
        cwd: Working directory to map. If None, uses current directory.

    Returns:
        Path to Claude project folder, or None if it doesn't exist.
    """
    base_dir = Path.cwd() if cwd is None else cwd

    # /home/user/foo/bar -> -home-user-foo-bar
    folder_name = str(base_dir.resolve()).replace("/", "-")
    candidate = Path.home() / ".claude" / "projects" / folder_name

    return candidate if candidate.exists() else None
def _decode_jsonl_line(line: str) -> dict | None:
    """Parse one JSONL line into a record dict, or return None if unusable."""
    line = line.strip()
    if not line:
        return None
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None
    # Valid JSON that is not an object (a bare number, list, ...) is not a record
    return record if isinstance(record, dict) else None


def extract_user_messages(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    include_response_context: bool = False,
) -> list[UserMessage]:
    """Extract user messages from all JSONL session files.

    Filters:
    - type == "user"
    - message.content is string (real user input)
    - message.content is array but [0].type != "tool_result"

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Files are decoded as UTF-8 with undecodable bytes
    replaced, so one corrupt file cannot abort the whole extraction. Files
    that cannot be read (OSError) are skipped.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of messages to return
        since: Only include messages after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        include_response_context: Whether to include metadata from assistant responses

    Returns:
        Messages sorted by timestamp, most recent first.
    """
    messages: list[UserMessage] = []

    # sorted() fixes the processing order, so messages with equal timestamps
    # come out in a deterministic order after the stable sort below.
    for jsonl_file in sorted(project_folder.glob("*.jsonl")):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            with open(jsonl_file, encoding="utf-8", errors="replace") as f:
                decoded = (_decode_jsonl_line(line) for line in f)
                records = (record for record in decoded if record is not None)

                if include_response_context:
                    # Pairing user messages with responses needs the whole file
                    messages.extend(
                        _extract_messages_with_context(list(records), jsonl_file, since)
                    )
                else:
                    # No pairing needed: stream through the file
                    for record in records:
                        msg = _parse_user_record(record, jsonl_file, since)
                        if msg is not None:
                            messages.append(msg)
        except OSError:
            # Skip files that can't be read
            continue

    # Sort by timestamp, most recent first
    messages.sort(key=lambda m: m.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        messages = messages[:limit]

    return messages
def extract_commands(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    tools: list[str] | None = None,
) -> list[CommandRecord]:
    """Extract CLI commands from assistant tool_use messages.

    Parses assistant messages for tool_use blocks and extracts command strings.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Files are decoded as UTF-8 with undecodable bytes
    replaced, so one corrupt file cannot abort the whole extraction. Files
    that cannot be read (OSError) are skipped.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of commands to return
        since: Only include commands after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        tools: Filter to specific tools (default: ["Bash"])

    Returns:
        Commands sorted by timestamp, most recent first.
    """
    if tools is None:
        tools = ["Bash"]

    commands: list[CommandRecord] = []

    # sorted() fixes the processing order, so commands with equal timestamps
    # come out in a deterministic order after the stable sort below.
    for jsonl_file in sorted(project_folder.glob("*.jsonl")):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            with open(jsonl_file, encoding="utf-8", errors="replace") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # Valid JSON that is not an object cannot be a session record
                    if not isinstance(record, dict):
                        continue

                    commands.extend(_parse_command_record(record, jsonl_file, since, tools))
        except OSError:
            # Skip files that can't be read
            continue

    # Sort by timestamp, most recent first
    commands.sort(key=lambda c: c.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        commands = commands[:limit]

    return commands
def _parse_command_record(
    record: dict,
    jsonl_file: Path,
    since: datetime | None,
    tools: list[str],
) -> list[CommandRecord]:
    """Parse CLI commands from an assistant record.

    Only records with ``type == "assistant"`` and list content are considered.
    Each ``tool_use`` block whose tool name is in ``tools`` and whose input
    holds a non-empty string ``command`` becomes one CommandRecord. Malformed
    pieces (``message`` or tool ``input`` that is not a dict, a non-string
    command) are skipped instead of raising.

    Args:
        record: The JSON record from JSONL
        jsonl_file: Source file (for fallback timestamp)
        since: Filter for commands after this datetime
        tools: Tool names to extract (e.g., ["Bash"])

    Returns:
        List of CommandRecord for each matching tool_use block
    """
    # Filter for assistant messages only
    if record.get("type") != "assistant":
        return []

    message_data = record.get("message", {})
    if not isinstance(message_data, dict):
        # e.g. an explicit ``"message": null`` in the log
        return []

    content = message_data.get("content", [])
    if not isinstance(content, list):
        return []

    # Parse timestamp
    timestamp_str = record.get("timestamp", "")
    try:
        # Handle ISO 8601 format with Z suffix
        timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        # NOTE(review): the offset is dropped, not converted, so the naive value
        # keeps the wall-clock time of whatever zone the record was written in.
        if timestamp.tzinfo is not None:
            timestamp = timestamp.replace(tzinfo=None)
    except (ValueError, AttributeError):
        # Missing, malformed or non-string timestamp: use file modification time
        timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime)

    # Apply since filter
    if since and timestamp < since:
        return []

    commands: list[CommandRecord] = []

    for block in content:
        if not isinstance(block, dict) or block.get("type") != "tool_use":
            continue

        tool_name = block.get("name", "")
        if tool_name not in tools:
            continue

        tool_input = block.get("input", {})
        if not isinstance(tool_input, dict):
            continue
        command_str = tool_input.get("command", "")
        # CommandRecord.content is declared as str; ignore anything else
        if not isinstance(command_str, str) or not command_str:
            continue

        commands.append(
            CommandRecord(
                content=command_str,
                timestamp=timestamp,
                session_id=record.get("sessionId", ""),
                uuid=record.get("uuid", ""),
                tool=tool_name,
                cwd=record.get("cwd"),
                git_branch=record.get("gitBranch"),
            )
        )

    return commands
def _parse_user_record(
    record: dict,
    jsonl_file: Path,
    since: datetime | None,
) -> UserMessage | None:
    """Parse a single user record into a UserMessage.

    A record counts as a real user message when its type is "user" and its
    content is either a string or a list whose first element is not a
    ``tool_result`` block. For list content, the text of "text" blocks and
    the stringified ``content`` of other blocks are joined with newlines; if
    nothing is collected, the string form of the whole list is used.

    Malformed pieces (``message`` that is not a dict, list elements that are
    not dicts, non-string ``text``) are skipped instead of raising.

    Args:
        record: The JSON record from JSONL
        jsonl_file: Source file (for fallback timestamp)
        since: Filter for messages after this datetime

    Returns:
        UserMessage if valid user message, None otherwise
    """
    # Filter for user messages only
    if record.get("type") != "user":
        return None

    message_data = record.get("message", {})
    if not isinstance(message_data, dict):
        # e.g. an explicit ``"message": null`` in the log
        return None

    content = message_data.get("content")

    # Check if this is a real user message or tool_result
    if isinstance(content, str):
        # String content = real user message
        message_content = content
    elif isinstance(content, list):
        # Array content - a leading tool_result block marks a tool result
        first_block = content[0] if content else None
        if isinstance(first_block, dict) and first_block.get("type") == "tool_result":
            return None

        # Extract text from array (could be text blocks)
        text_parts: list[str] = []
        for block in content:
            if not isinstance(block, dict):
                continue
            if block.get("type") == "text":
                text = block.get("text", "")
                # A non-string text would make the join below raise TypeError
                if isinstance(text, str):
                    text_parts.append(text)
            elif "content" in block:
                text_parts.append(str(block.get("content", "")))
        message_content = "\n".join(text_parts) if text_parts else str(content)
    else:
        # Missing content or an unsupported content type
        return None

    # Parse timestamp
    timestamp_str = record.get("timestamp", "")
    try:
        # Handle ISO 8601 format with Z suffix
        timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        # Convert to naive datetime for consistent comparison.
        # NOTE(review): the offset is dropped, not converted, so the naive value
        # keeps the wall-clock time of whatever zone the record was written in.
        if timestamp.tzinfo is not None:
            timestamp = timestamp.replace(tzinfo=None)
    except (ValueError, AttributeError):
        # Use file modification time as fallback
        timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime)

    # Apply since filter
    if since and timestamp < since:
        return None

    # Create message object
    return UserMessage(
        content=message_content,
        timestamp=timestamp,
        session_id=record.get("sessionId", ""),
        uuid=record.get("uuid", ""),
        cwd=record.get("cwd"),
        git_branch=record.get("gitBranch"),
        is_sidechain=record.get("isSidechain", False),
    )
def _extract_messages_with_context(
    records: list[dict],
    jsonl_file: Path,
    since: datetime | None,
) -> list[UserMessage]:
    """Extract user messages with response context from a list of records.

    Pairs each user message with ALL following assistant responses until the
    next user message, aggregating tool usage and file changes.

    Tool results are also logged as ``type == "user"`` records, so the record
    type alone cannot mark the end of a turn. A turn ends only at a record
    that ``_parse_user_record`` accepts as a real user message; otherwise
    every response after the first tool call would be dropped.

    A real user message that is older than ``since`` still ends the previous
    turn, but is not returned and collects no responses. Records that are not
    dicts are ignored.

    Args:
        records: List of all records from a JSONL file
        jsonl_file: Source file (for fallback timestamp)
        since: Filter for messages after this datetime

    Returns:
        List of UserMessages with response_metadata populated
    """
    messages: list[UserMessage] = []
    current: UserMessage | None = None  # message whose responses are being collected
    responses: list[dict] = []

    for record in records:
        if not isinstance(record, dict):
            continue

        record_type = record.get("type")
        if record_type == "assistant":
            if current is not None:
                responses.append(record)
            continue
        if record_type != "user":
            continue

        msg = _parse_user_record(record, jsonl_file, since)
        if msg is None and since is not None:
            # Either not a real user message, or a real one filtered by `since`;
            # only the latter starts a new turn.
            starts_new_turn = _parse_user_record(record, jsonl_file, None) is not None
        else:
            starts_new_turn = msg is not None

        if not starts_new_turn:
            # tool_result (or otherwise unusable) record: same turn continues
            continue

        # Close the previous turn before opening the next one
        if current is not None:
            current.response_metadata = _aggregate_response_metadata(responses)
            messages.append(current)
        current = msg
        responses = []

    # Close the turn that was still open at end of file
    if current is not None:
        current.response_metadata = _aggregate_response_metadata(responses)
        messages.append(current)

    return messages
def save_messages(
    messages: list[UserMessage],
    output_path: Path | None = None,
) -> Path:
    """Write messages to a JSONL file, one JSON object per line.

    When no path is given, the file is created as
    ./.claude/user-messages-{timestamp}.jsonl under the current directory.
    Missing parent directories are created.

    Args:
        messages: List of UserMessage objects to save
        output_path: Output file path. If None, uses default location.

    Returns:
        Path to the saved file.
    """
    if output_path is None:
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        default_dir = Path.cwd() / ".claude"
        default_dir.mkdir(parents=True, exist_ok=True)
        output_path = default_dir / f"user-messages-{stamp}.jsonl"

    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    with destination.open("w", encoding="utf-8") as sink:
        sink.writelines(json.dumps(msg.to_dict()) + "\n" for msg in messages)

    return destination
def print_messages_to_stdout(messages: list[UserMessage]) -> None:
    """Write each message to stdout as one JSON object per line (JSONL).

    Args:
        messages: List of UserMessage objects to print
    """
    import sys

    for serialized in (json.dumps(msg.to_dict()) for msg in messages):
        print(serialized, file=sys.stdout)