Coverage for little_loops / user_messages.py: 16%

303 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

"""Extract and analyze user messages from Claude Code logs.

Provides functionality to extract user messages from Claude Code session
logs stored in ~/.claude/projects/.

Usage as CLI:
    ll-messages                          # Last 100 messages to file
    ll-messages -n 50                    # Last 50 messages
    ll-messages --since 2026-01-01       # Since date
    ll-messages -o output.jsonl          # Custom output path
    ll-messages --stdout                 # Print to terminal instead of file

Usage as library:
    from little_loops.user_messages import extract_user_messages, get_project_folder

    project_folder = get_project_folder()
    messages = extract_user_messages(project_folder, limit=50)
"""

19 

from __future__ import annotations

import json
from collections import Counter
from collections.abc import Iterator
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

26 

# Public API of this module (names exported by ``from ... import *``).
__all__ = [
    "UserMessage",
    "ResponseMetadata",
    "CommandRecord",
    "get_project_folder",
    "extract_user_messages",
    "extract_commands",
    "save_messages",
]

36 

37 

@dataclass
class UserMessage:
    """Extracted user message with metadata.

    Attributes:
        content: The text content of the user message
        timestamp: When the message was sent
        session_id: Claude Code session identifier
        uuid: Unique message identifier
        cwd: Working directory when message was sent
        git_branch: Git branch active when message was sent
        is_sidechain: Whether this was a sidechain message
        response_metadata: Optional metadata describing the assistant
            response(s) to this message; left out of ``to_dict`` output
            when it is ``None``
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    cwd: str | None = None
    git_branch: str | None = None
    is_sidechain: bool = False

    response_metadata: ResponseMetadata | None = None

    def to_dict(self) -> dict[str, object]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict of plain values; ``timestamp`` is rendered with
            ``datetime.isoformat()``. The ``response_metadata`` key is only
            present when metadata has been attached.
        """
        result: dict[str, object] = {
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "session_id": self.session_id,
            "uuid": self.uuid,
            "cwd": self.cwd,
            "git_branch": self.git_branch,
            "is_sidechain": self.is_sidechain,
        }
        if self.response_metadata is not None:
            result["response_metadata"] = self.response_metadata.to_dict()
        return result

76 

77 

@dataclass
class ResponseMetadata:
    """Metadata extracted from assistant response.

    Attributes:
        tools_used: List of tools and their usage counts, one
            ``{"tool": name, "count": n}`` entry per tool
        files_read: Files accessed via Read tool
        files_modified: Files changed via Edit/Write tools
        completion_status: "success", "failure", or "partial"
        error_message: Error text if failure detected
    """

    tools_used: list[dict[str, str | int]]
    files_read: list[str]
    files_modified: list[str]
    completion_status: str
    error_message: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict with one key per attribute. The list values are the same
            list objects held by the instance, not copies.
        """
        return {
            "tools_used": self.tools_used,
            "files_read": self.files_read,
            "files_modified": self.files_modified,
            "completion_status": self.completion_status,
            "error_message": self.error_message,
        }

105 

106 

@dataclass
class CommandRecord:
    """Extracted CLI command from assistant tool_use.

    Attributes:
        content: The command string that was executed
        timestamp: When the command was issued
        session_id: Claude Code session identifier
        uuid: Unique record identifier
        tool: Tool name (e.g., "Bash")
        cwd: Working directory when command was issued
        git_branch: Git branch active when command was issued
    """

    content: str
    timestamp: datetime
    session_id: str
    uuid: str
    tool: str
    cwd: str | None = None
    git_branch: str | None = None

    def to_dict(self) -> dict[str, object]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict of plain values with a constant ``"type": "command"``
            entry; ``timestamp`` is rendered with ``datetime.isoformat()``.
        """
        return {
            "type": "command",
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "session_id": self.session_id,
            "uuid": self.uuid,
            "tool": self.tool,
            "cwd": self.cwd,
            "git_branch": self.git_branch,
        }

141 

142 

143def _extract_response_metadata(response_record: dict) -> ResponseMetadata | None: 

144 """Extract metadata from an assistant response record. 

145 

146 Args: 

147 response_record: The assistant record from JSONL 

148 

149 Returns: 

150 ResponseMetadata if parseable, None otherwise 

151 """ 

152 message_data = response_record.get("message", {}) 

153 content = message_data.get("content", []) 

154 

155 if not isinstance(content, list): 

156 return None 

157 

158 tools_used: dict[str, int] = {} 

159 files_read: list[str] = [] 

160 files_modified: list[str] = [] 

161 

162 for block in content: 

163 if not isinstance(block, dict): 

164 continue 

165 if block.get("type") != "tool_use": 

166 continue 

167 

168 tool_name = block.get("name", "") 

169 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1 

170 

171 tool_input = block.get("input", {}) 

172 if tool_name == "Read": 

173 file_path = tool_input.get("file_path") 

174 if file_path: 

175 files_read.append(file_path) 

176 elif tool_name in ("Edit", "Write"): 

177 file_path = tool_input.get("file_path") 

178 if file_path: 

179 files_modified.append(file_path) 

180 

181 # Detect completion status from text content 

182 completion_status = _detect_completion_status(content) 

183 error_message = _detect_error_message(content) if completion_status == "failure" else None 

184 

185 # Convert tools_used dict to list format 

186 tools_list: list[dict[str, str | int]] = [ 

187 {"tool": name, "count": count} for name, count in tools_used.items() 

188 ] 

189 

190 return ResponseMetadata( 

191 tools_used=tools_list, 

192 files_read=files_read, 

193 files_modified=files_modified, 

194 completion_status=completion_status, 

195 error_message=error_message, 

196 ) 

197 

198 

199def _aggregate_response_metadata(responses: list[dict]) -> ResponseMetadata | None: 

200 """Aggregate metadata from multiple assistant response records. 

201 

202 Combines tool counts, file lists, and uses completion status from final response. 

203 

204 Args: 

205 responses: List of assistant records from JSONL 

206 

207 Returns: 

208 Aggregated ResponseMetadata, or None if no valid responses 

209 """ 

210 if not responses: 

211 return None 

212 

213 tools_used: dict[str, int] = {} 

214 files_read: set[str] = set() 

215 files_modified: set[str] = set() 

216 completion_status = "success" 

217 error_message: str | None = None 

218 

219 for response_record in responses: 

220 message_data = response_record.get("message", {}) 

221 content = message_data.get("content", []) 

222 

223 if not isinstance(content, list): 

224 continue 

225 

226 for block in content: 

227 if not isinstance(block, dict): 

228 continue 

229 if block.get("type") != "tool_use": 

230 continue 

231 

232 tool_name = block.get("name", "") 

233 tools_used[tool_name] = tools_used.get(tool_name, 0) + 1 

234 

235 tool_input = block.get("input", {}) 

236 if tool_name == "Read": 

237 file_path = tool_input.get("file_path") 

238 if file_path: 

239 files_read.add(file_path) 

240 elif tool_name in ("Edit", "Write"): 

241 file_path = tool_input.get("file_path") 

242 if file_path: 

243 files_modified.add(file_path) 

244 

245 # Use completion status from the final response 

246 final_content = responses[-1].get("message", {}).get("content", []) 

247 if isinstance(final_content, list): 

248 completion_status = _detect_completion_status(final_content) 

249 if completion_status == "failure": 

250 error_message = _detect_error_message(final_content) 

251 

252 # Convert to output format 

253 tools_list: list[dict[str, str | int]] = [ 

254 {"tool": name, "count": count} for name, count in tools_used.items() 

255 ] 

256 

257 return ResponseMetadata( 

258 tools_used=tools_list, 

259 files_read=sorted(files_read), 

260 files_modified=sorted(files_modified), 

261 completion_status=completion_status, 

262 error_message=error_message, 

263 ) 

264 

265 

266def _detect_completion_status(content: list) -> str: 

267 """Detect completion status from response content. 

268 

269 Args: 

270 content: List of content blocks from assistant response 

271 

272 Returns: 

273 "success", "failure", or "partial" 

274 """ 

275 text_parts = [] 

276 for block in content: 

277 if isinstance(block, dict) and block.get("type") == "text": 

278 text_parts.append(block.get("text", "")) 

279 

280 text = " ".join(text_parts).lower() 

281 

282 # Check for error indicators 

283 error_patterns = ["error", "failed", "couldn't", "unable to", "cannot"] 

284 if any(pattern in text for pattern in error_patterns): 

285 return "failure" 

286 

287 # Check for partial completion 

288 partial_patterns = ["partially", "some of", "not all", "incomplete"] 

289 if any(pattern in text for pattern in partial_patterns): 

290 return "partial" 

291 

292 return "success" 

293 

294 

295def _detect_error_message(content: list) -> str | None: 

296 """Extract error message from response content. 

297 

298 Args: 

299 content: List of content blocks from assistant response 

300 

301 Returns: 

302 Error message if found, None otherwise 

303 """ 

304 for block in content: 

305 if isinstance(block, dict) and block.get("type") == "text": 

306 text = block.get("text", "") 

307 # Look for common error message patterns 

308 lower_text = text.lower() 

309 if "error:" in lower_text or "failed:" in lower_text: 

310 # Extract the line containing the error 

311 for line in text.split("\n"): 

312 if "error" in line.lower() or "failed" in line.lower(): 

313 result = line.strip()[:200] # Limit length 

314 return result if isinstance(result, str) else None 

315 return None 

316 

317 

def get_project_folder(cwd: Path | None = None) -> Path | None:
    """Map current directory to Claude Code project folder.

    Converts: /home/user/foo/bar -> ~/.claude/projects/-home-user-foo-bar

    Two encodings of the resolved path are tried, in order:

    1. every "/" replaced with "-" (the historical behavior of this function);
    2. every character that is not an ASCII letter or digit replaced with "-".

    Args:
        cwd: Working directory to map. If None, uses current directory.

    Returns:
        Path to Claude project folder, or None if it doesn't exist.
    """
    if cwd is None:
        cwd = Path.cwd()

    # Convert path to dash-separated format
    # /home/user/foo/bar -> -home-user-foo-bar
    path_str = str(Path(cwd).resolve())
    candidates = [path_str.replace("/", "-")]

    # NOTE(review): Claude Code appears to also replace other
    # non-alphanumeric characters (e.g. "." and "_") with "-" when naming
    # project folders -- confirm against the installed version. The
    # fallback is only consulted when the slash-only name does not exist.
    sanitized = "".join(ch if ch.isascii() and ch.isalnum() else "-" for ch in path_str)
    if sanitized not in candidates:
        candidates.append(sanitized)

    # Build project folder path
    claude_projects = Path.home() / ".claude" / "projects"
    for encoded_path in candidates:
        project_folder = claude_projects / encoded_path
        if project_folder.exists():
            return project_folder

    return None

345 

346 

347def extract_user_messages( 

348 project_folder: Path, 

349 limit: int | None = None, 

350 since: datetime | None = None, 

351 include_agent_sessions: bool = True, 

352 include_response_context: bool = False, 

353) -> list[UserMessage]: 

354 """Extract user messages from all JSONL session files. 

355 

356 Filters: 

357 - type == "user" 

358 - message.content is string (real user input) 

359 - message.content is array but [0].type != "tool_result" 

360 

361 Args: 

362 project_folder: Path to Claude project folder 

363 limit: Maximum number of messages to return 

364 since: Only include messages after this datetime 

365 include_agent_sessions: Whether to include agent-*.jsonl files 

366 include_response_context: Whether to include metadata from assistant responses 

367 

368 Returns: 

369 Messages sorted by timestamp, most recent first. 

370 """ 

371 messages: list[UserMessage] = [] 

372 

373 # Find all JSONL files 

374 pattern = "*.jsonl" 

375 jsonl_files = list(project_folder.glob(pattern)) 

376 

377 for jsonl_file in jsonl_files: 

378 # Skip agent sessions if requested 

379 if not include_agent_sessions and jsonl_file.name.startswith("agent-"): 

380 continue 

381 

382 try: 

383 # If we need response context, read all records first to pair user/assistant 

384 if include_response_context: 

385 all_records: list[dict] = [] 

386 with open(jsonl_file, encoding="utf-8") as f: 

387 for line in f: 

388 line = line.strip() 

389 if not line: 

390 continue 

391 try: 

392 record = json.loads(line) 

393 all_records.append(record) 

394 except json.JSONDecodeError: 

395 continue 

396 

397 # Process records, pairing user messages with their responses 

398 messages.extend(_extract_messages_with_context(all_records, jsonl_file, since)) 

399 else: 

400 # Original behavior: stream through file 

401 with open(jsonl_file, encoding="utf-8") as f: 

402 for line in f: 

403 line = line.strip() 

404 if not line: 

405 continue 

406 

407 try: 

408 record = json.loads(line) 

409 except json.JSONDecodeError: 

410 continue 

411 

412 msg = _parse_user_record(record, jsonl_file, since) 

413 if msg is not None: 

414 messages.append(msg) 

415 

416 except OSError: 

417 # Skip files that can't be read 

418 continue 

419 

420 # Sort by timestamp, most recent first 

421 messages.sort(key=lambda m: m.timestamp, reverse=True) 

422 

423 # Apply limit 

424 if limit is not None: 

425 messages = messages[:limit] 

426 

427 return messages 

428 

429 

def extract_commands(
    project_folder: Path,
    limit: int | None = None,
    since: datetime | None = None,
    include_agent_sessions: bool = True,
    tools: list[str] | None = None,
) -> list[CommandRecord]:
    """Extract CLI commands from assistant tool_use messages.

    Parses assistant messages for tool_use blocks and extracts command strings.
    Blank lines, invalid JSON lines and JSON values that are not objects
    are ignored. Files that cannot be read or decoded as UTF-8 are skipped;
    commands already collected from such a file before the failure are kept.

    Args:
        project_folder: Path to Claude project folder
        limit: Maximum number of commands to return
        since: Only include commands after this datetime
        include_agent_sessions: Whether to include agent-*.jsonl files
        tools: Filter to specific tools (default: ["Bash"])

    Returns:
        Commands sorted by timestamp, most recent first.
    """
    tool_names = ["Bash"] if tools is None else list(tools)

    commands: list[CommandRecord] = []

    # Files are visited in name order so commands with equal timestamps
    # come out in the same order on every run (the sort below is stable).
    for jsonl_file in sorted(project_folder.glob("*.jsonl")):
        # Skip agent sessions if requested
        if not include_agent_sessions and jsonl_file.name.startswith("agent-"):
            continue

        try:
            with open(jsonl_file, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # A valid JSON line that is not an object cannot be a
                    # session record.
                    if not isinstance(record, dict):
                        continue

                    commands.extend(
                        _parse_command_record(record, jsonl_file, since, tool_names)
                    )
        except (OSError, UnicodeDecodeError):
            # Skip files that can't be read
            continue

    # Sort by timestamp, most recent first
    commands.sort(key=lambda c: c.timestamp, reverse=True)

    # Apply limit
    if limit is not None:
        commands = commands[:limit]

    return commands

492 

493 

494def _parse_command_record( 

495 record: dict, 

496 jsonl_file: Path, 

497 since: datetime | None, 

498 tools: list[str], 

499) -> list[CommandRecord]: 

500 """Parse CLI commands from an assistant record. 

501 

502 Args: 

503 record: The JSON record from JSONL 

504 jsonl_file: Source file (for fallback timestamp) 

505 since: Filter for commands after this datetime 

506 tools: Tool names to extract (e.g., ["Bash"]) 

507 

508 Returns: 

509 List of CommandRecord for each matching tool_use block 

510 """ 

511 # Filter for assistant messages only 

512 if record.get("type") != "assistant": 

513 return [] 

514 

515 message_data = record.get("message", {}) 

516 content = message_data.get("content", []) 

517 

518 if not isinstance(content, list): 

519 return [] 

520 

521 # Parse timestamp 

522 timestamp_str = record.get("timestamp", "") 

523 try: 

524 timestamp_str = timestamp_str.replace("Z", "+00:00") 

525 timestamp = datetime.fromisoformat(timestamp_str) 

526 if timestamp.tzinfo is not None: 

527 timestamp = timestamp.replace(tzinfo=None) 

528 except (ValueError, AttributeError): 

529 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime) 

530 

531 # Apply since filter 

532 if since and timestamp < since: 

533 return [] 

534 

535 commands: list[CommandRecord] = [] 

536 

537 for block in content: 

538 if not isinstance(block, dict): 

539 continue 

540 if block.get("type") != "tool_use": 

541 continue 

542 

543 tool_name = block.get("name", "") 

544 if tool_name not in tools: 

545 continue 

546 

547 tool_input = block.get("input", {}) 

548 command_str = tool_input.get("command", "") 

549 if not command_str: 

550 continue 

551 

552 commands.append( 

553 CommandRecord( 

554 content=command_str, 

555 timestamp=timestamp, 

556 session_id=record.get("sessionId", ""), 

557 uuid=record.get("uuid", ""), 

558 tool=tool_name, 

559 cwd=record.get("cwd"), 

560 git_branch=record.get("gitBranch"), 

561 ) 

562 ) 

563 

564 return commands 

565 

566 

567def _parse_user_record( 

568 record: dict, 

569 jsonl_file: Path, 

570 since: datetime | None, 

571) -> UserMessage | None: 

572 """Parse a single user record into a UserMessage. 

573 

574 Args: 

575 record: The JSON record from JSONL 

576 jsonl_file: Source file (for fallback timestamp) 

577 since: Filter for messages after this datetime 

578 

579 Returns: 

580 UserMessage if valid user message, None otherwise 

581 """ 

582 # Filter for user messages only 

583 if record.get("type") != "user": 

584 return None 

585 

586 message_data = record.get("message", {}) 

587 content = message_data.get("content") 

588 

589 # Skip if no content 

590 if content is None: 

591 return None 

592 

593 # Check if this is a real user message or tool_result 

594 if isinstance(content, str): 

595 # String content = real user message 

596 message_content = content 

597 elif isinstance(content, list): 

598 # Array content - check first element 

599 if len(content) > 0 and content[0].get("type") == "tool_result": 

600 # This is a tool result, skip it 

601 return None 

602 # Extract text from array (could be text blocks) 

603 text_parts = [] 

604 for block in content: 

605 if isinstance(block, dict): 

606 if block.get("type") == "text": 

607 text_parts.append(block.get("text", "")) 

608 elif "content" in block: 

609 text_parts.append(str(block.get("content", ""))) 

610 message_content = "\n".join(text_parts) if text_parts else str(content) 

611 else: 

612 return None 

613 

614 # Parse timestamp 

615 timestamp_str = record.get("timestamp", "") 

616 try: 

617 # Handle ISO 8601 format with Z suffix 

618 timestamp_str = timestamp_str.replace("Z", "+00:00") 

619 timestamp = datetime.fromisoformat(timestamp_str) 

620 # Convert to naive datetime for consistent comparison 

621 if timestamp.tzinfo is not None: 

622 timestamp = timestamp.replace(tzinfo=None) 

623 except (ValueError, AttributeError): 

624 # Use file modification time as fallback 

625 timestamp = datetime.fromtimestamp(jsonl_file.stat().st_mtime) 

626 

627 # Apply since filter 

628 if since and timestamp < since: 

629 return None 

630 

631 # Create message object 

632 return UserMessage( 

633 content=message_content, 

634 timestamp=timestamp, 

635 session_id=record.get("sessionId", ""), 

636 uuid=record.get("uuid", ""), 

637 cwd=record.get("cwd"), 

638 git_branch=record.get("gitBranch"), 

639 is_sidechain=record.get("isSidechain", False), 

640 ) 

641 

642 

643def _extract_messages_with_context( 

644 records: list[dict], 

645 jsonl_file: Path, 

646 since: datetime | None, 

647) -> list[UserMessage]: 

648 """Extract user messages with response context from a list of records. 

649 

650 Pairs each user message with ALL following assistant responses until the 

651 next user message, aggregating tool usage and file changes. 

652 

653 Args: 

654 records: List of all records from a JSONL file 

655 jsonl_file: Source file (for fallback timestamp) 

656 since: Filter for messages after this datetime 

657 

658 Returns: 

659 List of UserMessages with response_metadata populated 

660 """ 

661 messages: list[UserMessage] = [] 

662 

663 i = 0 

664 while i < len(records): 

665 record = records[i] 

666 msg = _parse_user_record(record, jsonl_file, since) 

667 

668 if msg is not None: 

669 # Collect ALL assistant responses until next user message 

670 assistant_responses: list[dict] = [] 

671 for j in range(i + 1, len(records)): 

672 next_record = records[j] 

673 if next_record.get("type") == "assistant": 

674 assistant_responses.append(next_record) 

675 elif next_record.get("type") == "user": 

676 # Hit another user message, stop collecting 

677 break 

678 

679 msg.response_metadata = _aggregate_response_metadata(assistant_responses) 

680 messages.append(msg) 

681 

682 i += 1 

683 

684 return messages 

685 

686 

def save_messages(
    messages: list[UserMessage],
    output_path: Path | None = None,
) -> Path:
    """Save messages to timestamped JSONL file.

    Each message is written as one JSON object per line using its
    ``to_dict()`` output. Missing parent directories are created and an
    existing file at the target path is overwritten.

    Args:
        messages: List of UserMessage objects to save
        output_path: Output file path. If None, uses default location.

    Returns:
        Path to the saved file.
    """
    if output_path is None:
        # Default: ./.claude/user-messages-{timestamp}.jsonl
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        output_path = Path.cwd() / ".claude" / f"user-messages-{timestamp}.jsonl"

    output_path = Path(output_path)
    # One mkdir covers both the default and the caller-supplied location.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(msg.to_dict()) + "\n" for msg in messages)

    return output_path

715 

716 

def print_messages_to_stdout(messages: list[UserMessage]) -> None:
    """Print messages to stdout in JSONL format.

    Each message is printed as one JSON object per line using its
    ``to_dict()`` output.

    Args:
        messages: List of UserMessage objects to print
    """
    for msg in messages:
        # print() writes to the current sys.stdout by default.
        print(json.dumps(msg.to_dict()))