Coverage for little_loops / workflow_sequence_analyzer.py: 0%

444 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""ll-workflows: Identify multi-step workflow patterns from user message history. 

2 

3Identifies multi-step workflows and cross-session patterns using: 

4- Entity-based clustering 

5- Time-gap weighted boundaries 

6- Semantic similarity scoring 

7- Workflow template matching 

8 

9Usage as CLI: 

10 ll-workflows analyze --input messages.jsonl --patterns step1.yaml 

11 ll-workflows analyze -i messages.jsonl -p patterns.yaml -o output.yaml 

12 

13Usage as library: 

14 from little_loops.workflow_sequence_analyzer import analyze_workflows 

15 

16 result = analyze_workflows( 

17 messages_file=Path("user-messages.jsonl"), 

18 patterns_file=Path("step1-patterns.yaml"), 

19 output_file=Path("step2-workflows.yaml"), 

20 ) 

21""" 

22 

23from __future__ import annotations 

24 

25import json 

26import re 

27import sys 

28from dataclasses import dataclass, field 

29from datetime import datetime 

30from pathlib import Path 

31from typing import Any 

32 

33import yaml 

34 

# Names exported by `from little_loops.workflow_sequence_analyzer import *`.
__all__ = [
    # Main entry point
    "analyze_workflows",
    # Result data classes
    "SessionLink",
    "EntityCluster",
    "WorkflowBoundary",
    "Workflow",
    "WorkflowAnalysis",
    # Core analysis helpers
    "extract_entities",
    "calculate_boundary_weight",
    "entity_overlap",
    "get_verb_class",
    "semantic_similarity",
]

48 

# Module-level compiled regex patterns used for entity extraction.
#
# FILE_PATTERN ends with \b so the extension must stop at a word boundary.
# Without it the alternation accepts the first alternative that fits, which
# captured "app.tsx" as "app.ts", "view.jsx" as "view.js", and
# "notes.python" as "notes.py".
FILE_PATTERN = re.compile(
    r"[\w./-]+\.(?:md|py|json|yaml|yml|js|ts|tsx|jsx|sh|toml)\b", re.IGNORECASE
)
PHASE_PATTERN = re.compile(r"phase[- ]?\d+", re.IGNORECASE)
MODULE_PATTERN = re.compile(r"module[- ]?\d+", re.IGNORECASE)
# NOTE(review): this also matches path segments such as "/foo" in
# "src/foo.py" — confirm whether that is intended for slash commands.
COMMAND_PATTERN = re.compile(r"/[\w:-]+")
ISSUE_PATTERN = re.compile(r"(?:BUG|FEAT|ENH)-\d+", re.IGNORECASE)

55 

# Verb class taxonomy for semantic similarity.
# Insertion order matters: lookups return the first class whose verbs match.
VERB_CLASSES: dict[str, set[str]] = {
    label: set(verbs.split())
    for label, verbs in (
        ("deletion", "remove delete drop eliminate clear clean"),
        ("modification", "update change modify edit fix adjust revise"),
        ("creation", "create add generate write make build"),
        ("search", "find search locate where what which list"),
        ("verification", "check verify validate confirm review ensure"),
        ("execution", "run execute launch start invoke call"),
    )
}

65 

# Workflow templates: each name maps to the ordered category sequence that
# signals that pattern.
WORKFLOW_TEMPLATES: dict[str, list[str]] = {
    name: steps.split()
    for name, steps in (
        ("explore → modify → verify", "file_search code_modification testing"),
        ("create → refine → finalize", "file_write code_modification git_operation"),
        ("review → fix → commit", "code_review code_modification git_operation"),
        ("plan → implement → verify", "planning code_modification testing"),
        ("debug → fix → test", "debugging code_modification testing"),
    )
}

74 

# Category label -> content keywords. The labels are the ones referenced by
# WORKFLOW_TEMPLATES.
_CONTENT_CATEGORY_MAP: dict[str, list[str]] = {
    label: keywords.split()
    for label, keywords in (
        ("file_search", "search find glob grep locate"),
        ("code_modification", "edit write fix refactor update implement"),
        ("testing", "test pytest assert verify check"),
        ("git_operation", "commit push branch pr merge pull"),
        ("planning", "plan design architect outline draft"),
        ("debugging", "debug trace breakpoint error exception bug"),
        ("code_review", "review inspect audit read examine"),
        ("file_write", "create generate scaffold write add"),
    )
}

86 

87 

88# ----------------------------------------------------------------------------- 

89# Data Classes 

90# ----------------------------------------------------------------------------- 

91 

92 

@dataclass
class SessionLink:
    """A detected relationship joining sessions into one workflow."""

    link_id: str
    sessions: list[dict[str, Any]]  # session_id, position, link_evidence
    unified_workflow: dict[str, Any]  # name, total_messages, span_hours
    confidence: float

    def to_dict(self) -> dict[str, Any]:
        """Return the link as a plain dict ready for YAML serialization."""
        return dict(
            link_id=self.link_id,
            sessions=self.sessions,
            unified_workflow=self.unified_workflow,
            confidence=self.confidence,
        )

110 

111 

@dataclass
class EntityCluster:
    """A group of messages that share extracted entities."""

    cluster_id: str
    primary_entities: list[str]
    all_entities: set[str] = field(default_factory=set)
    messages: list[dict[str, Any]] = field(default_factory=list)
    span: dict[str, Any] | None = None
    inferred_workflow: str | None = None
    cohesion_score: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return the cluster as a plain dict ready for YAML serialization.

        The entity set is emitted as a sorted list and the cohesion score is
        rounded to two decimals.
        """
        ordered_entities = sorted(self.all_entities)
        rounded_cohesion = round(self.cohesion_score, 2)
        return dict(
            cluster_id=self.cluster_id,
            primary_entities=self.primary_entities,
            all_entities=ordered_entities,
            messages=self.messages,
            span=self.span,
            inferred_workflow=self.inferred_workflow,
            cohesion_score=rounded_cohesion,
        )

135 

136 

@dataclass
class WorkflowBoundary:
    """Scored gap between two consecutive messages (time gap and entity overlap)."""

    msg_a: str
    msg_b: str
    time_gap_seconds: int
    time_gap_weight: float
    entity_overlap: float
    final_boundary_score: float
    is_boundary: bool

    def to_dict(self) -> dict[str, Any]:
        """Return the boundary as a plain dict ready for YAML serialization.

        The two message ids are nested under "between"; the three float
        scores are rounded to two decimals.
        """
        endpoints = {"msg_a": self.msg_a, "msg_b": self.msg_b}
        return dict(
            between=endpoints,
            time_gap_seconds=self.time_gap_seconds,
            time_gap_weight=round(self.time_gap_weight, 2),
            entity_overlap=round(self.entity_overlap, 2),
            final_boundary_score=round(self.final_boundary_score, 2),
            is_boundary=self.is_boundary,
        )

159 

160 

@dataclass
class Workflow:
    """A multi-step workflow matched against a workflow template."""

    workflow_id: str
    name: str
    pattern: str
    pattern_confidence: float
    messages: list[dict[str, Any]]
    session_span: list[str]
    entity_cluster: str | None = None
    semantic_cluster: str | None = None
    duration_minutes: int = 0
    handoff_points: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return the workflow as a plain dict ready for YAML serialization.

        The pattern confidence is rounded to two decimals; every other field
        is passed through unchanged.
        """
        return dict(
            workflow_id=self.workflow_id,
            name=self.name,
            pattern=self.pattern,
            pattern_confidence=round(self.pattern_confidence, 2),
            messages=self.messages,
            session_span=self.session_span,
            entity_cluster=self.entity_cluster,
            semantic_cluster=self.semantic_cluster,
            duration_minutes=self.duration_minutes,
            handoff_points=self.handoff_points,
        )

190 

191 

@dataclass
class WorkflowAnalysis:
    """Container for everything one analysis run produces."""

    metadata: dict[str, Any]
    session_links: list[SessionLink] = field(default_factory=list)
    entity_clusters: list[EntityCluster] = field(default_factory=list)
    workflow_boundaries: list[WorkflowBoundary] = field(default_factory=list)
    workflows: list[Workflow] = field(default_factory=list)
    handoff_analysis: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the full analysis as a plain dict ready for YAML serialization.

        Each nested result object is converted through its own ``to_dict``;
        metadata is emitted under the "analysis_metadata" key.
        """

        def dump_all(items: list[Any]) -> list[dict[str, Any]]:
            return [item.to_dict() for item in items]

        return dict(
            analysis_metadata=self.metadata,
            session_links=dump_all(self.session_links),
            entity_clusters=dump_all(self.entity_clusters),
            workflow_boundaries=dump_all(self.workflow_boundaries),
            workflows=dump_all(self.workflows),
            handoff_analysis=self.handoff_analysis,
        )

213 

214 

215# ----------------------------------------------------------------------------- 

216# Core Analysis Functions 

217# ----------------------------------------------------------------------------- 

218 

219 

def extract_entities(content: str) -> set[str]:
    """Collect the entity mentions found in a message.

    Args:
        content: Message text content

    Returns:
        Set containing every file path, phase/module reference, slash
        command, and issue ID matched in the text. Phase/module references
        are lowercased and issue IDs are uppercased.
    """
    # File paths keep their original casing.
    found: set[str] = set(FILE_PATTERN.findall(content))

    # Phase/module references are matched against the lowercased text.
    lowered = content.lower()
    for pattern in (PHASE_PATTERN, MODULE_PATTERN):
        found.update(pattern.findall(lowered))

    # Slash commands keep their original casing.
    found.update(COMMAND_PATTERN.findall(content))

    # Issue IDs are normalized to uppercase.
    found.update(issue_id.upper() for issue_id in ISSUE_PATTERN.findall(content))

    return found

245 

246 

def calculate_boundary_weight(gap_seconds: int) -> float:
    """Map a time gap between messages to a workflow boundary weight.

    Args:
        gap_seconds: Time gap between messages in seconds

    Returns:
        Boundary weight from 0.0 to 0.95; longer gaps give larger weights.
    """
    # (exclusive upper bound in seconds, weight) in ascending order.
    tiers = (
        (30, 0.0),
        (120, 0.1),
        (300, 0.3),
        (900, 0.5),
        (1800, 0.7),
        (7200, 0.85),
    )
    for upper_bound, weight in tiers:
        if gap_seconds < upper_bound:
            return weight
    # Two hours or more.
    return 0.95

270 

271 

def entity_overlap(entities_a: set[str], entities_b: set[str]) -> float:
    """Return the Jaccard similarity of two entity sets.

    Args:
        entities_a: First set of entities
        entities_b: Second set of entities

    Returns:
        Size of the intersection divided by size of the union (0.0 to 1.0);
        0.0 when either set is empty.
    """
    if entities_a and entities_b:
        shared = entities_a & entities_b
        combined = entities_a | entities_b
        if len(combined) > 0:
            return len(shared) / len(combined)
    return 0.0

287 

288 

def get_verb_class(content: str) -> str | None:
    """Return the first verb class whose verbs appear in the message.

    Args:
        content: Message text content

    Returns:
        Verb class name or None if no match. Classes are tried in
        VERB_CLASSES order, so the earliest matching class wins.
    """
    tokens = set(re.findall(r"\b\w+\b", content.lower()))
    return next(
        (label for label, verbs in VERB_CLASSES.items() if tokens & verbs),
        None,
    )

305 

306 

def semantic_similarity(
    content_a: str,
    content_b: str,
    entities_a: set[str],
    entities_b: set[str],
    category_a: str | None,
    category_b: str | None,
) -> float:
    """Score how semantically close two messages are.

    The score is a weighted sum of four signals:
    - Keyword overlap (0.3)
    - Verb class match (0.3)
    - Entity overlap (0.3)
    - Category match (0.1)

    Args:
        content_a: First message content
        content_b: Second message content
        entities_a: Entities from first message
        entities_b: Entities from second message
        category_a: Category of first message
        category_b: Category of second message

    Returns:
        Similarity score (0.0 to 1.0)
    """
    # Keyword signal: Jaccard over lowercase alphabetic words of 3+ letters.
    word_re = r"\b[a-z]{3,}\b"
    vocab_a = set(re.findall(word_re, content_a.lower()))
    vocab_b = set(re.findall(word_re, content_b.lower()))
    combined_vocab = vocab_a | vocab_b
    if combined_vocab:
        keyword_sim = len(vocab_a & vocab_b) / len(combined_vocab)
    else:
        keyword_sim = 0.0

    # Verb signal: both messages must resolve to the same, non-empty class.
    verb_a = get_verb_class(content_a)
    verb_b = get_verb_class(content_b)
    if verb_a and verb_a == verb_b:
        verb_sim = 1.0
    else:
        verb_sim = 0.0

    # Entity signal: Jaccard over the supplied entity sets.
    entity_sim = entity_overlap(entities_a, entities_b)

    # Category signal: same, non-empty category.
    if category_a and category_a == category_b:
        category_sim = 1.0
    else:
        category_sim = 0.0

    # Weighted combination
    return keyword_sim * 0.3 + verb_sim * 0.3 + entity_sim * 0.3 + category_sim * 0.1

352 

353 

354# ----------------------------------------------------------------------------- 

355# Internal Analysis Functions 

356# ----------------------------------------------------------------------------- 

357 

358 

def _load_messages(messages_file: Path) -> list[dict[str, Any]]:
    """Load messages from a JSONL file, one JSON object per line.

    Blank lines are ignored. Lines that are not valid JSON, or that decode
    to something other than a JSON object, are skipped with a warning on
    stderr: the rest of the pipeline calls ``.get`` on every message, so a
    bare number, string, or list would otherwise crash later.

    Args:
        messages_file: Path to the JSONL file

    Returns:
        List of message dicts in file order
    """
    messages: list[dict[str, Any]] = []
    skipped = 0
    with open(messages_file, encoding="utf-8") as f:
        for line_num, raw_line in enumerate(f, 1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                skipped += 1
                print(f"Warning: skipping malformed line {line_num}: {e}", file=sys.stderr)
                continue
            if not isinstance(record, dict):
                skipped += 1
                print(
                    f"Warning: skipping malformed line {line_num}: "
                    f"expected a JSON object, got {type(record).__name__}",
                    file=sys.stderr,
                )
                continue
            messages.append(record)
    if skipped:
        print(f"Warning: skipped {skipped} malformed line(s) in {messages_file}", file=sys.stderr)
    return messages

376 

377 

def _load_patterns(patterns_file: Path) -> dict[str, Any]:
    """Read the Step 1 YAML output; a falsy document yields an empty dict."""
    with open(patterns_file, encoding="utf-8") as handle:
        loaded = yaml.safe_load(handle)
    if loaded:
        return loaded
    return {}

382 

383 

def _group_by_session(messages: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Bucket messages by session_id, preserving input order in each bucket.

    Messages without a session_id key are placed under "unknown".
    """
    buckets: dict[str, list[dict[str, Any]]] = {}
    for message in messages:
        key = message.get("session_id", "unknown")
        buckets.setdefault(key, []).append(message)
    return buckets

393 

394 

def _detect_handoff(content: str) -> bool:
    """Return True when the message contains a session handoff marker.

    Matching is a case-insensitive substring search.
    """
    lowered = content.lower()
    for marker in (
        "/ll:handoff",
        "continue in new session",
        "pick up in next session",
        "resuming from",
        "continuation of",
    ):
        if marker in lowered:
            return True
    return False

406 

407 

def _parse_timestamps(messages: list[dict[str, Any]]) -> list[datetime]:
    """Parse valid ISO timestamps from messages into naive datetimes.

    Timezone-aware values are converted to UTC before the timezone is
    dropped, so timestamps recorded with different offsets still compare
    and subtract correctly. Values without timezone info are returned
    unchanged. Messages whose timestamp is missing, empty, not a string, or
    not parseable are skipped.

    Args:
        messages: Message dicts; only the "timestamp" key is read

    Returns:
        Parsed naive datetimes, in message order
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone

    parsed: list[datetime] = []
    for msg in messages:
        raw = msg.get("timestamp", "")
        if not raw:
            continue
        try:
            # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
            ts = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            if ts.tzinfo is not None:
                ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
        except (ValueError, AttributeError, TypeError, OverflowError):
            # Best effort: an unusable timestamp only excludes this message
            # from time-based calculations.
            continue
        parsed.append(ts)
    return parsed

422 

423 

def _link_sessions(sessions: dict[str, list[dict[str, Any]]]) -> list[SessionLink]:
    """Identify pairs of sessions that are part of the same workflow.

    Every ordered pair (earlier, later) of non-empty sessions, in the dict's
    iteration order, is scored:

    - +0.4 when the last message of the earlier session and the first
      message of the later session carry the same non-empty git_branch
    - +0.4 when any message of the earlier session has a handoff marker
    - +0.2 when entity overlap exceeds 0.5, or +0.1 when it exceeds 0.3

    A pair is linked when its score is greater than 0.3.

    Args:
        sessions: Mapping of session_id to that session's messages

    Returns:
        One SessionLink per linked pair, numbered in discovery order
    """
    # Per-session features are computed once here. Recomputing them inside
    # the pair loop repeated entity extraction and the handoff scan for
    # every pair a session takes part in.
    profiles: list[tuple[str, list[dict[str, Any]], set[str], bool]] = []
    for session_id, session_msgs in sessions.items():
        if not session_msgs:
            continue
        entities: set[str] = set()
        has_handoff = False
        for msg in session_msgs:
            content = msg.get("content", "")
            entities.update(extract_entities(content))
            has_handoff = has_handoff or _detect_handoff(content)
        profiles.append((session_id, session_msgs, entities, has_handoff))

    links: list[SessionLink] = []
    for idx, (id_a, msgs_a, entities_a, handoff_a) in enumerate(profiles):
        branch_a = msgs_a[-1].get("git_branch")

        for id_b, msgs_b, entities_b, _ in profiles[idx + 1 :]:
            branch_b = msgs_b[0].get("git_branch")

            score = 0.0
            evidence: list[str] = []

            # Same git branch (HIGH weight)
            if branch_a and branch_a == branch_b:
                score += 0.4
                evidence.append("shared_branch")

            # Explicit handoff marker in the earlier session (HIGH weight)
            if handoff_a:
                score += 0.4
                evidence.append("handoff_detected")

            # Shared entities (MEDIUM weight)
            overlap = entity_overlap(entities_a, entities_b)
            if overlap > 0.5:
                score += 0.2
                evidence.append("entity_overlap")
            elif overlap > 0.3:
                score += 0.1
                evidence.append("partial_entity_overlap")

            if score <= 0.3:
                continue

            link_number = len(links) + 1

            # Time span covered by both sessions together.
            timestamps = _parse_timestamps(msgs_a + msgs_b)
            span_hours = 0.0
            if len(timestamps) >= 2:
                span_hours = (max(timestamps) - min(timestamps)).total_seconds() / 3600

            links.append(
                SessionLink(
                    link_id=f"link-{link_number:03d}",
                    sessions=[
                        {
                            "session_id": id_a,
                            "position": 1,
                            "link_evidence": evidence[0] if evidence else "score",
                        },
                        {
                            "session_id": id_b,
                            "position": 2,
                            "link_evidence": evidence[-1] if evidence else "score",
                        },
                    ],
                    unified_workflow={
                        "name": f"Linked workflow {link_number}",
                        "total_messages": len(msgs_a) + len(msgs_b),
                        "span_hours": round(span_hours, 1),
                        "evidence": evidence,
                    },
                    confidence=min(score, 1.0),
                )
            )

    return links

515 

516 

def _cluster_by_entities(
    messages: list[dict[str, Any]], overlap_threshold: float = 0.3
) -> list[EntityCluster]:
    """Group messages whose entities overlap an existing cluster strongly enough.

    Each message with at least one entity joins the cluster with the highest
    overlap strictly above ``overlap_threshold``, or starts a new cluster.
    Afterwards every cluster gets a time span (when at least two timestamps
    parse) and an inferred workflow template. Only clusters holding two or
    more messages are returned.
    """

    def preview(text: str) -> str:
        # Stored content is capped at 80 characters plus an ellipsis.
        return text[:80] + "..." if len(text) > 80 else text

    groups: list[EntityCluster] = []

    for message in messages:
        text = message.get("content", "")
        found = extract_entities(text)
        if not found:
            continue

        # Pick the best-overlapping cluster; the threshold is the bar to beat.
        target: EntityCluster | None = None
        top_overlap = overlap_threshold
        for candidate in groups:
            score = entity_overlap(found, candidate.all_entities)
            if score > top_overlap:
                target, top_overlap = candidate, score

        if target is None:
            # No cluster is close enough: start a new one seeded by this message.
            groups.append(
                EntityCluster(
                    cluster_id=f"cluster-{len(groups) + 1:03d}",
                    primary_entities=sorted(found)[:3],  # first 3 in sorted order
                    all_entities=found.copy(),
                    messages=[
                        {
                            "uuid": message.get("uuid", ""),
                            "content": preview(text),
                            "entities_matched": sorted(found),
                            "timestamp": message.get("timestamp"),
                        }
                    ],
                    cohesion_score=1.0,
                )
            )
            continue

        # Record what matched before the cluster absorbs the new entities.
        shared = sorted(found & target.all_entities)
        target.all_entities.update(found)
        target.messages.append(
            {
                "uuid": message.get("uuid", ""),
                "content": preview(text),
                "entities_matched": shared,
                "timestamp": message.get("timestamp"),
            }
        )
        # Running average of the overlap each message had when it joined.
        size = len(target.messages)
        target.cohesion_score = (target.cohesion_score * (size - 1) + top_overlap) / size

    for group in groups:
        # Span from the earliest to the latest parseable timestamp.
        stamps = _parse_timestamps(group.messages)
        if len(stamps) >= 2:
            group.span = {
                "start": min(stamps).isoformat(),
                "end": max(stamps).isoformat(),
            }

        # Categories whose keywords occur in any stored message preview.
        seen_categories: set[str] = set()
        for entry in group.messages:
            lowered = entry.get("content", "").lower()
            seen_categories.update(
                category
                for category, keywords in _CONTENT_CATEGORY_MAP.items()
                if any(keyword in lowered for keyword in keywords)
            )

        # Template covering the largest share of its own categories wins.
        winner: str | None = None
        winner_share = 0.0
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            wanted = set(template_cats)
            if not wanted:
                continue
            share = len(seen_categories & wanted) / len(wanted)
            if share > winner_share:
                winner, winner_share = template_name, share
        if winner_share >= 0.3:
            group.inferred_workflow = winner

    # Single-message clusters are dropped.
    return [group for group in groups if len(group.messages) >= 2]

608 

609 

def _compute_boundaries(
    messages: list[dict[str, Any]], boundary_threshold: float = 0.6
) -> list[WorkflowBoundary]:
    """Score the gap between each pair of consecutive messages.

    Messages are ordered by their raw timestamp value. For each adjacent
    pair the time gap is turned into a weight, which is reduced when the
    two messages share entities. The pair is marked as a boundary when the
    final score reaches ``boundary_threshold``.

    Args:
        messages: Message dicts (reads "timestamp", "content", "uuid")
        boundary_threshold: Minimum final score that counts as a boundary

    Returns:
        One WorkflowBoundary per adjacent pair, in sorted order
    """
    # `or ""` keeps the sort from raising TypeError when a message carries an
    # explicit null timestamp; missing keys already sorted as "".
    ordered = sorted(messages, key=lambda m: m.get("timestamp") or "")

    boundaries: list[WorkflowBoundary] = []
    for msg_a, msg_b in zip(ordered, ordered[1:]):
        # The gap is only known when both timestamps parse; otherwise use 0.
        pair_timestamps = _parse_timestamps([msg_a, msg_b])
        if len(pair_timestamps) == 2:
            gap_seconds = int((pair_timestamps[1] - pair_timestamps[0]).total_seconds())
        else:
            gap_seconds = 0

        time_weight = calculate_boundary_weight(gap_seconds)

        overlap = entity_overlap(
            extract_entities(msg_a.get("content", "")),
            extract_entities(msg_b.get("content", "")),
        )

        # Shared entities suggest the same topic, so lower the boundary score.
        final_score = time_weight
        if overlap > 0.5:
            final_score = max(0.0, time_weight - 0.3)
        elif overlap > 0.3:
            final_score = max(0.0, time_weight - 0.15)

        boundaries.append(
            WorkflowBoundary(
                msg_a=msg_a.get("uuid", ""),
                msg_b=msg_b.get("uuid", ""),
                time_gap_seconds=gap_seconds,
                time_gap_weight=time_weight,
                entity_overlap=overlap,
                final_boundary_score=final_score,
                is_boundary=final_score >= boundary_threshold,
            )
        )

    return boundaries

660 

661 

def _get_message_category(msg_uuid: str, patterns: dict[str, Any]) -> str | None:
    """Find the Step 1 category whose example messages include this uuid.

    Returns the category of the first entry listing the uuid, or None when
    no entry lists it or that entry's category is not a string.
    """
    for entry in patterns.get("category_distribution", []):
        examples = entry.get("example_messages", [])
        if any(example.get("uuid") == msg_uuid for example in examples):
            label = entry.get("category")
            if isinstance(label, str):
                return label
            return None
    return None

670 

671 

def _detect_workflows(
    messages: list[dict[str, Any]],
    boundaries: list[WorkflowBoundary],
    patterns: dict[str, Any],
) -> list[Workflow]:
    """Detect multi-step workflows by matching segments against templates.

    Messages are ordered by raw timestamp and split into segments wherever a
    boundary is flagged before a message. A segment becomes a workflow when
    at least two steps of some WORKFLOW_TEMPLATES entry appear in order
    (gaps allowed) among its categorized messages; the template with the
    highest matched fraction wins.

    Args:
        messages: Message dicts (reads "timestamp", "uuid", "session_id")
        boundaries: Boundaries computed for the same messages
        patterns: Step 1 patterns used to look up message categories

    Returns:
        Detected workflows, numbered in segment order
    """
    # `or ""` keeps the sort from raising TypeError when a message carries an
    # explicit null timestamp; missing keys already sorted as "".
    sorted_msgs = sorted(messages, key=lambda m: m.get("timestamp") or "")

    # uuid of the later message in each pair -> whether a boundary precedes it
    boundary_before: dict[str, bool] = {b.msg_b: b.is_boundary for b in boundaries}

    # Split the ordered messages at every flagged boundary.
    segments: list[list[dict[str, Any]]] = []
    current_segment: list[dict[str, Any]] = []
    for msg in sorted_msgs:
        if boundary_before.get(msg.get("uuid", ""), False) and current_segment:
            segments.append(current_segment)
            current_segment = []
        current_segment.append(msg)
    if current_segment:
        segments.append(current_segment)

    workflows: list[Workflow] = []
    for segment in segments:
        if len(segment) < 2:
            continue

        # Look each message's category up once; it is needed both for the
        # template match and for the per-message output below.
        categories = [_get_message_category(msg.get("uuid", ""), patterns) for msg in segment]
        segment_categories = [cat for cat in categories if cat]
        if len(segment_categories) < 2:
            continue

        best_match: tuple[str, float] | None = None
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            # Count template steps that appear in order, allowing gaps.
            matches = 0
            for cat in segment_categories:
                if matches < len(template_cats) and cat == template_cats[matches]:
                    matches += 1

            if matches >= 2:  # At least 2 template steps matched
                confidence = matches / len(template_cats)
                if best_match is None or confidence > best_match[1]:
                    best_match = (template_name, confidence)

        if best_match is None:
            continue

        timestamps = _parse_timestamps(segment)
        duration_minutes = 0
        if len(timestamps) >= 2:
            duration_minutes = int((max(timestamps) - min(timestamps)).total_seconds() / 60)

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # serialized session_span is the same on every run.
        session_ids = list(dict.fromkeys(msg.get("session_id", "") for msg in segment))

        workflows.append(
            Workflow(
                workflow_id=f"wf-{len(workflows) + 1:03d}",
                name=f"Detected: {best_match[0]}",
                pattern=best_match[0],
                pattern_confidence=best_match[1],
                messages=[
                    {
                        "uuid": msg.get("uuid", ""),
                        "category": category,
                        "step": step,
                    }
                    for step, (msg, category) in enumerate(zip(segment, categories), 1)
                ],
                session_span=session_ids,
                duration_minutes=duration_minutes,
            )
        )

    return workflows

772 

773 

774# ----------------------------------------------------------------------------- 

775# Main API 

776# ----------------------------------------------------------------------------- 

777 

778 

def analyze_workflows(
    messages_file: Path,
    patterns_file: Path,
    output_file: Path | None = None,
    overlap_threshold: float = 0.3,
    boundary_threshold: float = 0.6,
    verbose: bool = False,
    output_format: str = "yaml",
) -> WorkflowAnalysis:
    """Run the full workflow analysis pipeline over messages and patterns.

    The pipeline links sessions, clusters messages by entities, computes
    workflow boundaries, detects workflows, cross-references workflows with
    clusters and handoff markers, and optionally writes the result to disk.

    Args:
        messages_file: Path to JSONL file with extracted user messages
        patterns_file: Path to YAML file from Step 1 (workflow-pattern-analyzer)
        output_file: Output path for step2-workflows.yaml (optional)
        overlap_threshold: Minimum entity overlap to cluster messages together (default: 0.3)
        boundary_threshold: Minimum boundary score to split workflow segments (default: 0.6)
        verbose: Emit per-stage progress to stderr (default: False)
        output_format: Output serialization format, "yaml" or "json" (default: "yaml");
            any value other than "json" is written as YAML

    Returns:
        WorkflowAnalysis with all analysis results
    """

    def progress(text: str) -> None:
        # Stage progress goes to stderr and only in verbose mode.
        if verbose:
            print(text, file=sys.stderr)

    # Inputs
    messages = _load_messages(messages_file)
    patterns = _load_patterns(patterns_file)

    # Run metadata
    metadata = {
        "source_file": messages_file.name,
        "patterns_file": patterns_file.name,
        "message_count": len(messages),
        "analysis_timestamp": datetime.now().isoformat(),
        "module": "workflow-sequence-analyzer",
        "version": "1.0",
    }

    # Pipeline stages
    sessions = _group_by_session(messages)
    progress(f"[1/4] Linking sessions across {len(sessions)} session(s)...")
    session_links = _link_sessions(sessions)
    progress(f"{len(session_links)} link(s) found")

    progress("[2/4] Clustering by entities...")
    entity_clusters = _cluster_by_entities(messages, overlap_threshold=overlap_threshold)
    progress(f"{len(entity_clusters)} cluster(s) found")

    progress("[3/4] Computing workflow boundaries...")
    boundaries = _compute_boundaries(messages, boundary_threshold=boundary_threshold)
    progress(f"{len(boundaries)} boundary/boundaries found")

    progress("[4/4] Detecting workflows...")
    workflows = _detect_workflows(messages, boundaries, patterns)
    progress(f"{len(workflows)} workflow(s) detected")

    # Lookup tables for cross-referencing; a later cluster overwrites an
    # earlier one when the same uuid appears in both.
    uuid_to_cluster: dict[str, str] = {}
    for cluster in entity_clusters:
        for entry in cluster.messages:
            entry_uuid = entry.get("uuid", "")
            if entry_uuid:
                uuid_to_cluster[entry_uuid] = cluster.cluster_id

    uuid_to_content: dict[str, str] = {}
    for message in messages:
        message_uuid = message.get("uuid", "")
        if message_uuid:
            uuid_to_content[message_uuid] = message.get("content", "")

    for workflow in workflows:
        # The cluster holding the most of this workflow's messages wins.
        tally: dict[str, int] = {}
        for step in workflow.messages:
            owner = uuid_to_cluster.get(step.get("uuid", ""))
            if owner:
                tally[owner] = tally.get(owner, 0) + 1
        if tally:
            workflow.entity_cluster = max(tally, key=lambda cluster_id: tally[cluster_id])

        # Every step whose original content has a handoff marker is recorded.
        for step in workflow.messages:
            step_uuid = step.get("uuid", "")
            if step_uuid and _detect_handoff(uuid_to_content.get(step_uuid, "")):
                workflow.handoff_points.append({"uuid": step_uuid, "type": "explicit_handoff"})

    # Handoff summary: links with explicit handoff evidence versus the rest.
    explicit_links = [
        link
        for link in session_links
        if "handoff_detected" in link.unified_workflow.get("evidence", [])
    ]
    handoff_count = len(explicit_links)
    other_count = len(session_links) - handoff_count

    handoff_analysis: dict[str, Any] = {
        "total_handoffs": handoff_count,
        "handoff_patterns": [
            {"pattern": "explicit_handoff", "count": handoff_count},
            {"pattern": "session_timeout", "count": other_count},
        ],
        "recommendations": [],
    }
    if len(session_links) > handoff_count:
        handoff_analysis["recommendations"].append(
            "Consider using /ll:handoff for cleaner session transitions"
        )

    analysis = WorkflowAnalysis(
        metadata=metadata,
        session_links=session_links,
        entity_clusters=entity_clusters,
        workflow_boundaries=boundaries,
        workflows=workflows,
        handoff_analysis=handoff_analysis,
    )

    # Persist only when a destination was given.
    if output_file:
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        payload = analysis.to_dict()
        with open(output_file, "w", encoding="utf-8") as sink:
            if output_format == "json":
                json.dump(payload, sink, indent=2, default=str)
            else:
                yaml.dump(payload, sink, default_flow_style=False, sort_keys=False)

    return analysis

907 

908 

909# ----------------------------------------------------------------------------- 

910# CLI 

911# ----------------------------------------------------------------------------- 

912 

913 

def main() -> int:
    """Command-line entry point for ll-workflows.

    Parses arguments, validates the input paths and thresholds, runs the
    analysis, and writes the result to the chosen output file.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    import argparse
    import sys

    usage_examples = """
Examples:
  %(prog)s analyze --input messages.jsonl --patterns step1.yaml
  %(prog)s analyze -i messages.jsonl -p patterns.yaml -o output.yaml
  %(prog)s analyze --input .claude/user-messages.jsonl \\
                   --patterns .claude/workflow-analysis/step1-patterns.yaml
"""

    parser = argparse.ArgumentParser(
        description="Identify multi-step workflow patterns from user message history",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # analyze subcommand
    analyze_parser = subparsers.add_parser(
        "analyze",
        help="Analyze workflows from messages and patterns",
    )
    add_option = analyze_parser.add_argument
    add_option(
        "-i",
        "--input",
        type=Path,
        required=True,
        help="Input JSONL file with user messages",
    )
    add_option(
        "-p",
        "--patterns",
        type=Path,
        required=True,
        help="Input YAML file from Step 1 (workflow-pattern-analyzer)",
    )
    add_option(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Output file (default: .claude/workflow-analysis/step2-workflows.yaml or .json)",
    )
    add_option(
        "-f",
        "--format",
        choices=["yaml", "json"],
        default="yaml",
        help="Output format (default: yaml)",
    )
    add_option(
        "-v",
        "--verbose",
        action="store_true",
        help="Print verbose progress information",
    )
    add_option(
        "--overlap-threshold",
        type=float,
        default=0.3,
        metavar="FLOAT",
        help="Minimum entity overlap to cluster messages together (default: 0.3)",
    )
    add_option(
        "--boundary-threshold",
        type=float,
        default=0.6,
        metavar="FLOAT",
        help="Minimum boundary score to split workflow segments (default: 0.6)",
    )

    args = parser.parse_args()

    if args.command is None:
        parser.print_help()
        return 1

    if args.command != "analyze":
        return 1

    # Both input files must exist before any work starts.
    if not args.input.exists():
        print(f"Error: Input file not found: {args.input}", file=sys.stderr)
        return 1
    if not args.patterns.exists():
        print(f"Error: Patterns file not found: {args.patterns}", file=sys.stderr)
        return 1

    # Both thresholds must lie in the closed interval [0.0, 1.0].
    for flag, value in (
        ("--overlap-threshold", args.overlap_threshold),
        ("--boundary-threshold", args.boundary_threshold),
    ):
        if not (0.0 <= value <= 1.0):
            print(
                f"Error: {flag} must be in [0.0, 1.0], got {value}",
                file=sys.stderr,
            )
            return 1

    # The default output path follows the selected format.
    output_path = args.output
    if output_path is None:
        if args.format == "json":
            output_path = Path(".claude/workflow-analysis/step2-workflows.json")
        else:
            output_path = Path(".claude/workflow-analysis/step2-workflows.yaml")

    if args.verbose:
        print(f"Input: {args.input}")
        print(f"Patterns: {args.patterns}")
        print(f"Output: {output_path}")

    try:
        analysis = analyze_workflows(
            messages_file=args.input,
            patterns_file=args.patterns,
            output_file=output_path,
            overlap_threshold=args.overlap_threshold,
            boundary_threshold=args.boundary_threshold,
            verbose=args.verbose,
            output_format=args.format,
        )

        if args.verbose:
            print(f"Analyzed {analysis.metadata['message_count']} messages")
            print(f"Found {len(analysis.session_links)} session links")
            print(f"Found {len(analysis.entity_clusters)} entity clusters")
            print(f"Detected {len(analysis.workflows)} workflows")
            print(f"Output written to: {output_path}")

        return 0

    except Exception as e:
        # Top-level CLI boundary: report the failure and return a failing code.
        print(f"Error: {e}", file=sys.stderr)
        return 1

1062 

1063 

# Run the CLI when executed as a script; main()'s return value is the exit code.
if __name__ == "__main__":
    sys.exit(main())