Coverage for little_loops / fsm / interpolation.py: 25%
116 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Variable interpolation for FSM loop definitions.
3This module provides runtime variable substitution using ${namespace.path}
4syntax. Variables are resolved against an InterpolationContext that holds
5runtime state including user context, captured values, and metadata.
7Supported namespaces:
8 context: User-defined variables from FSM context block
9 captured: Values stored via capture: in previous states
10 prev: Previous state's result (shorthand)
11 result: Current evaluation result
12 state: Current state metadata (name, iteration)
13 loop: Loop-level metadata (name, started_at, elapsed_ms, elapsed)
14 env: Environment variables
15"""
17from __future__ import annotations
19import os
20import re
21from dataclasses import dataclass, field
22from typing import Any
# Module-level constants; the regexes are compiled once at import time.

# NUL-delimited marker string associated with the "$${" escape sequence.
ESCAPED_PLACEHOLDER = "\x00ESCAPED\x00"

# Matches the literal three-character escape sequence "$${".
ESCAPED_PATTERN = re.compile(r"\$\$\{")

# Matches "${...}" and captures the text between the braces as group 1
# (one or more characters, none of which may be "}").
VARIABLE_PATTERN = re.compile(r"\$\{([^}]+)\}")
class InterpolationError(Exception):
    """Signal that a variable reference could not be interpolated.

    Carries no extra state beyond the message passed to ``Exception``.
    """
@dataclass
class InterpolationContext:
    """Container for the data that ``${namespace.path}`` references read from.

    Each supported namespace is backed either by one of the dict fields
    below, by the scalar state/loop metadata fields, or by the process
    environment.

    Attributes:
        context: User-defined variables from FSM context block
        captured: Stored action results, keyed by variable name
            ({varname: {output, stderr, exit_code, duration_ms}})
        prev: Previous state result, or None if there is none yet
        result: Current evaluation result, or None
        state_name: Current state name
        iteration: Current loop iteration (1-based)
        loop_name: FSM loop name
        started_at: ISO timestamp when loop started
        elapsed_ms: Milliseconds since loop started
    """

    context: dict[str, Any] = field(default_factory=dict)
    captured: dict[str, dict[str, Any]] = field(default_factory=dict)
    prev: dict[str, Any] | None = None
    result: dict[str, Any] | None = None
    state_name: str = ""
    iteration: int = 1
    loop_name: str = ""
    started_at: str = ""
    elapsed_ms: int = 0

    def resolve(self, namespace: str, path: str) -> Any:
        """Look up the value that ``namespace.path`` refers to.

        Args:
            namespace: One of context, captured, prev, result, state,
                loop, or env
            path: Everything after the namespace; dot-separated for the
                dict-backed namespaces, a property name for state/loop,
                and the variable name for env

        Returns:
            The value found for the reference

        Raises:
            InterpolationError: If the namespace is unknown, the backing
                data is missing, or the path cannot be found
        """
        # context/captured are always-present dicts stored under the same
        # attribute name as the namespace itself.
        if namespace in ("context", "captured"):
            return self._get_nested(getattr(self, namespace), path, namespace)

        if namespace == "prev":
            if self.prev is None:
                raise InterpolationError("No previous state result available")
            return self._get_nested(self.prev, path, "prev")

        if namespace == "result":
            if self.result is None:
                raise InterpolationError("No evaluation result available")
            return self._get_nested(self.result, path, "result")

        if namespace == "state":
            return self._get_state_value(path)

        if namespace == "loop":
            return self._get_loop_value(path)

        if namespace == "env":
            # The whole path is the variable name; it is not split on dots.
            env_value = os.environ.get(path)
            if env_value is None:
                raise InterpolationError(f"Environment variable '{path}' not set")
            return env_value

        raise InterpolationError(f"Unknown namespace: {namespace}")

    def _get_nested(self, obj: dict[str, Any], path: str, namespace: str) -> Any:
        """Walk ``obj`` one dot-separated segment at a time.

        Args:
            obj: Dictionary to start from
            path: Dot-separated path (e.g., "errors.output")
            namespace: Namespace name, used only in the error message

        Returns:
            The value stored at the end of the path

        Raises:
            InterpolationError: If a segment is missing or the value being
                descended into is not a dict
        """
        segments = path.split(".")
        node: Any = obj
        for depth, segment in enumerate(segments, start=1):
            if not isinstance(node, dict) or segment not in node:
                # Report the path up to and including the failing segment.
                missing = ".".join(segments[:depth])
                raise InterpolationError(f"Path '{missing}' not found in {namespace}")
            node = node[segment]
        return node

    def _get_state_value(self, key: str) -> Any:
        """Return a piece of state metadata.

        Args:
            key: State property name ("name" or "iteration")

        Returns:
            ``state_name`` for "name", ``iteration`` for "iteration"

        Raises:
            InterpolationError: If key is any other value
        """
        if key == "name":
            return self.state_name
        if key == "iteration":
            return self.iteration
        raise InterpolationError(f"Unknown state property: {key}")

    def _get_loop_value(self, key: str) -> Any:
        """Return a piece of loop metadata.

        Args:
            key: Loop property name ("name", "started_at", "elapsed_ms",
                or "elapsed")

        Returns:
            The matching field, or for "elapsed" the result of
            ``_format_duration(elapsed_ms)``

        Raises:
            InterpolationError: If key is any other value
        """
        # "elapsed" is the only derived property; the rest map to fields.
        if key == "elapsed":
            return _format_duration(self.elapsed_ms)

        field_for_key = {
            "name": "loop_name",
            "started_at": "started_at",
            "elapsed_ms": "elapsed_ms",
        }
        if key in field_for_key:
            return getattr(self, field_for_key[key])
        raise InterpolationError(f"Unknown loop property: {key}")
# Single-pass token matcher. The first alternative is the "$${" escape; the
# second is a "${...}" reference whose inner text is captured as group 1.
# The two alternatives can never match at the same position, so the leftmost
# match decides which one applies.
_INTERPOLATION_TOKEN = re.compile(r"\$\$\{|\$\{([^}]+)\}")


def interpolate(template: str, ctx: InterpolationContext) -> str:
    """Replace ${namespace.path} variables in template string.

    Escapes and references are handled in one regex pass, so no
    placeholder text is ever inserted into the string. Text that is
    already in the template, or that comes from a resolved value, is
    therefore never rewritten.

    Handles $${...} escaping (becomes literal ${...}).

    Args:
        template: String containing variable references
        ctx: Runtime context; its ``resolve(namespace, path)`` method is
            called once per reference

    Returns:
        String with every reference replaced by ``str(value)``, or by an
        empty string when the resolved value is None

    Raises:
        InterpolationError: If a reference has no "." separating the
            namespace from the path. Any exception raised by
            ``ctx.resolve`` propagates unchanged.
    """

    def replace_token(match: re.Match[str]) -> str:
        full_path = match.group(1)
        if full_path is None:
            # Group 1 did not participate: this is the "$${" escape.
            return "${"

        namespace, separator, path = full_path.partition(".")
        if not separator:
            raise InterpolationError(
                f"Invalid variable: ${{{full_path}}} (expected namespace.path)"
            )

        value = ctx.resolve(namespace, path)
        # None renders as an empty string rather than the text "None".
        return "" if value is None else str(value)

    return _INTERPOLATION_TOKEN.sub(replace_token, template)
def interpolate_dict(obj: dict[str, Any], ctx: InterpolationContext) -> dict[str, Any]:
    """Build a copy of ``obj`` with every string value interpolated.

    Strings go through ``interpolate``; nested dicts and lists are
    processed recursively. Values of any other type are placed in the
    result unchanged. Keys are never interpolated.

    Args:
        obj: Dictionary to process
        ctx: Runtime context for resolution

    Returns:
        New dictionary with the same keys, in the same order

    Raises:
        InterpolationError: If any variable resolution fails
    """

    def render(value: Any) -> Any:
        # Dispatch on the value's type; unrecognised types pass through.
        if isinstance(value, str):
            return interpolate(value, ctx)
        if isinstance(value, dict):
            return interpolate_dict(value, ctx)
        if isinstance(value, list):
            return _interpolate_list(value, ctx)
        return value

    return {key: render(value) for key, value in obj.items()}
239def _interpolate_list(items: list[Any], ctx: InterpolationContext) -> list[Any]:
240 """Interpolate string values in a list.
242 Args:
243 items: List to process
244 ctx: Runtime context for resolution
246 Returns:
247 New list with interpolated string values
248 """
249 result: list[Any] = []
250 for item in items:
251 if isinstance(item, str):
252 result.append(interpolate(item, ctx))
253 elif isinstance(item, dict):
254 result.append(interpolate_dict(item, ctx))
255 elif isinstance(item, list):
256 result.append(_interpolate_list(item, ctx))
257 else:
258 result.append(item)
259 return result
262def _format_duration(ms: int) -> str:
263 """Format milliseconds as human-readable duration.
265 Args:
266 ms: Duration in milliseconds
268 Returns:
269 Formatted string like "500ms", "30s", or "2m 15s"
270 """
271 if ms < 1000:
272 return f"{ms}ms"
273 seconds = ms // 1000
274 if seconds < 60:
275 return f"{seconds}s"
276 minutes = seconds // 60
277 remaining_seconds = seconds % 60
278 if remaining_seconds == 0:
279 return f"{minutes}m"
280 return f"{minutes}m {remaining_seconds}s"