Coverage for little_loops / workflow_sequence_analyzer.py: 0%
444 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""ll-workflows: Identify multi-step workflow patterns from user message history.
3Identifies multi-step workflows and cross-session patterns using:
4- Entity-based clustering
5- Time-gap weighted boundaries
6- Semantic similarity scoring
7- Workflow template matching
9Usage as CLI:
10 ll-workflows analyze --input messages.jsonl --patterns step1.yaml
11 ll-workflows analyze -i messages.jsonl -p patterns.yaml -o output.yaml
13Usage as library:
14 from little_loops.workflow_sequence_analyzer import analyze_workflows
16 result = analyze_workflows(
17 messages_file=Path("user-messages.jsonl"),
18 patterns_file=Path("step1-patterns.yaml"),
19 output_file=Path("step2-workflows.yaml"),
20 )
21"""
23from __future__ import annotations
25import json
26import re
27import sys
28from dataclasses import dataclass, field
29from datetime import datetime
30from pathlib import Path
31from typing import Any
33import yaml
# Names exported by `from little_loops.workflow_sequence_analyzer import *`:
# the public entry point, the result dataclasses, and the scoring helpers.
# Underscore-prefixed helpers and the CLI `main` are intentionally not listed.
__all__ = [
    "analyze_workflows",
    "SessionLink",
    "EntityCluster",
    "WorkflowBoundary",
    "Workflow",
    "WorkflowAnalysis",
    "extract_entities",
    "calculate_boundary_weight",
    "entity_overlap",
    "get_verb_class",
    "semantic_similarity",
]
# Module-level compiled regex patterns (compiled once, reused by extract_entities).

# A run of path characters ending in one of the recognised extensions.
# Longer extensions are listed before their prefixes (jsx before js, tsx before
# ts) and the trailing \b requires the extension to end there; otherwise
# "app.tsx" would be captured as "app.ts" and "cache.pyc" as "cache.py".
FILE_PATTERN = re.compile(
    r"[\w./-]+\.(?:md|py|json|yaml|yml|jsx|js|tsx|ts|sh|toml)\b",
    re.IGNORECASE,
)
# "phase 2", "phase-2", "phase2" (case-insensitive).
PHASE_PATTERN = re.compile(r"phase[- ]?\d+", re.IGNORECASE)
# "module 3", "module-3", "module3" (case-insensitive).
MODULE_PATTERN = re.compile(r"module[- ]?\d+", re.IGNORECASE)
# Slash commands such as "/ll:handoff".
# NOTE(review): this also matches the inner segments of file paths (e.g. "/utils"
# in "src/utils/x.py"); confirm whether that is intended before tightening it.
COMMAND_PATTERN = re.compile(r"/[\w:-]+")
# Issue identifiers such as "BUG-12", "feat-7", "ENH-100" (case-insensitive).
ISSUE_PATTERN = re.compile(r"(?:BUG|FEAT|ENH)-\d+", re.IGNORECASE)
# Verb class taxonomy for semantic similarity.
# get_verb_class() returns the first class (in the insertion order below) that
# shares at least one whole word with the message, so the order of the entries
# acts as a priority when a message contains verbs from several classes.
VERB_CLASSES: dict[str, set[str]] = {
    "deletion": {"remove", "delete", "drop", "eliminate", "clear", "clean"},
    "modification": {"update", "change", "modify", "edit", "fix", "adjust", "revise"},
    "creation": {"create", "add", "generate", "write", "make", "build"},
    "search": {"find", "search", "locate", "where", "what", "which", "list"},
    "verification": {"check", "verify", "validate", "confirm", "review", "ensure"},
    "execution": {"run", "execute", "launch", "start", "invoke", "call"},
}

# Workflow templates: category sequences that indicate common patterns.
# Keys are display names; values are ordered category labels. _detect_workflows()
# matches them as an in-order subsequence of a segment's message categories,
# while _cluster_by_entities() only looks at which labels are present.
WORKFLOW_TEMPLATES: dict[str, list[str]] = {
    "explore → modify → verify": ["file_search", "code_modification", "testing"],
    "create → refine → finalize": ["file_write", "code_modification", "git_operation"],
    "review → fix → commit": ["code_review", "code_modification", "git_operation"],
    "plan → implement → verify": ["planning", "code_modification", "testing"],
    "debug → fix → test": ["debugging", "code_modification", "testing"],
}

# Maps content keywords to workflow category labels used by WORKFLOW_TEMPLATES.
# _cluster_by_entities() tests each keyword as a plain substring of the
# lower-cased message preview, so short keywords such as "pr" or "add" also
# match inside longer words. A keyword may appear under several categories
# (e.g. "write"); every category with a hit is recorded.
_CONTENT_CATEGORY_MAP: dict[str, list[str]] = {
    "file_search": ["search", "find", "glob", "grep", "locate"],
    "code_modification": ["edit", "write", "fix", "refactor", "update", "implement"],
    "testing": ["test", "pytest", "assert", "verify", "check"],
    "git_operation": ["commit", "push", "branch", "pr", "merge", "pull"],
    "planning": ["plan", "design", "architect", "outline", "draft"],
    "debugging": ["debug", "trace", "breakpoint", "error", "exception", "bug"],
    "code_review": ["review", "inspect", "audit", "read", "examine"],
    "file_write": ["create", "generate", "scaffold", "write", "add"],
}
88# -----------------------------------------------------------------------------
89# Data Classes
90# -----------------------------------------------------------------------------
@dataclass
class SessionLink:
    """Two sessions judged to belong to the same piece of work."""

    link_id: str
    sessions: list[dict[str, Any]]  # session_id, position, link_evidence
    unified_workflow: dict[str, Any]  # name, total_messages, span_hours
    confidence: float

    def to_dict(self) -> dict[str, Any]:
        """Return a plain mapping of the fields, ready for YAML/JSON output."""
        return dict(
            link_id=self.link_id,
            sessions=self.sessions,
            unified_workflow=self.unified_workflow,
            confidence=self.confidence,
        )
@dataclass
class EntityCluster:
    """Group of messages that reference overlapping entities."""

    cluster_id: str
    primary_entities: list[str]
    all_entities: set[str] = field(default_factory=set)
    messages: list[dict[str, Any]] = field(default_factory=list)
    span: dict[str, Any] | None = None
    inferred_workflow: str | None = None
    cohesion_score: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; entities are sorted, cohesion rounded to 2 dp."""
        ordered_entities = sorted(self.all_entities)
        rounded_cohesion = round(self.cohesion_score, 2)
        return dict(
            cluster_id=self.cluster_id,
            primary_entities=self.primary_entities,
            all_entities=ordered_entities,
            messages=self.messages,
            span=self.span,
            inferred_workflow=self.inferred_workflow,
            cohesion_score=rounded_cohesion,
        )
@dataclass
class WorkflowBoundary:
    """Scored gap between two consecutive messages (time gap adjusted by entity overlap)."""

    msg_a: str
    msg_b: str
    time_gap_seconds: int
    time_gap_weight: float
    entity_overlap: float
    final_boundary_score: float
    is_boundary: bool

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; the three float scores are rounded to 2 dp."""
        message_pair = {"msg_a": self.msg_a, "msg_b": self.msg_b}
        payload: dict[str, Any] = {"between": message_pair}
        payload["time_gap_seconds"] = self.time_gap_seconds
        payload["time_gap_weight"] = round(self.time_gap_weight, 2)
        payload["entity_overlap"] = round(self.entity_overlap, 2)
        payload["final_boundary_score"] = round(self.final_boundary_score, 2)
        payload["is_boundary"] = self.is_boundary
        return payload
@dataclass
class Workflow:
    """A detected multi-step workflow and the messages that form it."""

    workflow_id: str
    name: str
    pattern: str
    pattern_confidence: float
    messages: list[dict[str, Any]]
    session_span: list[str]
    entity_cluster: str | None = None
    semantic_cluster: str | None = None
    duration_minutes: int = 0
    handoff_points: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping; pattern_confidence is rounded to 2 dp."""
        confidence = round(self.pattern_confidence, 2)
        return dict(
            workflow_id=self.workflow_id,
            name=self.name,
            pattern=self.pattern,
            pattern_confidence=confidence,
            messages=self.messages,
            session_span=self.session_span,
            entity_cluster=self.entity_cluster,
            semantic_cluster=self.semantic_cluster,
            duration_minutes=self.duration_minutes,
            handoff_points=self.handoff_points,
        )
@dataclass
class WorkflowAnalysis:
    """Aggregate result of one analysis run."""

    metadata: dict[str, Any]
    session_links: list[SessionLink] = field(default_factory=list)
    entity_clusters: list[EntityCluster] = field(default_factory=list)
    workflow_boundaries: list[WorkflowBoundary] = field(default_factory=list)
    workflows: list[Workflow] = field(default_factory=list)
    handoff_analysis: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping, delegating to each child's ``to_dict``."""
        payload: dict[str, Any] = {"analysis_metadata": self.metadata}
        payload["session_links"] = [link.to_dict() for link in self.session_links]
        payload["entity_clusters"] = [cluster.to_dict() for cluster in self.entity_clusters]
        payload["workflow_boundaries"] = [
            boundary.to_dict() for boundary in self.workflow_boundaries
        ]
        payload["workflows"] = [workflow.to_dict() for workflow in self.workflows]
        payload["handoff_analysis"] = self.handoff_analysis
        return payload
215# -----------------------------------------------------------------------------
216# Core Analysis Functions
217# -----------------------------------------------------------------------------
def extract_entities(content: str) -> set[str]:
    """Collect the entities mentioned in a message.

    Args:
        content: Message text content

    Returns:
        Set containing file paths and slash commands as written, phase/module
        references lower-cased, and issue IDs upper-cased.
    """
    # File paths keep their original casing.
    found: set[str] = set(FILE_PATTERN.findall(content))

    # Phase/module references are matched on the lower-cased text so that
    # "Phase 2" and "phase 2" collapse into one entity.
    lowered = content.lower()
    for reference_pattern in (PHASE_PATTERN, MODULE_PATTERN):
        found |= set(reference_pattern.findall(lowered))

    # Slash commands, as written.
    found |= set(COMMAND_PATTERN.findall(content))

    # Issue IDs, normalized to uppercase.
    found |= {issue_id.upper() for issue_id in ISSUE_PATTERN.findall(content)}

    return found
def calculate_boundary_weight(gap_seconds: int) -> float:
    """Map the time gap between two messages to a boundary weight.

    Args:
        gap_seconds: Time gap between messages in seconds

    Returns:
        Boundary weight from 0.0 to 0.95
    """
    # (exclusive upper bound in seconds, weight) in ascending order; the first
    # tier whose bound exceeds the gap wins.
    tiers = (
        (30, 0.0),
        (120, 0.1),
        (300, 0.3),
        (900, 0.5),
        (1800, 0.7),
        (7200, 0.85),
    )
    for upper_bound, weight in tiers:
        if gap_seconds < upper_bound:
            return weight
    # Two hours or more.
    return 0.95
def entity_overlap(entities_a: set[str], entities_b: set[str]) -> float:
    """Return the Jaccard similarity of two entity sets.

    Args:
        entities_a: First set of entities
        entities_b: Second set of entities

    Returns:
        Jaccard similarity coefficient (0.0 to 1.0); 0.0 when either set is empty
    """
    if not (entities_a and entities_b):
        return 0.0
    shared = entities_a & entities_b
    combined = entities_a | entities_b
    if len(combined) > 0:
        return len(shared) / len(combined)
    return 0.0
def get_verb_class(content: str) -> str | None:
    """Return the first verb class with a verb present in the message.

    Args:
        content: Message text content

    Returns:
        Verb class name (first match in VERB_CLASSES order) or None if no match
    """
    tokens = set(re.findall(r"\b\w+\b", content.lower()))
    matching_classes = (
        class_name for class_name, verbs in VERB_CLASSES.items() if not tokens.isdisjoint(verbs)
    )
    return next(matching_classes, None)
def semantic_similarity(
    content_a: str,
    content_b: str,
    entities_a: set[str],
    entities_b: set[str],
    category_a: str | None,
    category_b: str | None,
) -> float:
    """Score how similar two messages are.

    The score is a weighted sum of four signals:
    - Keyword overlap (0.3)
    - Verb class match (0.3)
    - Entity overlap (0.3)
    - Category match (0.1)

    Args:
        content_a: First message content
        content_b: Second message content
        entities_a: Entities from first message
        entities_b: Entities from second message
        category_a: Category of first message
        category_b: Category of second message

    Returns:
        Similarity score (0.0 to 1.0)
    """

    def _keywords(text: str) -> set[str]:
        # Lower-case alphabetic words of three or more letters.
        return set(re.findall(r"\b[a-z]{3,}\b", text.lower()))

    # Keyword overlap: word-level Jaccard, 0.0 when neither side has keywords.
    keywords_a = _keywords(content_a)
    keywords_b = _keywords(content_b)
    all_keywords = keywords_a | keywords_b
    if all_keywords:
        keyword_sim = len(keywords_a & keywords_b) / len(all_keywords)
    else:
        keyword_sim = 0.0

    # Verb class: counts only when both messages resolve to the same class.
    verb_a = get_verb_class(content_a)
    verb_b = get_verb_class(content_b)
    if verb_a and verb_a == verb_b:
        verb_sim = 1.0
    else:
        verb_sim = 0.0

    # Entity overlap (Jaccard).
    entity_sim = entity_overlap(entities_a, entities_b)

    # Category: counts only when both are set and equal.
    if category_a and category_a == category_b:
        category_sim = 1.0
    else:
        category_sim = 0.0

    # Weighted combination.
    return keyword_sim * 0.3 + verb_sim * 0.3 + entity_sim * 0.3 + category_sim * 0.1
354# -----------------------------------------------------------------------------
355# Internal Analysis Functions
356# -----------------------------------------------------------------------------
359def _load_messages(messages_file: Path) -> list[dict[str, Any]]:
360 """Load messages from JSONL file."""
361 messages = []
362 skipped = 0
363 with open(messages_file, encoding="utf-8") as f:
364 for line_num, raw_line in enumerate(f, 1):
365 line = raw_line.strip()
366 if not line:
367 continue
368 try:
369 messages.append(json.loads(line))
370 except json.JSONDecodeError as e:
371 skipped += 1
372 print(f"Warning: skipping malformed line {line_num}: {e}", file=sys.stderr)
373 if skipped:
374 print(f"Warning: skipped {skipped} malformed line(s) in {messages_file}", file=sys.stderr)
375 return messages
def _load_patterns(patterns_file: Path) -> dict[str, Any]:
    """Read the Step 1 YAML output; an empty or falsy document yields ``{}``."""
    with open(patterns_file, encoding="utf-8") as handle:
        loaded = yaml.safe_load(handle)
    if not loaded:
        return {}
    return loaded
384def _group_by_session(messages: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
385 """Group messages by session_id."""
386 sessions: dict[str, list[dict[str, Any]]] = {}
387 for msg in messages:
388 session_id = msg.get("session_id", "unknown")
389 if session_id not in sessions:
390 sessions[session_id] = []
391 sessions[session_id].append(msg)
392 return sessions
395def _detect_handoff(content: str) -> bool:
396 """Check if message indicates a session handoff."""
397 handoff_markers = [
398 "/ll:handoff",
399 "continue in new session",
400 "pick up in next session",
401 "resuming from",
402 "continuation of",
403 ]
404 content_lower = content.lower()
405 return any(marker in content_lower for marker in handoff_markers)
408def _parse_timestamps(messages: list[dict[str, Any]]) -> list[datetime]:
409 """Parse valid ISO timestamps from a list of messages, stripping timezone info."""
410 timestamps = []
411 for msg in messages:
412 ts_str = msg.get("timestamp", "")
413 if ts_str:
414 try:
415 ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
416 if ts.tzinfo is not None:
417 ts = ts.replace(tzinfo=None)
418 timestamps.append(ts)
419 except (ValueError, AttributeError, TypeError):
420 pass
421 return timestamps
def _link_sessions(sessions: dict[str, list[dict[str, Any]]]) -> list[SessionLink]:
    """Identify pairs of sessions that are part of the same workflow.

    Every pair (earlier, later) of non-empty sessions, in the iteration order
    of ``sessions``, is scored on three signals:

    - same git branch on the earlier session's last message and the later
      session's first message (+0.4)
    - an explicit handoff marker anywhere in the earlier session (+0.4)
    - entity overlap above 0.5 (+0.2) or above 0.3 (+0.1)

    A pair is linked when its score exceeds 0.3.

    Args:
        sessions: Mapping of session id to that session's messages

    Returns:
        One SessionLink per linked pair, numbered in discovery order
    """
    # Summarise each non-empty session once. Entity extraction and handoff
    # detection depend only on the session itself, so doing them inside the
    # pairwise loop would repeat the same work for every partner session.
    profiles: list[tuple[str, list[dict[str, Any]], set[str], bool]] = []
    for session_id, session_msgs in sessions.items():
        if not session_msgs:
            continue
        entities: set[str] = set()
        has_handoff = False
        for msg in session_msgs:
            content = msg.get("content", "")
            entities.update(extract_entities(content))
            if not has_handoff and _detect_handoff(content):
                has_handoff = True
        profiles.append((session_id, session_msgs, entities, has_handoff))

    links: list[SessionLink] = []
    for index, (id_a, msgs_a, entities_a, handoff_in_a) in enumerate(profiles):
        branch_a = msgs_a[-1].get("git_branch")

        for id_b, msgs_b, entities_b, _ in profiles[index + 1 :]:
            branch_b = msgs_b[0].get("git_branch")

            score = 0.0
            evidence: list[str] = []

            # Same git branch (HIGH weight)
            if branch_a and branch_a == branch_b:
                score += 0.4
                evidence.append("shared_branch")

            # Explicit handoff marker in the earlier session (HIGH weight)
            if handoff_in_a:
                score += 0.4
                evidence.append("handoff_detected")

            # Shared entities (MEDIUM weight)
            overlap = entity_overlap(entities_a, entities_b)
            if overlap > 0.5:
                score += 0.2
                evidence.append("entity_overlap")
            elif overlap > 0.3:
                score += 0.1
                evidence.append("partial_entity_overlap")

            if score <= 0.3:
                continue

            link_number = len(links) + 1

            # Wall-clock span covered by both sessions. _parse_timestamps
            # returns only naive datetimes, so the subtraction cannot raise.
            timestamps = _parse_timestamps(msgs_a + msgs_b)
            span_hours = 0.0
            if len(timestamps) >= 2:
                span_hours = (max(timestamps) - min(timestamps)).total_seconds() / 3600

            links.append(
                SessionLink(
                    link_id=f"link-{link_number:03d}",
                    sessions=[
                        {
                            "session_id": id_a,
                            "position": 1,
                            "link_evidence": evidence[0] if evidence else "score",
                        },
                        {
                            "session_id": id_b,
                            "position": 2,
                            "link_evidence": evidence[-1] if evidence else "score",
                        },
                    ],
                    unified_workflow={
                        "name": f"Linked workflow {link_number}",
                        "total_messages": len(msgs_a) + len(msgs_b),
                        "span_hours": round(span_hours, 1),
                        "evidence": evidence,
                    },
                    confidence=min(score, 1.0),
                )
            )

    return links
def _cluster_by_entities(
    messages: list[dict[str, Any]], overlap_threshold: float = 0.3
) -> list[EntityCluster]:
    """Group messages whose entities overlap an existing cluster strongly enough.

    Messages are processed in input order. Each one joins the cluster with the
    highest entity overlap strictly above ``overlap_threshold``, or starts a
    new cluster. Messages without entities are ignored. Clusters that end up
    with a single message are dropped from the result.
    """

    def _preview(text: str) -> str:
        # Stored message content is capped at 80 characters plus an ellipsis.
        if len(text) > 80:
            return text[:80] + "..."
        return text

    clusters: list[EntityCluster] = []

    for msg in messages:
        content = msg.get("content", "")
        msg_entities = extract_entities(content)
        if not msg_entities:
            continue

        # Pick the best cluster; the first one wins on ties.
        target: EntityCluster | None = None
        top_overlap = overlap_threshold
        for candidate in clusters:
            score = entity_overlap(msg_entities, candidate.all_entities)
            if score > top_overlap:
                top_overlap, target = score, candidate

        if target is None:
            # No cluster is close enough: start a new one, numbered sequentially.
            clusters.append(
                EntityCluster(
                    cluster_id=f"cluster-{len(clusters) + 1:03d}",
                    primary_entities=sorted(msg_entities)[:3],  # Top 3 entities
                    all_entities=set(msg_entities),
                    messages=[
                        {
                            "uuid": msg.get("uuid", ""),
                            "content": _preview(content),
                            "entities_matched": sorted(msg_entities),
                            "timestamp": msg.get("timestamp"),
                        }
                    ],
                    cohesion_score=1.0,
                )
            )
            continue

        # Record which entities matched before the cluster absorbs the new ones.
        shared = sorted(msg_entities & target.all_entities)
        target.all_entities.update(msg_entities)
        target.messages.append(
            {
                "uuid": msg.get("uuid", ""),
                "content": _preview(content),
                "entities_matched": shared,
                "timestamp": msg.get("timestamp"),
            }
        )
        # Running average of the overlap each member had when it joined.
        member_count = len(target.messages)
        target.cohesion_score = (
            target.cohesion_score * (member_count - 1) + top_overlap
        ) / member_count

    # Fill in span and inferred_workflow.
    for cluster in clusters:
        stamps = _parse_timestamps(cluster.messages)
        if len(stamps) >= 2:
            cluster.span = {
                "start": min(stamps).isoformat(),
                "end": max(stamps).isoformat(),
            }

        # Categories whose keywords occur (as substrings) in any stored preview.
        previews = [entry.get("content", "").lower() for entry in cluster.messages]
        present_categories: set[str] = {
            category
            for category, keywords in _CONTENT_CATEGORY_MAP.items()
            if any(keyword in text for text in previews for keyword in keywords)
        }

        # Template covering the largest share of its own categories; the first
        # template wins on ties.
        chosen_template: str | None = None
        chosen_coverage = 0.0
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            wanted = set(template_cats)
            if not wanted:
                continue
            coverage = len(present_categories & wanted) / len(wanted)
            if coverage > chosen_coverage:
                chosen_coverage = coverage
                chosen_template = template_name
        if chosen_coverage >= 0.3:
            cluster.inferred_workflow = chosen_template

    # Filter out single-message clusters
    return [cluster for cluster in clusters if len(cluster.messages) >= 2]
def _compute_boundaries(
    messages: list[dict[str, Any]], boundary_threshold: float = 0.6
) -> list[WorkflowBoundary]:
    """Score the gap between each pair of consecutive messages.

    Messages are ordered by their raw timestamp string. For each adjacent pair
    the time gap is mapped to a weight, which is then reduced when the two
    messages share entities (same topic). The pair is flagged as a boundary
    when the final score reaches ``boundary_threshold``.

    Args:
        messages: Message dicts (``timestamp``, ``content`` and ``uuid`` are read)
        boundary_threshold: Minimum final score for a pair to count as a boundary

    Returns:
        One WorkflowBoundary per adjacent pair, in sorted order
    """
    # The key is coerced to a string so that a missing, null or non-string
    # timestamp cannot make sorted() raise TypeError. String timestamps sort
    # exactly as before.
    ordered = sorted(messages, key=lambda m: str(m.get("timestamp") or ""))

    boundaries: list[WorkflowBoundary] = []
    for msg_a, msg_b in zip(ordered, ordered[1:]):
        # Gap is 0 unless both timestamps parse.
        pair_timestamps = _parse_timestamps([msg_a, msg_b])
        if len(pair_timestamps) == 2:
            gap_seconds = int((pair_timestamps[1] - pair_timestamps[0]).total_seconds())
        else:
            gap_seconds = 0

        time_weight = calculate_boundary_weight(gap_seconds)

        overlap = entity_overlap(
            extract_entities(msg_a.get("content", "")),
            extract_entities(msg_b.get("content", "")),
        )

        # Adjust for entity overlap (reduce boundary weight if same topic)
        if overlap > 0.5:
            final_score = max(0.0, time_weight - 0.3)
        elif overlap > 0.3:
            final_score = max(0.0, time_weight - 0.15)
        else:
            final_score = time_weight

        boundaries.append(
            WorkflowBoundary(
                msg_a=msg_a.get("uuid", ""),
                msg_b=msg_b.get("uuid", ""),
                time_gap_seconds=gap_seconds,
                time_gap_weight=time_weight,
                entity_overlap=overlap,
                final_boundary_score=final_score,
                is_boundary=final_score >= boundary_threshold,
            )
        )

    return boundaries
662def _get_message_category(msg_uuid: str, patterns: dict[str, Any]) -> str | None:
663 """Look up message category from Step 1 patterns."""
664 for category_info in patterns.get("category_distribution", []):
665 for example in category_info.get("example_messages", []):
666 if example.get("uuid") == msg_uuid:
667 category = category_info.get("category")
668 return category if isinstance(category, str) else None
669 return None
def _detect_workflows(
    messages: list[dict[str, Any]],
    boundaries: list[WorkflowBoundary],
    patterns: dict[str, Any],
) -> list[Workflow]:
    """Detect multi-step workflows using template matching.

    Messages are ordered by their raw timestamp string and cut into segments
    wherever a boundary was flagged immediately before a message. A segment
    becomes a workflow when at least two steps of some WORKFLOW_TEMPLATES
    entry appear, in order, among the Step 1 categories of its messages. The
    template with the highest fraction of matched steps wins, and the first
    template wins on ties.

    Args:
        messages: Message dicts (``timestamp``, ``uuid`` and ``session_id`` are read)
        boundaries: Output of the boundary stage for the same messages
        patterns: Step 1 patterns used to look up each message's category

    Returns:
        Detected workflows, numbered in segment order
    """
    # Same null-safe ordering as the boundary stage so both see one sequence.
    ordered = sorted(messages, key=lambda m: str(m.get("timestamp") or ""))

    # uuid of the later message in each pair -> was a boundary flagged before it?
    boundary_before: dict[str, bool] = {b.msg_b: b.is_boundary for b in boundaries}

    # Segment messages by boundaries
    segments: list[list[dict[str, Any]]] = []
    current_segment: list[dict[str, Any]] = []
    for msg in ordered:
        if boundary_before.get(msg.get("uuid", ""), False) and current_segment:
            segments.append(current_segment)
            current_segment = []
        current_segment.append(msg)
    if current_segment:
        segments.append(current_segment)

    workflows: list[Workflow] = []
    for segment in segments:
        if len(segment) < 2:
            continue

        # Look each message's category up once; the list is reused for both
        # template matching and the per-step output below.
        step_categories = [
            _get_message_category(msg.get("uuid", ""), patterns) for msg in segment
        ]
        segment_categories = [cat for cat in step_categories if cat]
        if len(segment_categories) < 2:
            continue

        # Find best matching template
        best_match: tuple[str, float] | None = None
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            # Greedy in-order match: advance through the template each time the
            # next expected category shows up (gaps in the segment are allowed).
            template_idx = 0
            for cat in segment_categories:
                if template_idx < len(template_cats) and cat == template_cats[template_idx]:
                    template_idx += 1
            matches = template_idx

            if matches >= 2:  # At least 2 template steps matched
                confidence = matches / len(template_cats)
                if best_match is None or confidence > best_match[1]:
                    best_match = (template_name, confidence)

        if best_match is None:
            continue

        # Duration between the earliest and latest parseable timestamps.
        # _parse_timestamps returns only naive datetimes, so this cannot raise.
        timestamps = _parse_timestamps(segment)
        duration_minutes = 0
        if len(timestamps) >= 2:
            duration_minutes = int((max(timestamps) - min(timestamps)).total_seconds() / 60)

        # Distinct session ids in first-seen order. Building the list from a
        # set would make the order vary between runs (string hash randomisation).
        session_ids = list(dict.fromkeys(msg.get("session_id", "") for msg in segment))

        template_name, confidence = best_match
        workflows.append(
            Workflow(
                workflow_id=f"wf-{len(workflows) + 1:03d}",
                name=f"Detected: {template_name}",
                pattern=template_name,
                pattern_confidence=confidence,
                messages=[
                    {
                        "uuid": msg.get("uuid", ""),
                        "category": category,
                        "step": step,
                    }
                    for step, (msg, category) in enumerate(zip(segment, step_categories), 1)
                ],
                session_span=session_ids,
                duration_minutes=duration_minutes,
            )
        )

    return workflows
774# -----------------------------------------------------------------------------
775# Main API
776# -----------------------------------------------------------------------------
def analyze_workflows(
    messages_file: Path,
    patterns_file: Path,
    output_file: Path | None = None,
    overlap_threshold: float = 0.3,
    boundary_threshold: float = 0.6,
    verbose: bool = False,
    output_format: str = "yaml",
) -> WorkflowAnalysis:
    """Run the full workflow analysis over a message log and Step 1 patterns.

    Args:
        messages_file: Path to JSONL file with extracted user messages
        patterns_file: Path to YAML file from Step 1 (workflow-pattern-analyzer)
        output_file: Output path for step2-workflows.yaml (optional)
        overlap_threshold: Minimum entity overlap to cluster messages together (default: 0.3)
        boundary_threshold: Minimum boundary score to split workflow segments (default: 0.6)
        verbose: Emit per-stage progress to stderr (default: False)
        output_format: Output serialization format, "yaml" or "json" (default: "yaml");
            any value other than "json" is written as YAML

    Returns:
        WorkflowAnalysis with all analysis results
    """

    def _progress(text: str) -> None:
        # Stage progress goes to stderr, and only when requested.
        if verbose:
            print(text, file=sys.stderr)

    # Load inputs
    messages = _load_messages(messages_file)
    patterns = _load_patterns(patterns_file)

    metadata = {
        "source_file": messages_file.name,
        "patterns_file": patterns_file.name,
        "message_count": len(messages),
        "analysis_timestamp": datetime.now().isoformat(),
        "module": "workflow-sequence-analyzer",
        "version": "1.0",
    }

    # Stage 1: session linking
    sessions = _group_by_session(messages)
    _progress(f"[1/4] Linking sessions across {len(sessions)} session(s)...")
    session_links = _link_sessions(sessions)
    _progress(f" → {len(session_links)} link(s) found")

    # Stage 2: entity clustering
    _progress("[2/4] Clustering by entities...")
    entity_clusters = _cluster_by_entities(messages, overlap_threshold=overlap_threshold)
    _progress(f" → {len(entity_clusters)} cluster(s) found")

    # Stage 3: boundaries
    _progress("[3/4] Computing workflow boundaries...")
    boundaries = _compute_boundaries(messages, boundary_threshold=boundary_threshold)
    _progress(f" → {len(boundaries)} boundary/boundaries found")

    # Stage 4: workflow detection
    _progress("[4/4] Detecting workflows...")
    workflows = _detect_workflows(messages, boundaries, patterns)
    _progress(f" → {len(workflows)} workflow(s) detected")

    # Lookup tables for cross-referencing; later entries win on duplicate uuids.
    cluster_of: dict[str, str] = {}
    for cluster in entity_clusters:
        for member in cluster.messages:
            member_uuid = member.get("uuid", "")
            if member_uuid:
                cluster_of[member_uuid] = cluster.cluster_id

    content_of: dict[str, str] = {}
    for message in messages:
        message_uuid = message.get("uuid", "")
        if message_uuid:
            content_of[message_uuid] = message.get("content", "")

    for workflow in workflows:
        # The workflow is assigned to the cluster holding most of its messages.
        tally: dict[str, int] = {}
        for step in workflow.messages:
            owner = cluster_of.get(step.get("uuid", ""))
            if owner:
                tally[owner] = tally.get(owner, 0) + 1
        if tally:
            workflow.entity_cluster = max(tally, key=tally.__getitem__)

        # Steps whose original text carries an explicit handoff marker.
        for step in workflow.messages:
            step_uuid = step.get("uuid", "")
            if step_uuid and _detect_handoff(content_of.get(step_uuid, "")):
                workflow.handoff_points.append({"uuid": step_uuid, "type": "explicit_handoff"})

    # Handoff summary: links with explicit evidence versus all other links.
    explicit_links = [
        link
        for link in session_links
        if "handoff_detected" in link.unified_workflow.get("evidence", [])
    ]
    handoff_count = len(explicit_links)
    other_links = len(session_links) - handoff_count

    recommendations: list[str] = []
    if other_links > 0:
        recommendations.append("Consider using /ll:handoff for cleaner session transitions")

    handoff_analysis: dict[str, Any] = {
        "total_handoffs": handoff_count,
        "handoff_patterns": [
            {"pattern": "explicit_handoff", "count": handoff_count},
            {"pattern": "session_timeout", "count": other_links},
        ],
        "recommendations": recommendations,
    }

    analysis = WorkflowAnalysis(
        metadata=metadata,
        session_links=session_links,
        entity_clusters=entity_clusters,
        workflow_boundaries=boundaries,
        workflows=workflows,
        handoff_analysis=handoff_analysis,
    )

    # Write output if path provided
    if output_file:
        destination = Path(output_file)
        destination.parent.mkdir(parents=True, exist_ok=True)
        with open(destination, "w", encoding="utf-8") as sink:
            if output_format == "json":
                json.dump(analysis.to_dict(), sink, indent=2, default=str)
            else:
                yaml.dump(analysis.to_dict(), sink, default_flow_style=False, sort_keys=False)

    return analysis
909# -----------------------------------------------------------------------------
910# CLI
911# -----------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
    """Entry point for ll-workflows command.

    Analyze workflows from user messages and Step 1 patterns.

    Args:
        argv: Command-line arguments without the program name. When None
            (the default) argparse reads ``sys.argv[1:]``, so existing
            ``main()`` callers and console-script entry points are unaffected.
            Passing a list lets the CLI be driven programmatically.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Identify multi-step workflow patterns from user message history",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
 %(prog)s analyze --input messages.jsonl --patterns step1.yaml
 %(prog)s analyze -i messages.jsonl -p patterns.yaml -o output.yaml
 %(prog)s analyze --input .claude/user-messages.jsonl \\
 --patterns .claude/workflow-analysis/step1-patterns.yaml
""",
    )

    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # analyze subcommand
    analyze_parser = subparsers.add_parser(
        "analyze",
        help="Analyze workflows from messages and patterns",
    )
    analyze_parser.add_argument(
        "-i",
        "--input",
        type=Path,
        required=True,
        help="Input JSONL file with user messages",
    )
    analyze_parser.add_argument(
        "-p",
        "--patterns",
        type=Path,
        required=True,
        help="Input YAML file from Step 1 (workflow-pattern-analyzer)",
    )
    analyze_parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Output file (default: .claude/workflow-analysis/step2-workflows.yaml or .json)",
    )
    analyze_parser.add_argument(
        "-f",
        "--format",
        choices=["yaml", "json"],
        default="yaml",
        help="Output format (default: yaml)",
    )
    analyze_parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Print verbose progress information",
    )
    analyze_parser.add_argument(
        "--overlap-threshold",
        type=float,
        default=0.3,
        metavar="FLOAT",
        help="Minimum entity overlap to cluster messages together (default: 0.3)",
    )
    analyze_parser.add_argument(
        "--boundary-threshold",
        type=float,
        default=0.6,
        metavar="FLOAT",
        help="Minimum boundary score to split workflow segments (default: 0.6)",
    )

    args = parser.parse_args(argv)

    # No subcommand given: show usage and report failure.
    if args.command is None:
        parser.print_help()
        return 1

    if args.command == "analyze":
        # Validate input files
        if not args.input.exists():
            print(f"Error: Input file not found: {args.input}", file=sys.stderr)
            return 1

        if not args.patterns.exists():
            print(f"Error: Patterns file not found: {args.patterns}", file=sys.stderr)
            return 1

        # Validate threshold ranges
        if not (0.0 <= args.overlap_threshold <= 1.0):
            print(
                f"Error: --overlap-threshold must be in [0.0, 1.0], got {args.overlap_threshold}",
                file=sys.stderr,
            )
            return 1

        if not (0.0 <= args.boundary_threshold <= 1.0):
            print(
                f"Error: --boundary-threshold must be in [0.0, 1.0], got {args.boundary_threshold}",
                file=sys.stderr,
            )
            return 1

        # Default output path follows the chosen format.
        output_path = args.output
        if output_path is None:
            if args.format == "json":
                output_path = Path(".claude/workflow-analysis/step2-workflows.json")
            else:
                output_path = Path(".claude/workflow-analysis/step2-workflows.yaml")

        if args.verbose:
            print(f"Input: {args.input}")
            print(f"Patterns: {args.patterns}")
            print(f"Output: {output_path}")

        try:
            analysis = analyze_workflows(
                messages_file=args.input,
                patterns_file=args.patterns,
                output_file=output_path,
                overlap_threshold=args.overlap_threshold,
                boundary_threshold=args.boundary_threshold,
                verbose=args.verbose,
                output_format=args.format,
            )

            if args.verbose:
                print(f"Analyzed {analysis.metadata['message_count']} messages")
                print(f"Found {len(analysis.session_links)} session links")
                print(f"Found {len(analysis.entity_clusters)} entity clusters")
                print(f"Detected {len(analysis.workflows)} workflows")
                print(f"Output written to: {output_path}")

            return 0

        except Exception as e:
            # Top-level CLI boundary: report any failure as a one-line error
            # and a non-zero exit code instead of a traceback.
            print(f"Error: {e}", file=sys.stderr)
            return 1

    # Fallback for a command this function does not handle.
    return 1
# Script entry: run the CLI when the module is executed directly and use
# main()'s integer return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())