Coverage for little_loops / cli / loop / layout.py: 4%
934 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
1"""FSM diagram layout engine.
3Implements adaptive layout for FSM diagrams using a Sugiyama-inspired
4layered graph drawing approach. Supports topology detection, vertical
5linear chains, side-by-side branching, and margin back-edge rendering.
7Extracted from info.py and extended with adaptive layout capabilities.
8"""
10from __future__ import annotations
12import re
13from collections import deque
15from wcwidth import wcswidth as _wcswidth
16from wcwidth import wcwidth as _wcwidth
18from little_loops.cli.output import colorize, terminal_width
19from little_loops.fsm.schema import FSMLoop, StateConfig
21# ---------------------------------------------------------------------------
22# Edge label colorization
23# ---------------------------------------------------------------------------
25_EDGE_LABEL_COLORS: dict[str, str] = {
26 "yes": "32",
27 "no": "38;5;208",
28 "error": "31",
29 "partial": "33",
30 "next": "2",
31 "_": "2",
32}
35def _colorize_label(label: str) -> str:
36 """Colorize a (possibly compound) edge label like 'no/error'."""
37 parts = label.split("/")
38 code = ""
39 for part in parts:
40 if part in ("no", "error"):
41 code = _EDGE_LABEL_COLORS["no"]
42 break
43 if part == "partial" and not code:
44 code = _EDGE_LABEL_COLORS["partial"]
45 elif part == "yes" and not code:
46 code = _EDGE_LABEL_COLORS["yes"]
47 elif part in ("next", "_") and not code:
48 code = _EDGE_LABEL_COLORS["next"]
49 return colorize(label, code) if code else label
52def _colorize_diagram_labels(diagram: str) -> str:
53 """Apply ANSI color to known edge labels in a rendered diagram string.
55 Labels are colorized only when bounded by box-drawing or whitespace chars
56 to avoid coloring partial matches inside state names.
57 """
58 for label, code in _EDGE_LABEL_COLORS.items():
59 colored = colorize(label, code)
60 diagram = re.sub(
61 r"(?<=[─ │▶\n])" + re.escape(label) + r"(?=[─ │▶\n])",
62 colored,
63 diagram,
64 )
65 return diagram
68# ---------------------------------------------------------------------------
69# State box badge definitions
70# ---------------------------------------------------------------------------
72_ACTION_TYPE_BADGES: dict[str, str] = {
73 "prompt": "\u2726", # ✦
74 "slash_command": "/\u2501\u25ba", # /━►
75 "shell": "\u276f_", # ❯_
76 "mcp_tool": "\u26a1", # ⚡
77}
79_SUB_LOOP_BADGE = "\u21b3\u27f3" # ↳⟳
82def _badge_display_width(badge: str) -> int:
83 """Compute terminal display width of a badge string using wcwidth."""
84 w = _wcswidth(badge)
85 return w if w >= 0 else len(badge)
88def _get_state_badge(state: StateConfig | None) -> str:
89 """Return the unicode badge string for a state, or '' if none."""
90 if state is None:
91 return ""
92 if state.loop is not None:
93 return _SUB_LOOP_BADGE
94 if state.action_type:
95 return _ACTION_TYPE_BADGES.get(state.action_type, f"[{state.action_type}]")
96 if state.action:
97 return _ACTION_TYPE_BADGES["shell"]
98 return ""
101# ---------------------------------------------------------------------------
102# Box content helpers for multi-row diagram boxes
103# ---------------------------------------------------------------------------
106def _box_inner_lines(
107 state: StateConfig | None,
108 display_label: str,
109 verbose: bool,
110 inner_width: int,
111) -> list[str]:
112 """Return interior lines for a state box (between top and bottom borders).
114 The first line is always ``display_label`` + type badge (if any).
115 Subsequent lines are action content lines. All lines fit within
116 ``inner_width`` characters (content is truncated or wrapped accordingly).
117 """
118 # Badge is now rendered in the top-right corner by _draw_box; name row is label only
119 name_line = display_label[:inner_width]
121 lines: list[str] = [name_line]
123 # Action lines
124 if state and state.action:
125 action_src = [ln.rstrip() for ln in state.action.strip().splitlines()]
126 if verbose:
127 for src in action_src:
128 if not src:
129 continue
130 # Wrap long lines to inner_width
131 while len(src) > inner_width:
132 lines.append(src[:inner_width])
133 src = src[inner_width:]
134 if src:
135 lines.append(src)
136 else:
137 # Show first non-empty line, truncated
138 first = next((ln for ln in action_src if ln), "")
139 if len(first) > inner_width:
140 first = first[: inner_width - 1] + "\u2026"
141 if first:
142 lines.append(first)
144 return lines
147# ---------------------------------------------------------------------------
148# Topology detection
149# ---------------------------------------------------------------------------
152def _collect_edges(fsm: FSMLoop) -> list[tuple[str, str, str]]:
153 """Collect all (source, target, label) edges from an FSM."""
154 edges: list[tuple[str, str, str]] = []
155 for name, state in fsm.states.items():
156 if state.on_yes:
157 edges.append((name, state.on_yes, "yes"))
158 if state.on_no:
159 edges.append((name, state.on_no, "no"))
160 if state.on_error:
161 edges.append((name, state.on_error, "error"))
162 if state.on_partial:
163 edges.append((name, state.on_partial, "partial"))
164 if state.next:
165 edges.append((name, state.next, "next"))
166 if state.route:
167 for verdict, target in state.route.routes.items():
168 edges.append((name, target, verdict))
169 if state.route.default:
170 edges.append((name, state.route.default, "_"))
171 return edges
174def _bfs_order(initial: str, edges: list[tuple[str, str, str]]) -> tuple[list[str], dict[str, int]]:
175 """BFS from initial state. Returns (order, depth_map)."""
176 order: list[str] = []
177 depth: dict[str, int] = {initial: 0}
178 queue: deque[str] = deque([initial])
179 while queue:
180 node = queue.popleft()
181 order.append(node)
182 for src, dst, _ in edges:
183 if src == node and dst not in depth:
184 depth[dst] = depth[node] + 1
185 queue.append(dst)
186 return order, depth
189def _trace_main_path(
190 fsm: FSMLoop, edges: list[tuple[str, str, str]]
191) -> tuple[list[str], set[tuple[str, str]]]:
192 """Trace the main (happy) path through the FSM."""
193 visited: set[str] = set()
194 main_path: list[str] = []
195 main_edge_set: set[tuple[str, str]] = set()
196 current = fsm.initial
197 while current and current not in visited:
198 visited.add(current)
199 main_path.append(current)
200 st = fsm.states.get(current)
201 if not st or st.terminal:
202 break
203 nxt: str = st.on_yes or st.next or ""
204 if not nxt and st.route:
205 nxt = next(iter(st.route.routes.values()), "") or st.route.default or ""
206 if nxt:
207 main_edge_set.add((current, nxt))
208 current = nxt
209 else:
210 break
211 return main_path, main_edge_set
214def _classify_edges(
215 edges: list[tuple[str, str, str]],
216 main_edge_set: set[tuple[str, str]],
217 bfs_order: list[str],
218) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
219 """Split non-main edges into branches and back_edges."""
220 main_consumed: set[int] = set()
221 for src, dst in main_edge_set:
222 for i, (s, d, _) in enumerate(edges):
223 if s == src and d == dst and i not in main_consumed:
224 main_consumed.add(i)
225 break
227 branches: list[tuple[str, str, str]] = []
228 back_edges: list[tuple[str, str, str]] = []
229 for i, (src, dst, label) in enumerate(edges):
230 if i in main_consumed:
231 continue
232 src_pos = bfs_order.index(src) if src in bfs_order else len(bfs_order)
233 dst_pos = bfs_order.index(dst) if dst in bfs_order else len(bfs_order)
234 if dst == src or dst_pos < src_pos:
235 back_edges.append((src, dst, label))
236 else:
237 branches.append((src, dst, label))
238 return branches, back_edges
241class TopologyDetector:
242 """Classify FSM graph topology for layout strategy selection."""
244 def __init__(
245 self,
246 edges: list[tuple[str, str, str]],
247 main_path: list[str],
248 branches: list[tuple[str, str, str]],
249 back_edges: list[tuple[str, str, str]],
250 ) -> None:
251 self._edges = edges
252 self._main_path = main_path
253 self._branches = branches
254 self._back_edges = back_edges
256 def classify(self) -> str:
257 """Return 'linear', 'tree', or 'general'.
259 Linear: main path only, no non-self branches, only self-loop back-edges.
260 Tree: branches exist but no fan-in (every non-initial state has ≤1 incoming).
261 General: everything else (full Sugiyama needed).
262 """
263 non_self_branches = [b for b in self._branches if b[0] != b[1]]
264 non_self_back = [b for b in self._back_edges if b[0] != b[1]]
266 if not non_self_branches and not non_self_back:
267 return "linear"
269 # Check for fan-in: any state with >1 incoming forward edge
270 in_count: dict[str, int] = {}
271 for _, dst, _ in self._edges:
272 # Only count forward edges (not back-edges)
273 in_count[dst] = in_count.get(dst, 0) + 1
275 if not non_self_back and all(v <= 1 for v in in_count.values()):
276 return "tree"
278 return "general"
281# ---------------------------------------------------------------------------
282# Sugiyama layout pipeline
283# ---------------------------------------------------------------------------
286class LayerAssigner:
287 """Assign nodes to layers using longest-path + width constraint."""
289 def __init__(
290 self,
291 all_states: list[str],
292 edges: list[tuple[str, str, str]],
293 back_edge_set: set[tuple[str, str]],
294 initial: str,
295 max_width: int = 4,
296 ) -> None:
297 self._all_states = all_states
298 self._edges = edges
299 self._back_edge_set = back_edge_set
300 self._initial = initial
301 self._max_width = max_width
303 def assign(self) -> list[list[str]]:
304 """Return list of layers, each a list of state names (top to bottom)."""
305 # Build adjacency (forward edges only)
306 forward: dict[str, list[str]] = {s: [] for s in self._all_states}
307 reverse: dict[str, list[str]] = {s: [] for s in self._all_states}
308 seen_edges: set[tuple[str, str]] = set()
309 for src, dst, _ in self._edges:
310 if (src, dst) in self._back_edge_set or src == dst:
311 continue
312 if src in forward and dst in forward and (src, dst) not in seen_edges:
313 forward[src].append(dst)
314 reverse[dst].append(src)
315 seen_edges.add((src, dst))
317 # Longest-path layer assignment (topological order)
318 layer_of: dict[str, int] = {}
320 # Kahn's algorithm for topological order
321 in_degree = {s: len(reverse[s]) for s in self._all_states}
322 queue: deque[str] = deque()
323 for s in self._all_states:
324 if in_degree[s] == 0:
325 queue.append(s)
327 topo_order: list[str] = []
328 while queue:
329 node = queue.popleft()
330 topo_order.append(node)
331 for dst in forward[node]:
332 in_degree[dst] -= 1
333 if in_degree[dst] == 0:
334 queue.append(dst)
336 # Handle nodes not reached by topo sort (cycles in forward graph)
337 for s in self._all_states:
338 if s not in set(topo_order):
339 topo_order.append(s)
341 # Assign layers: longest path from root
342 for node in topo_order:
343 if not reverse[node]:
344 layer_of[node] = 0
345 else:
346 layer_of[node] = max(
347 (layer_of.get(p, 0) + 1 for p in reverse[node]),
348 default=0,
349 )
351 # Ensure initial state is at layer 0
352 if self._initial in layer_of and layer_of[self._initial] != 0:
353 offset = layer_of[self._initial]
354 for s in layer_of:
355 layer_of[s] -= offset
357 # Build layers list
358 max_layer = max(layer_of.values(), default=0)
359 layers: list[list[str]] = [[] for _ in range(max_layer + 1)]
360 for s in self._all_states:
361 layer = layer_of.get(s, 0)
362 layers[layer].append(s)
364 # Width constraint: if any layer exceeds max_width, split
365 if self._max_width > 0:
366 new_layers: list[list[str]] = []
367 for grp in layers:
368 remaining = list(grp)
369 while len(remaining) > self._max_width:
370 new_layers.append(remaining[: self._max_width])
371 remaining = remaining[self._max_width :]
372 if remaining:
373 new_layers.append(remaining)
374 layers = new_layers
376 return layers
379class CrossingMinimizer:
380 """Minimize edge crossings using barycenter heuristic."""
382 def __init__(
383 self,
384 layers: list[list[str]],
385 edges: list[tuple[str, str, str]],
386 back_edge_set: set[tuple[str, str]],
387 ) -> None:
388 self._layers = layers
389 self._edges = edges
390 self._back_edge_set = back_edge_set
392 def minimize(self) -> list[list[str]]:
393 """Return reordered layers with reduced crossings."""
394 # Build position lookup
395 layer_of: dict[str, int] = {}
396 for i, layer in enumerate(self._layers):
397 for s in layer:
398 layer_of[s] = i
400 # Forward adjacency (non-back, non-self)
401 adj_down: dict[str, list[str]] = {}
402 adj_up: dict[str, list[str]] = {}
403 for src, dst, _ in self._edges:
404 if (src, dst) in self._back_edge_set or src == dst:
405 continue
406 if src in layer_of and dst in layer_of:
407 adj_down.setdefault(src, []).append(dst)
408 adj_up.setdefault(dst, []).append(src)
410 layers = [list(layer) for layer in self._layers]
412 # 3 sweeps: down, up, down
413 for sweep in range(3):
414 if sweep % 2 == 0:
415 # Top-down sweep
416 for i in range(1, len(layers)):
417 pos_above = {s: j for j, s in enumerate(layers[i - 1])}
418 bary: dict[str, float] = {}
419 for s in layers[i]:
420 parents = [p for p in adj_up.get(s, []) if p in pos_above]
421 if parents:
422 bary[s] = sum(pos_above[p] for p in parents) / len(parents)
423 else:
424 bary[s] = float(layers[i].index(s))
425 layers[i].sort(key=lambda s: bary.get(s, 0))
426 else:
427 # Bottom-up sweep
428 for i in range(len(layers) - 2, -1, -1):
429 pos_below = {s: j for j, s in enumerate(layers[i + 1])}
430 bary_up: dict[str, float] = {}
431 for s in layers[i]:
432 children = [c for c in adj_down.get(s, []) if c in pos_below]
433 if children:
434 bary_up[s] = sum(pos_below[c] for c in children) / len(children)
435 else:
436 bary_up[s] = float(layers[i].index(s))
437 layers[i].sort(key=lambda s: bary_up.get(s, 0))
439 return layers
442# ---------------------------------------------------------------------------
443# Rendering helpers
444# ---------------------------------------------------------------------------
447def _compute_display_labels(
448 all_states: list[str],
449 initial: str,
450 terminal_states: set[str],
451) -> dict[str, str]:
452 """Compute display labels with → prefix and ◉ suffix."""
453 display_label: dict[str, str] = {}
454 for s in all_states:
455 label = s
456 if s == initial:
457 label = "\u2192 " + label
458 if s in terminal_states:
459 label = label + " \u25c9"
460 display_label[s] = label
461 return display_label
464def _compute_box_sizes(
465 all_states: list[str],
466 display_label: dict[str, str],
467 fsm_states: dict[str, StateConfig] | None,
468 verbose: bool,
469 max_box_inner: int,
470) -> tuple[dict[str, list[str]], dict[str, int], dict[str, int], dict[str, str]]:
471 """Compute box content, widths, and heights for all states.
473 Returns (box_inner, box_width, box_height, box_badge).
474 """
475 box_inner: dict[str, list[str]] = {}
476 box_width: dict[str, int] = {}
477 box_badge: dict[str, str] = {}
479 for s in all_states:
480 state_obj = fsm_states.get(s) if fsm_states else None
482 badge = _get_state_badge(state_obj)
483 badge_w = _badge_display_width(badge) if badge else 0
484 box_badge[s] = badge
486 # Width must fit: name label on content row, badge on top border
487 base_w = max(len(display_label[s]), badge_w)
489 inner_w = base_w
490 if state_obj and state_obj.action and max_box_inner > 0:
491 action_lines = state_obj.action.strip().splitlines()
492 if verbose:
493 max_action_w = max(
494 (len(ln.rstrip()) for ln in action_lines if ln.rstrip()), default=0
495 )
496 inner_w = max(base_w, min(max_action_w, max_box_inner))
497 else:
498 first_action = next((ln.rstrip() for ln in action_lines if ln.rstrip()), "")
499 inner_w = max(base_w, min(len(first_action), max_box_inner))
501 content = _box_inner_lines(state_obj, display_label[s], verbose, inner_w)
502 actual_w = max(len(ln) for ln in content)
503 inner_w = max(inner_w, actual_w)
504 box_inner[s] = content
505 box_width[s] = inner_w + 4 # "│ " + content + " │"
507 box_height: dict[str, int] = {s: len(box_inner[s]) + 2 for s in all_states}
508 return box_inner, box_width, box_height, box_badge
511def _draw_box(
512 grid: list[list[str]],
513 row: int,
514 col: int,
515 width: int,
516 height: int,
517 content: list[str],
518 is_highlighted: bool,
519 highlight_color: str,
520 badge: str = "",
521) -> None:
522 """Draw a state box onto a character grid at (row, col).
524 If *badge* is provided it is placed right-aligned in the top border row,
525 immediately before the ┐ corner character.
526 """
527 total_width = len(grid[0]) if grid else 0
529 def _bc(ch: str) -> str:
530 return colorize(ch, highlight_color) if is_highlighted else ch
532 # Top border: ┌ ─ ─ … ─ ┐
533 if col < total_width:
534 grid[row][col] = _bc("\u250c")
535 for j in range(1, width - 1):
536 if col + j < total_width:
537 grid[row][col + j] = _bc("\u2500")
538 if col + width - 1 < total_width:
539 grid[row][col + width - 1] = _bc("\u2510")
541 # Overlay badge in top-right corner (before ┐)
542 if badge:
543 badge_w = _badge_display_width(badge)
544 pos = col + width - 1 - badge_w
545 for ch in badge:
546 ch_w = _wcwidth(ch)
547 if ch_w < 1:
548 ch_w = 1
549 if col + 1 <= pos < col + width - 1 and pos < total_width:
550 grid[row][pos] = ch
551 if ch_w == 2 and pos + 1 < col + width - 1 and pos + 1 < total_width:
552 grid[row][pos + 1] = ""
553 pos += ch_w
555 # Content rows
556 for i, line in enumerate(content):
557 r = row + i + 1
558 if r >= len(grid):
559 break
560 if col < total_width:
561 grid[r][col] = _bc("\u2502")
562 if col + width - 1 < total_width:
563 grid[r][col + width - 1] = _bc("\u2502")
564 if is_highlighted and i == 0:
565 colored_line = colorize(line, f"{highlight_color};1")
566 if col + 2 < total_width:
567 grid[r][col + 2] = colored_line
568 for j in range(1, len(line)):
569 if col + 2 + j < col + width - 1:
570 grid[r][col + 2 + j] = ""
571 else:
572 for j, ch in enumerate(line):
573 if col + 2 + j < col + width - 1:
574 grid[r][col + 2 + j] = ch
576 # Padding rows between content and bottom border
577 for i in range(len(content) + 1, height - 1):
578 r = row + i
579 if r >= len(grid):
580 break
581 if col < total_width:
582 grid[r][col] = _bc("\u2502")
583 if col + width - 1 < total_width:
584 grid[r][col + width - 1] = _bc("\u2502")
586 # Bottom border
587 brow = row + height - 1
588 if brow < len(grid):
589 if col < total_width:
590 grid[brow][col] = _bc("\u2514")
591 for j in range(1, width - 1):
592 if col + j < total_width:
593 grid[brow][col + j] = _bc("\u2500")
594 if col + width - 1 < total_width:
595 grid[brow][col + width - 1] = _bc("\u2518")
598# ---------------------------------------------------------------------------
599# Layered (vertical) renderer
600# ---------------------------------------------------------------------------
603def _render_layered_diagram(
604 layers: list[list[str]],
605 edges: list[tuple[str, str, str]],
606 main_edge_set: set[tuple[str, str]],
607 branches: list[tuple[str, str, str]],
608 back_edges: list[tuple[str, str, str]],
609 initial: str,
610 terminal_states: set[str] | None,
611 fsm_states: dict[str, StateConfig] | None,
612 verbose: bool,
613 highlight_state: str | None,
614 highlight_color: str,
615) -> str:
616 """Render FSM using layered (Sugiyama-style) vertical layout."""
617 terminal_states = terminal_states or set()
618 fsm_states = fsm_states or {}
619 tw = terminal_width()
621 # Flatten layers to get all states
622 all_states = [s for layer in layers for s in layer]
623 if not all_states:
624 return ""
626 display_label = _compute_display_labels(all_states, initial, terminal_states)
628 # Compute max_box_inner based on widest layer
629 max_layer_size = max(len(layer) for layer in layers)
630 if verbose:
631 max_box_inner = max(20, min(60, (tw - 4) // max(1, max_layer_size) - 6))
632 else:
633 max_box_inner = max(20, min(40, (tw - 4) // max(1, max_layer_size) - 6))
635 box_inner, box_width, box_height, box_badge = _compute_box_sizes(
636 all_states, display_label, fsm_states, verbose, max_box_inner
637 )
639 # Post-hoc layer merge: re-merge adjacent single-state layers that were
640 # over-split by the conservative max_width_per_layer estimate. Only merge
641 # when both layers receive an edge from the same source state (indicating
642 # they were originally one layer split by width constraint).
643 available_w = tw - 20 # conservative content-area estimate
644 gap_between = 6
645 # Build edge target sets: for each state, which earlier states point to it
646 _incoming: dict[str, set[str]] = {s: set() for layer in layers for s in layer}
647 for src, dst, _ in edges:
648 if src != dst and dst in _incoming:
649 _incoming[dst].add(src)
650 merged = True
651 while merged:
652 merged = False
653 i = 0
654 while i < len(layers) - 1:
655 la, lb = layers[i], layers[i + 1]
656 # Only merge single-state layers that share an incoming source
657 if len(la) == 1 and len(lb) == 1:
658 sources_a = _incoming.get(la[0], set())
659 sources_b = _incoming.get(lb[0], set())
660 shared_source = sources_a & sources_b
661 else:
662 shared_source = set()
663 combined_w = (
664 sum(box_width[s] for s in la)
665 + gap_between * (len(la) - 1)
666 + gap_between
667 + sum(box_width[s] for s in lb)
668 + gap_between * (len(lb) - 1)
669 )
670 if shared_source and combined_w <= available_w and len(la) + len(lb) <= 4:
671 layers[i] = la + lb
672 layers.pop(i + 1)
673 merged = True
674 else:
675 i += 1
677 # Collect ALL non-self-loop forward edge labels (main + branches + same-depth back-edges)
678 # Multiple edges between the same pair are combined as "label1/label2"
679 forward_edge_labels: dict[tuple[str, str], str] = {}
680 for src, dst, lbl in edges:
681 if src == dst:
682 continue
683 if (src, dst) in main_edge_set or (src, dst, lbl) in branches:
684 if (src, dst) in forward_edge_labels:
685 forward_edge_labels[(src, dst)] += "/" + lbl
686 else:
687 forward_edge_labels[(src, dst)] = lbl
689 # True back-edges: only those going to an earlier layer (computed after layer assignment)
690 # Will be finalized below after col positions are computed
691 # Combine same-pair back-edges into single entries with merged labels (e.g. "error/fail")
692 back_edge_labels_initial: dict[tuple[str, str], str] = {}
693 for s, d, lbl in back_edges:
694 if s != d:
695 if (s, d) in back_edge_labels_initial:
696 back_edge_labels_initial[(s, d)] += "/" + lbl
697 else:
698 back_edge_labels_initial[(s, d)] = lbl
700 # Pre-compute layer positions to detect main-path cycle edges early.
701 # This ensures back_edge_margin accounts for ALL backward-pointing edges
702 # (including main-path cycles like commit → initial_state) before column
703 # positions are computed.
704 prelim_layer_of: dict[str, int] = {}
705 for li, layer in enumerate(layers):
706 for s in layer:
707 prelim_layer_of[s] = li
709 # Include main-path/branch edges that point backward in margin estimate
710 all_back_labels: dict[tuple[str, str], str] = dict(back_edge_labels_initial)
711 for (src, dst), lbl in forward_edge_labels.items():
712 src_layer = prelim_layer_of.get(src, -1)
713 dst_layer = prelim_layer_of.get(dst, -1)
714 if dst_layer < src_layer:
715 if (src, dst) in all_back_labels:
716 all_back_labels[(src, dst)] += "/" + lbl
717 else:
718 all_back_labels[(src, dst)] = lbl
720 non_self_back_initial = [(s, d, lbl) for (s, d), lbl in all_back_labels.items()]
721 back_edge_margin = 0
722 if non_self_back_initial:
723 max_label_len = max(len(lbl) for _, _, lbl in non_self_back_initial)
724 n_back_initial = len(non_self_back_initial)
725 back_edge_margin = max_label_len + max(6, 2 * n_back_initial + 2)
727 content_left = 2 + back_edge_margin
729 # Self-loops per state
730 self_loops: dict[str, list[str]] = {}
731 for src, dst, lbl in back_edges:
732 if src == dst:
733 self_loops.setdefault(src, []).append(lbl)
735 # --- Compute a common center column for alignment ---
736 # For layers with single boxes, we want vertical alignment through a
737 # shared center column. Use the widest single-state layer's center.
738 max_single_w = max((box_width[layer[0]] for layer in layers if len(layer) == 1), default=0)
739 # The common center is at content_left + max_single_w // 2
740 common_center = content_left + max_single_w // 2
742 # Compute column positions per layer
743 col_start: dict[str, int] = {}
744 col_center: dict[str, int] = {}
745 layer_of: dict[str, int] = {}
747 for li, layer in enumerate(layers):
748 if len(layer) == 1:
749 # Single-state layer: center-align to common center
750 sname = layer[0]
751 col_start[sname] = common_center - box_width[sname] // 2
752 col_center[sname] = common_center
753 layer_of[sname] = li
754 else:
755 # Multi-state layer: place side-by-side, centered around common_center
756 gap_between = 6
757 total_layer_w = sum(box_width[s] for s in layer) + gap_between * (len(layer) - 1)
758 x = common_center - total_layer_w // 2
759 x = max(content_left, x)
760 for i, sname in enumerate(layer):
761 col_start[sname] = x
762 col_center[sname] = x + box_width[sname] // 2
763 layer_of[sname] = li
764 if i < len(layer) - 1:
765 next_s = layer[i + 1]
766 # Check for edge labels in both directions between adjacent states
767 label_fwd = forward_edge_labels.get((sname, next_s), "")
768 label_rev = forward_edge_labels.get((next_s, sname), "")
769 max_label = max(len(label_fwd), len(label_rev))
770 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between
771 x += box_width[sname] + gap
772 else:
773 x += box_width[sname]
775 # Reclassify back-edges based on actual layer positions
776 # Only edges going to an earlier layer are true margin back-edges
777 # Combine same-pair edges into merged labels (e.g. "error/fail")
778 back_edge_labels_reclass: dict[tuple[str, str], str] = {}
779 same_layer_edges: list[tuple[str, str, str]] = []
780 for src, dst, lbl in back_edges:
781 if src == dst:
782 continue
783 src_layer = layer_of.get(src, -1)
784 dst_layer = layer_of.get(dst, -1)
785 if dst_layer < src_layer:
786 if (src, dst) in back_edge_labels_reclass:
787 back_edge_labels_reclass[(src, dst)] += "/" + lbl
788 else:
789 back_edge_labels_reclass[(src, dst)] = lbl
790 elif dst_layer == src_layer:
791 same_layer_edges.append((src, dst, lbl))
792 else: # dst_layer > src_layer: actually forward edge
793 if (src, dst) in forward_edge_labels:
794 forward_edge_labels[(src, dst)] += "/" + lbl
795 else:
796 forward_edge_labels[(src, dst)] = lbl
798 # Also reclassify main/branch edges in forward_edge_labels that point backward
799 # after layer assignment (e.g. main-path cycle edges like commit → initial_state)
800 backward_in_fwd: list[tuple[str, str]] = []
801 for (src, dst), lbl in forward_edge_labels.items():
802 src_layer = layer_of.get(src, -1)
803 dst_layer = layer_of.get(dst, -1)
804 if dst_layer < src_layer:
805 backward_in_fwd.append((src, dst))
806 if (src, dst) in back_edge_labels_reclass:
807 back_edge_labels_reclass[(src, dst)] += "/" + lbl
808 else:
809 back_edge_labels_reclass[(src, dst)] = lbl
810 elif dst_layer == src_layer and src != dst:
811 backward_in_fwd.append((src, dst))
812 same_layer_edges.append((src, dst, lbl))
813 for key in backward_in_fwd:
814 del forward_edge_labels[key]
816 # Add same-layer back-edges to forward_edge_labels so gap calculation accounts for them
817 for src, dst, lbl in same_layer_edges:
818 if (src, dst) in forward_edge_labels:
819 forward_edge_labels[(src, dst)] += "/" + lbl
820 else:
821 forward_edge_labels[(src, dst)] = lbl
823 # Recalculate inter-box gaps for layers with newly discovered same-layer edges
824 affected_layers: set[int] = set()
825 for src, dst, _lbl in same_layer_edges:
826 sl = layer_of.get(src, -1)
827 dl = layer_of.get(dst, -1)
828 if sl >= 0:
829 affected_layers.add(sl)
830 if dl >= 0:
831 affected_layers.add(dl)
832 for li in affected_layers:
833 layer = layers[li]
834 if len(layer) < 2:
835 continue
836 gap_between = 6
837 total_layer_w = sum(box_width[s] for s in layer)
838 # For non-adjacent same-layer edges the label lands in the gap immediately
839 # adjacent to the source box (left of src for right-to-left, right of src
840 # for left-to-right). Collect those requirements so the gap is wide enough.
841 extra_gap_req: dict[tuple[str, str], int] = {}
842 for src, dst, lbl in same_layer_edges:
843 if layer_of.get(src) != li or layer_of.get(dst) != li:
844 continue
845 try:
846 si, di = layer.index(src), layer.index(dst)
847 except ValueError:
848 continue
849 if abs(si - di) <= 1:
850 continue # adjacent — already handled by forward_edge_labels
851 if si > di:
852 key = (layer[si - 1], src) # gap to the left of src
853 else:
854 key = (src, layer[si + 1]) # gap to the right of src
855 extra_gap_req[key] = max(extra_gap_req.get(key, 0), len(lbl))
856 # Recalculate gaps with label-aware spacing
857 gaps: list[int] = []
858 for i in range(len(layer) - 1):
859 sname, next_s = layer[i], layer[i + 1]
860 label_fwd = forward_edge_labels.get((sname, next_s), "")
861 label_rev = forward_edge_labels.get((next_s, sname), "")
862 max_label = max(len(label_fwd), len(label_rev), extra_gap_req.get((sname, next_s), 0))
863 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between
864 gaps.append(gap)
865 total_layer_w += sum(gaps)
866 x = common_center - total_layer_w // 2
867 x = max(content_left, x)
868 for i, sname in enumerate(layer):
869 col_start[sname] = x
870 col_center[sname] = x + box_width[sname] // 2
871 if i < len(layer) - 1:
872 x += box_width[sname] + gaps[i]
873 else:
874 x += box_width[sname]
876 non_self_back = [(s, d, lbl) for (s, d), lbl in back_edge_labels_reclass.items()]
878 # Recalculate back-edge margin if it changed
879 if non_self_back:
880 max_label_len = max(len(lbl) for _, _, lbl in non_self_back)
881 n_back = len(non_self_back)
882 actual_margin = max_label_len + max(6, 2 * n_back + 2)
883 if actual_margin != back_edge_margin:
884 # Need to recalculate positions (rare case - usually matches)
885 back_edge_margin = actual_margin
886 content_left = 2 + back_edge_margin
888 # Identify forward skip-layer edges (span > 1 layer, not handled by consecutive renderer)
889 skip_forward_edges: list[tuple[str, str, str]] = []
890 for (src, dst), lbl in forward_edge_labels.items():
891 src_layer = layer_of.get(src, -1)
892 dst_layer = layer_of.get(dst, -1)
893 if dst_layer > src_layer + 1:
894 skip_forward_edges.append((src, dst, lbl))
896 # Pre-compute right margin width for forward skip-layer edges
897 right_edge_margin = 0
898 if skip_forward_edges:
899 max_fwd_label_len = max(len(lbl) for _, _, lbl in skip_forward_edges)
900 right_edge_margin = max_fwd_label_len + 6
902 # Compute total width needed
903 total_content_w = 0
904 for s in all_states:
905 right = col_start[s] + box_width[s]
906 total_content_w = max(total_content_w, right)
907 total_width = max(total_content_w + right_edge_margin + 4, tw)
909 # Compute vertical positions with space for self-loops and inter-layer arrows
910 row_start: dict[str, int] = {}
911 y = 0
912 for li, layer in enumerate(layers):
913 layer_h = max(box_height[s] for s in layer)
914 for sname in layer:
915 row_start[sname] = y
916 y += layer_h
918 # Add self-loop row if any state in this layer has self-loops
919 has_self_loop = any(s in self_loops for s in layer)
920 if has_self_loop:
921 y += 1 # self-loop marker row
923 if li < len(layers) - 1:
924 y += 3 # arrow gap: │, label, ▼
926 total_height = y
928 # Build character grid
929 grid: list[list[str]] = [[" "] * total_width for _ in range(total_height)]
931 # Draw boxes
932 for sname in all_states:
933 is_highlighted = highlight_state is not None and sname == highlight_state
934 _draw_box(
935 grid,
936 row_start[sname],
937 col_start[sname],
938 box_width[sname],
939 box_height[sname],
940 box_inner[sname],
941 is_highlighted,
942 highlight_color,
943 badge=box_badge[sname],
944 )
946 # Precompute box-occupied (row, col) pairs so connector lines can avoid overwriting box cells
947 _box_occ: dict[int, set[int]] = {}
948 for _s in all_states:
949 for _r in range(row_start[_s], row_start[_s] + box_height[_s]):
950 _row_set = _box_occ.setdefault(_r, set())
951 for _c in range(col_start[_s], col_start[_s] + box_width[_s]):
952 _row_set.add(_c)
954 # Draw self-loop markers immediately below their boxes
955 for sname, labels in self_loops.items():
956 marker = "\u21ba " + ", ".join(labels)
957 r = row_start[sname] + box_height[sname]
958 if r < total_height:
959 cx = col_center[sname]
960 pos = max(0, cx - len(marker) // 2)
961 for j, ch in enumerate(marker):
962 if pos + j < total_width:
963 grid[r][pos + j] = ch
965 # Draw forward edges between layers (vertical arrows with labels)
966 for li in range(len(layers) - 1):
967 layer_h = max(box_height[s] for s in layers[li])
968 has_self_loop = any(s in self_loops for s in layers[li])
969 self_loop_offset = 1 if has_self_loop else 0
971 # Arrow area starts after box bottom + self-loop row
972 arrow_start_row = row_start[layers[li][0]] + layer_h + self_loop_offset
973 arrow_end_row = row_start[layers[li + 1][0]] - 1
975 # Collect all inter-layer edges from this layer to the next
976 inter_edges: list[tuple[str, str, str]] = []
977 for src in layers[li]:
978 for dst in layers[li + 1]:
979 label = forward_edge_labels.get((src, dst))
980 if label is not None:
981 inter_edges.append((src, dst, label))
983 # Draw each edge with its own vertical pipe to the target's center
984 for src, dst, label in inter_edges:
985 dst_cc = col_center[dst]
986 src_left = col_start[src]
987 src_right = src_left + box_width[src]
989 # Horizontal connector when pipe is outside source box range
990 if dst_cc >= src_right or dst_cc < src_left:
991 conn_row = arrow_start_row
992 if 0 <= conn_row < total_height:
993 if dst_cc >= src_right:
994 # Pipe right of source: └───┐
995 src_cc = col_center[src]
996 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ":
997 grid[conn_row][src_cc] = "\u2514" # └
998 start_c = src_cc + 1
999 else:
1000 start_c = src_right
1001 for c in range(start_c, dst_cc):
1002 if 0 <= c < total_width:
1003 grid[conn_row][c] = "\u2500"
1004 if 0 <= dst_cc < total_width:
1005 grid[conn_row][dst_cc] = "\u2510" # ┐
1006 else:
1007 # Pipe left of source: ┌───┘
1008 src_cc = col_center[src]
1009 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ":
1010 end_c = src_cc
1011 grid[conn_row][src_cc] = "\u2518" # ┘
1012 else:
1013 end_c = src_left
1014 for c in range(dst_cc + 1, end_c):
1015 if 0 <= c < total_width:
1016 grid[conn_row][c] = "\u2500"
1017 if 0 <= dst_cc < total_width:
1018 grid[conn_row][dst_cc] = "\u250c" # ┌
1019 pipe_start = arrow_start_row + 1
1020 else:
1021 pipe_start = arrow_start_row
1023 # Draw vertical pipe at destination's center column
1024 for r in range(pipe_start, arrow_end_row):
1025 if 0 <= dst_cc < total_width and r < total_height:
1026 grid[r][dst_cc] = "\u2502"
1028 # Arrow tip at destination center
1029 if arrow_end_row < total_height and 0 <= dst_cc < total_width:
1030 grid[arrow_end_row][dst_cc] = "\u25bc"
1032 # Label to the right of the pipe (or left if it would overlap)
1033 label_row = arrow_start_row
1034 if label_row < total_height:
1035 label_start = dst_cc + 2
1036 for j, ch in enumerate(label):
1037 if label_start + j < total_width:
1038 grid[label_row][label_start + j] = ch
1040 # Post-pass: connect horizontal gaps for multi-branch sources
1041 if len(inter_edges) >= 2 and 0 <= arrow_start_row < total_height:
1042 src_targets: dict[str, list[int]] = {}
1043 for src, dst, _ in inter_edges:
1044 if dst in col_center:
1045 src_targets.setdefault(src, []).append(col_center[dst])
1046 for _src, centers in src_targets.items():
1047 if len(centers) < 2:
1048 continue
1049 left = min(centers)
1050 right = max(centers)
1051 for c in range(left + 1, right):
1052 if 0 <= c < total_width:
1053 cell = grid[arrow_start_row][c]
1054 if cell == " ":
1055 grid[arrow_start_row][c] = "\u2500" # ─
1056 elif cell == "\u2502": # │ → ┼
1057 grid[arrow_start_row][c] = "\u253c"
1058 elif cell == "\u2518": # ┘ → ┴
1059 grid[arrow_start_row][c] = "\u2534"
1060 elif cell == "\u2514": # └ → ┴
1061 grid[arrow_start_row][c] = "\u2534"
1062 elif cell == "\u2510": # ┐ → ┬
1063 grid[arrow_start_row][c] = "\u252c"
1064 elif cell == "\u250c": # ┌ → ┬
1065 grid[arrow_start_row][c] = "\u252c"
1066 # Update boundary junction chars where the horizontal bar meets a pipe
1067 if 0 <= left < total_width and grid[arrow_start_row][left] == "\u2502": # │ → ├
1068 grid[arrow_start_row][left] = "\u251c"
1069 if 0 <= right < total_width and grid[arrow_start_row][right] == "\u2502": # │ → ┤
1070 grid[arrow_start_row][right] = "\u2524"
1072 # Draw same-layer edges (horizontal arrows between states on same layer)
1073 # Includes both branches and reclassified back-edges within same layer
1074 all_same_layer: list[tuple[str, str, str]] = list(same_layer_edges)
1075 for _li, layer in enumerate(layers):
1076 for i, src in enumerate(layer):
1077 for j, dst in enumerate(layer):
1078 if i == j:
1079 continue
1080 label = forward_edge_labels.get((src, dst))
1081 if label is not None and (src, dst, label) not in all_same_layer:
1082 all_same_layer.append((src, dst, label))
1084 for src, dst, label in all_same_layer:
1085 if src not in col_start or dst not in col_start:
1086 continue
1087 name_row = row_start[src] + 1
1088 src_right = col_start[src] + box_width[src]
1089 dst_right = col_start[dst] + box_width[dst]
1090 dst_left = col_start[dst]
1091 src_left = col_start[src]
1092 _row_boxes = _box_occ.get(name_row, set())
1093 if dst_left >= src_right:
1094 # Left to right horizontal arrow: src ──label──▶ dst
1095 start = src_right
1096 end = dst_left
1097 edge_text = "\u2500" + label + "\u2500\u2500\u25b6"
1098 available = end - start
1099 if available < len(edge_text):
1100 edge_text = edge_text[: max(1, available)]
1101 left_dashes = max(0, available - len(edge_text))
1102 for k in range(left_dashes):
1103 pos = start + k
1104 if pos < total_width and name_row < total_height and pos not in _row_boxes:
1105 grid[name_row][pos] = "\u2500"
1106 for k, ch in enumerate(edge_text):
1107 pos = start + left_dashes + k
1108 if (
1109 0 <= pos < end
1110 and pos < total_width
1111 and name_row < total_height
1112 and pos not in _row_boxes
1113 ):
1114 grid[name_row][pos] = ch
1115 elif dst_right <= src_left:
1116 # Right to left: dst is left of src: src → dst drawn as dst ◀──label── src
1117 start = dst_right
1118 end = src_left
1119 edge_text = "\u2500" + label + "\u2500\u2500\u25b6"
1120 available = end - start
1121 if available < len(edge_text):
1122 edge_text = edge_text[: max(1, available)]
1123 left_dashes = max(0, available - len(edge_text))
1124 for k in range(left_dashes):
1125 pos = start + k
1126 if pos < total_width and name_row < total_height and pos not in _row_boxes:
1127 grid[name_row][pos] = "\u2500"
1128 for k, ch in enumerate(edge_text):
1129 pos = start + left_dashes + k
1130 if (
1131 0 <= pos < end
1132 and pos < total_width
1133 and name_row < total_height
1134 and pos not in _row_boxes
1135 ):
1136 grid[name_row][pos] = ch
1138 # Back-edges: left-margin vertical arrows with labels
1139 if non_self_back:
1140 sorted_back = sorted(
1141 non_self_back,
1142 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)),
1143 reverse=True,
1144 )
1145 used_cols: list[int] = []
1146 # Compute rightmost pipe column so labels go right of ALL pipes
1147 rightmost_pipe_col = 1 + (len(sorted_back) - 1) * 2
1149 for src, dst, label in sorted_back:
1150 # Source: name row of source box; target: name row of target box
1151 src_row = row_start.get(src, 0) + 1 # name row, not bottom border
1152 dst_row = row_start.get(dst, 0) + 1 # name row
1154 # Find a free column in the margin
1155 col = 1
1156 for uc in sorted(used_cols):
1157 if uc == col:
1158 col += 2
1159 used_cols.append(col)
1161 if col >= content_left - 1:
1162 continue
1164 top_row = min(src_row, dst_row)
1165 bot_row = max(src_row, dst_row)
1167 # Draw vertical line in margin (exclude corner rows handled below)
1168 for r in range(top_row + 1, bot_row):
1169 if 0 <= r < total_height and col < total_width:
1170 cell = grid[r][col]
1171 if cell == "\u2500": # ─ → ┼
1172 grid[r][col] = "\u253c"
1173 elif cell == " ":
1174 grid[r][col] = "\u2502"
1176 # Horizontal connector from source box to margin
1177 # Draw right-to-left, crossing existing pipes with junction chars
1178 if 0 <= src_row < total_height:
1179 src_left = col_start.get(src, col + 1)
1180 _src_row_boxes = _box_occ.get(src_row, set())
1181 for c in range(col + 1, src_left):
1182 if c < total_width and c not in _src_row_boxes:
1183 cell = grid[src_row][c]
1184 if cell == " ":
1185 grid[src_row][c] = "\u2500" # ─
1186 elif cell == "\u2502": # │ → ┼
1187 grid[src_row][c] = "\u253c"
1188 elif cell == "\u2514": # └ → ┴
1189 grid[src_row][c] = "\u2534"
1190 elif cell == "\u250c": # ┌ → ┬
1191 grid[src_row][c] = "\u252c"
1192 elif cell == "\u251c": # ├ → ┼
1193 grid[src_row][c] = "\u253c"
1194 # Leave ─, ▶, box chars unchanged
1196 # Horizontal connector from margin to target box
1197 # Draw right-to-left, crossing existing pipes with junction chars
1198 dst_left = col_start.get(dst, col + 1)
1199 if 0 <= dst_row < total_height:
1200 _dst_row_boxes = _box_occ.get(dst_row, set())
1201 for c in range(col + 1, dst_left):
1202 if c < total_width and c not in _dst_row_boxes:
1203 cell = grid[dst_row][c]
1204 if cell == " ":
1205 grid[dst_row][c] = "\u2500" # ─
1206 elif cell == "\u2502": # │ → ┼
1207 grid[dst_row][c] = "\u253c"
1208 elif cell == "\u2514": # └ → ┴
1209 grid[dst_row][c] = "\u2534"
1210 elif cell == "\u250c": # ┌ → ┬
1211 grid[dst_row][c] = "\u252c"
1212 elif cell == "\u251c": # ├ → ┼
1213 grid[dst_row][c] = "\u253c"
1215 # Corner characters at pipe-to-horizontal turn points
1216 for row in (src_row, dst_row):
1217 if 0 <= row < total_height and col < total_width:
1218 existing = grid[row][col]
1219 if row == bot_row:
1220 # Pipe ends, turns right: └; if horizontal already crosses here: ┴
1221 grid[row][col] = "\u2534" if existing == "\u2500" else "\u2514"
1222 else: # row == top_row
1223 # Pipe starts going down, turns right: ┌; if horizontal already crosses here: ┬
1224 grid[row][col] = "\u252c" if existing == "\u2500" else "\u250c"
1226 # Arrow tip at target: place ▶ at end of horizontal connector (entering box from left)
1227 if 0 <= dst_row < total_height and dst_left - 1 > col and dst_left - 1 < total_width:
1228 grid[dst_row][dst_left - 1] = "\u25b6"
1230 # Label on the margin line (right of ALL pipes, not just this one)
1231 label_row_pos = (top_row + bot_row) // 2
1232 if 0 <= label_row_pos < total_height:
1233 label_start = rightmost_pipe_col + 2
1234 for j, ch in enumerate(label):
1235 if label_start + j < content_left - 1 and label_start + j < total_width:
1236 grid[label_row_pos][label_start + j] = ch
1238 # Forward skip-layer edges: right-margin vertical arrows with labels
1239 # Symmetric to the left-margin back-edge renderer above
1240 if skip_forward_edges:
1241 sorted_fwd_skip = sorted(
1242 skip_forward_edges,
1243 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)),
1244 reverse=True,
1245 )
1246 used_fwd_cols: list[int] = []
1247 # Rightmost pipe column (furthest from content) for label placement
1248 rightmost_fwd_pipe_col = total_content_w + 2 + (len(sorted_fwd_skip) - 1) * 2
1250 for src, dst, label in sorted_fwd_skip:
1251 src_row = row_start.get(src, 0) + 1 # name row
1252 dst_row = row_start.get(dst, 0) + 1 # name row
1254 # Allocate column in right margin (starting from content edge, going right)
1255 col = total_content_w + 2
1256 for uc in sorted(used_fwd_cols):
1257 if uc == col:
1258 col += 2
1259 used_fwd_cols.append(col)
1261 if col >= total_width:
1262 continue
1264 top_row = min(src_row, dst_row)
1265 bot_row = max(src_row, dst_row)
1267 # Draw vertical line in right margin (exclude corner rows handled below)
1268 for r in range(top_row + 1, bot_row):
1269 if 0 <= r < total_height and col < total_width:
1270 cell = grid[r][col]
1271 if cell == "\u2500": # ─ → ┼
1272 grid[r][col] = "\u253c"
1273 elif cell == " ":
1274 grid[r][col] = "\u2502"
1276 # Horizontal connector from source box right side to margin
1277 # Draw left-to-right, crossing existing pipes with junction chars
1278 src_right = col_start.get(src, 0) + box_width.get(src, 0)
1279 _src_row_boxes = _box_occ.get(src_row, set())
1280 if 0 <= src_row < total_height:
1281 for c in range(src_right, col):
1282 if 0 <= c < total_width and c not in _src_row_boxes:
1283 cell = grid[src_row][c]
1284 if cell == " ":
1285 grid[src_row][c] = "\u2500" # ─
1286 elif cell == "\u2502": # │ → ┼
1287 grid[src_row][c] = "\u253c"
1288 elif cell == "\u2518": # ┘ → ┴
1289 grid[src_row][c] = "\u2534"
1290 elif cell == "\u2510": # ┐ → ┬
1291 grid[src_row][c] = "\u252c"
1292 elif cell == "\u2524": # ┤ → ┼
1293 grid[src_row][c] = "\u253c"
1294 # Leave ─, ◀, box chars unchanged
1296 # Horizontal connector from margin to destination box right side
1297 dst_right = col_start.get(dst, 0) + box_width.get(dst, 0)
1298 _dst_row_boxes = _box_occ.get(dst_row, set())
1299 if 0 <= dst_row < total_height:
1300 for c in range(dst_right, col):
1301 if 0 <= c < total_width and c not in _dst_row_boxes:
1302 cell = grid[dst_row][c]
1303 if cell == " ":
1304 grid[dst_row][c] = "\u2500" # ─
1305 elif cell == "\u2502": # │ → ┼
1306 grid[dst_row][c] = "\u253c"
1307 elif cell == "\u2518": # ┘ → ┴
1308 grid[dst_row][c] = "\u2534"
1309 elif cell == "\u2510": # ┐ → ┬
1310 grid[dst_row][c] = "\u252c"
1311 elif cell == "\u2524": # ┤ → ┼
1312 grid[dst_row][c] = "\u253c"
1314 # Corner characters at pipe-to-horizontal turn points
1315 for row in (src_row, dst_row):
1316 if 0 <= row < total_height and col < total_width:
1317 existing = grid[row][col]
1318 if row == bot_row:
1319 # Pipe ends, turns left: ┘; if horizontal crosses: ┤
1320 grid[row][col] = "\u2524" if existing == "\u2500" else "\u2518"
1321 else: # row == top_row
1322 # Pipe starts going down, turns left: ┐; if horizontal crosses: ┤
1323 grid[row][col] = "\u2524" if existing == "\u2500" else "\u2510"
1325 # Arrow tip at target: ◀ entering box from right side
1326 if 0 <= dst_row < total_height and dst_right < col and dst_right < total_width:
1327 grid[dst_row][dst_right] = "\u25c0"
1329 # Label on the margin line (right of ALL pipes, mirroring left-margin approach)
1330 label_row_pos = (top_row + bot_row) // 2
1331 if 0 <= label_row_pos < total_height:
1332 label_start = rightmost_fwd_pipe_col + 2
1333 for j, ch in enumerate(label):
1334 if label_start + j < total_width:
1335 grid[label_row_pos][label_start + j] = ch
1337 # Convert grid to string
1338 lines = ["".join(row).rstrip() for row in grid]
1340 # Remove trailing empty lines
1341 while lines and not lines[-1].strip():
1342 lines.pop()
1344 # Center diagram
1345 max_line_len = max((len(ln) for ln in lines), default=0)
1346 diagram_indent = max(0, (tw - max_line_len) // 2)
1347 if diagram_indent > 0:
1348 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines]
1350 return _colorize_diagram_labels("\n".join(lines))
1353# ---------------------------------------------------------------------------
1354# FSM diagram renderer (main entry point)
1355# ---------------------------------------------------------------------------
1358def _render_fsm_diagram(
1359 fsm: FSMLoop,
1360 verbose: bool = False,
1361 highlight_state: str | None = None,
1362 highlight_color: str = "32",
1363) -> str:
1364 """Render an adaptive text diagram of the FSM graph.
1366 Detects FSM topology and selects appropriate layout:
1367 - Linear chains: vertical top-to-bottom
1368 - Branching/cyclic: layered Sugiyama-style
1370 Args:
1371 fsm: The FSM loop to render.
1372 verbose: If True, show expanded action content in boxes.
1373 highlight_state: If provided, render this state's box with the highlight color.
1374 highlight_color: ANSI SGR code for the highlighted state (default: green).
1375 """
1376 edges = _collect_edges(fsm)
1377 bfs_order_list, bfs_depth = _bfs_order(fsm.initial, edges)
1378 main_path, main_edge_set = _trace_main_path(fsm, edges)
1379 branches, back_edges = _classify_edges(edges, main_edge_set, bfs_order_list)
1381 terminal_states = {name for name, state in fsm.states.items() if state.terminal}
1383 # Collect all states
1384 all_states = list(main_path)
1385 for src, dst, _ in branches:
1386 for s in (src, dst):
1387 if s not in all_states:
1388 all_states.append(s)
1390 # Topology detection
1391 detector = TopologyDetector(edges, main_path, branches, back_edges)
1392 topology = detector.classify()
1394 # Build back-edge set for layout pipeline
1395 back_edge_set: set[tuple[str, str]] = set()
1396 for src, dst, _ in back_edges:
1397 if src != dst:
1398 back_edge_set.add((src, dst))
1400 tw = terminal_width()
1402 if topology == "linear" and len(all_states) <= 1:
1403 # Single state or empty — use simple horizontal
1404 return _render_horizontal_simple(
1405 main_path,
1406 edges,
1407 main_edge_set,
1408 branches,
1409 back_edges,
1410 bfs_order_list,
1411 fsm.initial,
1412 terminal_states,
1413 fsm.states,
1414 verbose,
1415 highlight_state,
1416 highlight_color,
1417 )
1419 # Compute max node width to determine width constraint
1420 # Quick estimate: widest state name or badge + padding
1421 max_node_w = 30 # reasonable default
1422 for s in all_states:
1423 st = fsm.states.get(s)
1424 badge = _get_state_badge(st)
1425 badge_w = _badge_display_width(badge) if badge else 0
1426 label = s
1427 if s == fsm.initial:
1428 label = "\u2192 " + label
1429 if s in terminal_states:
1430 label = label + " \u25c9"
1431 w = max(len(label), badge_w)
1432 max_node_w = max(max_node_w, w + 4 + 4) # inner + borders + padding
1434 max_width_per_layer = max(1, (tw - 10) // (max_node_w + 4))
1436 # Layer assignment
1437 assigner = LayerAssigner(all_states, edges, back_edge_set, fsm.initial, max_width_per_layer)
1438 layers = assigner.assign()
1440 # Crossing minimization
1441 minimizer = CrossingMinimizer(layers, edges, back_edge_set)
1442 layers = minimizer.minimize()
1444 return _render_layered_diagram(
1445 layers,
1446 edges,
1447 main_edge_set,
1448 branches,
1449 back_edges,
1450 fsm.initial,
1451 terminal_states,
1452 fsm.states,
1453 verbose,
1454 highlight_state,
1455 highlight_color,
1456 )
1459def _render_horizontal_simple(
1460 main_path: list[str],
1461 edges: list[tuple[str, str, str]],
1462 main_edge_set: set[tuple[str, str]],
1463 branches: list[tuple[str, str, str]],
1464 back_edges: list[tuple[str, str, str]],
1465 bfs_order: list[str],
1466 initial: str,
1467 terminal_states: set[str],
1468 fsm_states: dict[str, StateConfig],
1469 verbose: bool,
1470 highlight_state: str | None,
1471 highlight_color: str,
1472) -> str:
1473 """Simple horizontal rendering for single-state or very simple FSMs."""
1474 if not main_path:
1475 return ""
1477 all_states = list(main_path)
1478 display_label = _compute_display_labels(all_states, initial, terminal_states)
1480 tw = terminal_width()
1481 num_main = max(1, len(main_path))
1482 if verbose and fsm_states and main_path:
1483 max_box_inner = max(20, min(60, (tw - 4) // num_main - 6))
1484 else:
1485 max_box_inner = max(20, min(40, (tw - 4) // num_main - 6))
1487 box_inner, box_width, box_height, box_badge = _compute_box_sizes(
1488 all_states, display_label, fsm_states, verbose, max_box_inner
1489 )
1491 main_height = max((box_height[s] for s in main_path), default=3)
1492 total_width = tw
1494 # Column positions
1495 col_start: dict[str, int] = {}
1496 col_center: dict[str, int] = {}
1497 x = 2
1498 for i, sname in enumerate(main_path):
1499 col_start[sname] = x
1500 col_center[sname] = x + box_width[sname] // 2
1501 x += box_width[sname]
1502 if i < len(main_path) - 1:
1503 x += 4
1505 rows: list[list[str]] = [[" "] * total_width for _ in range(main_height)]
1507 for sname in main_path:
1508 is_highlighted = highlight_state is not None and sname == highlight_state
1509 _draw_box(
1510 rows,
1511 0,
1512 col_start[sname],
1513 box_width[sname],
1514 main_height,
1515 box_inner[sname],
1516 is_highlighted,
1517 highlight_color,
1518 badge=box_badge[sname],
1519 )
1521 # Self-loops
1522 self_loops_list = [(s, d, lbl) for s, d, lbl in back_edges if s == d]
1523 lines = ["".join(row).rstrip() for row in rows]
1524 if self_loops_list:
1525 self_labels: dict[str, list[str]] = {}
1526 for src, _, label in self_loops_list:
1527 self_labels.setdefault(src, []).append(label)
1528 for sname, labels in self_labels.items():
1529 marker = "\u21ba " + ", ".join(labels)
1530 self_row = [" "] * total_width
1531 cx = col_center.get(sname, 0)
1532 pos = max(0, cx - len(marker) // 2)
1533 for j, ch in enumerate(marker):
1534 if pos + j < total_width:
1535 self_row[pos + j] = ch
1536 lines.append("".join(self_row).rstrip())
1538 diagram_indent = max(0, (tw - (x + 4)) // 2)
1539 if diagram_indent > 0:
1540 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines]
1542 return _colorize_diagram_labels("\n".join(lines))