Coverage for little_loops / fsm / interpolation.py: 25%

116 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Variable interpolation for FSM loop definitions. 

2 

3This module provides runtime variable substitution using ${namespace.path} 

4syntax. Variables are resolved against an InterpolationContext that holds 

5runtime state including user context, captured values, and metadata. 

6 

7Supported namespaces: 

8 context: User-defined variables from FSM context block 

9 captured: Values stored via capture: in previous states 

10 prev: Previous state's result (shorthand) 

11 result: Current evaluation result 

12 state: Current state metadata (name, iteration) 

13 loop: Loop-level metadata (name, started_at, elapsed_ms, elapsed) 

14 env: Environment variables 

15""" 

16 

17from __future__ import annotations 

18 

19import os 

20import re 

21from dataclasses import dataclass, field 

22from typing import Any 

23 

# Patterns are compiled once at import time for performance.

# Matches a ${...} reference; group 1 is the text between the braces.
VARIABLE_PATTERN = re.compile(r"\$\{([^}]+)\}")

# Matches the escape sequence $${ that stands for a literal ${.
ESCAPED_PATTERN = re.compile(r"\$\$\{")

# Temporary stand-in for escaped sequences while variables are substituted.
ESCAPED_PLACEHOLDER = "\x00ESCAPED\x00"

28 

29 

class InterpolationError(Exception):
    """Signal that a variable reference could not be interpolated."""

34 

35 

@dataclass
class InterpolationContext:
    """Namespace data used to resolve ${namespace.path} references.

    Each field backs one or more of the namespaces understood by
    ``resolve``.

    Attributes:
        context: User-defined variables from the FSM context block.
        captured: Stored action results, keyed by variable name
            ({varname: {output, stderr, exit_code, duration_ms}}).
        prev: Result of the previous state, or None for the first state.
        result: Current evaluation result, or None when there is none.
        state_name: Name of the current state.
        iteration: Current loop iteration (1-based).
        loop_name: Name of the FSM loop.
        started_at: ISO timestamp of when the loop started.
        elapsed_ms: Milliseconds elapsed since the loop started.
    """

    context: dict[str, Any] = field(default_factory=dict)
    captured: dict[str, dict[str, Any]] = field(default_factory=dict)
    prev: dict[str, Any] | None = None
    result: dict[str, Any] | None = None
    state_name: str = ""
    iteration: int = 1
    loop_name: str = ""
    started_at: str = ""
    elapsed_ms: int = 0

    def resolve(self, namespace: str, path: str) -> Any:
        """Look up the value that a namespace.path reference points at.

        Args:
            namespace: Namespace identifier (context, captured, prev,
                result, state, loop, or env).
            path: Path inside the namespace; dot-separated for the
                dict-backed namespaces.

        Returns:
            The value found for the reference.

        Raises:
            InterpolationError: If the namespace is unknown, the namespace
                has no data yet (prev/result), or the path is not found.
        """
        if namespace == "context":
            return self._get_nested(self.context, path, "context")

        if namespace == "captured":
            return self._get_nested(self.captured, path, "captured")

        if namespace == "prev":
            if self.prev is None:
                raise InterpolationError("No previous state result available")
            return self._get_nested(self.prev, path, "prev")

        if namespace == "result":
            if self.result is None:
                raise InterpolationError("No evaluation result available")
            return self._get_nested(self.result, path, "result")

        if namespace == "state":
            return self._get_state_value(path)

        if namespace == "loop":
            return self._get_loop_value(path)

        if namespace == "env":
            # The whole path is used as the variable name (no dot splitting).
            env_value = os.environ.get(path)
            if env_value is None:
                raise InterpolationError(f"Environment variable '{path}' not set")
            return env_value

        raise InterpolationError(f"Unknown namespace: {namespace}")

    def _get_nested(self, obj: dict[str, Any], path: str, namespace: str) -> Any:
        """Walk a dict along a dot-separated path and return what is there.

        Args:
            obj: Dictionary to start from.
            path: Dot-separated path (e.g., "errors.output").
            namespace: Namespace name, used only in the error message.

        Returns:
            The value stored at the end of the path.

        Raises:
            InterpolationError: If a segment is missing or the value reached
                so far is not a dict.
        """
        segments = path.split(".")
        node: Any = obj
        for depth, segment in enumerate(segments, start=1):
            if not isinstance(node, dict) or segment not in node:
                # Report the path up to and including the failing segment.
                failed_at = ".".join(segments[:depth])
                raise InterpolationError(f"Path '{failed_at}' not found in {namespace}")
            node = node[segment]
        return node

    def _get_state_value(self, key: str) -> Any:
        """Return a metadata value of the current state.

        Args:
            key: State property name ("name" or "iteration").

        Returns:
            The value of the requested state property.

        Raises:
            InterpolationError: If the property name is not recognised.
        """
        if key == "name":
            return self.state_name
        if key == "iteration":
            return self.iteration
        raise InterpolationError(f"Unknown state property: {key}")

    def _get_loop_value(self, key: str) -> Any:
        """Return a metadata value of the loop.

        Args:
            key: Loop property name ("name", "started_at", "elapsed_ms",
                or "elapsed").

        Returns:
            The value of the requested loop property; "elapsed" is
            elapsed_ms rendered by _format_duration.

        Raises:
            InterpolationError: If the property name is not recognised.
        """
        if key == "name":
            return self.loop_name
        if key == "started_at":
            return self.started_at
        if key == "elapsed_ms":
            return self.elapsed_ms
        if key == "elapsed":
            return _format_duration(self.elapsed_ms)
        raise InterpolationError(f"Unknown loop property: {key}")

167 

168 

def interpolate(template: str, ctx: InterpolationContext) -> str:
    """Expand every ${namespace.path} reference found in a template.

    References are resolved against ``ctx`` at call time. The escape
    sequence $${...} is emitted as a literal ${...} and is not resolved.

    Args:
        template: Text that may contain variable references.
        ctx: Runtime context used to resolve each reference.

    Returns:
        The template with each reference replaced by its value; a value of
        None is rendered as an empty string, anything else via ``str``.

    Raises:
        InterpolationError: If a reference has no "." separating namespace
            and path, or if the context cannot resolve it.
    """

    def _substitute(match: re.Match[str]) -> str:
        reference = match.group(1)
        namespace, separator, path = reference.partition(".")
        if not separator:
            raise InterpolationError(
                f"Invalid variable: ${{{reference}}} (expected namespace.path)"
            )
        resolved = ctx.resolve(namespace, path)
        # None is rendered as an empty string rather than the text "None".
        return "" if resolved is None else str(resolved)

    # Hide escaped sequences so the variable pattern cannot match them.
    shielded = ESCAPED_PATTERN.sub(ESCAPED_PLACEHOLDER, template)
    expanded = VARIABLE_PATTERN.sub(_substitute, shielded)
    # Turn the hidden escapes back into a literal "${".
    return expanded.replace(ESCAPED_PLACEHOLDER, "${")

207 

208 

def interpolate_dict(obj: dict[str, Any], ctx: InterpolationContext) -> dict[str, Any]:
    """Build a copy of a dict with every string value interpolated.

    String values go through ``interpolate``; nested dicts and lists are
    processed recursively. Values of any other type, and all keys, are
    carried over unchanged.

    Args:
        obj: Dictionary to process.
        ctx: Runtime context used to resolve variables.

    Returns:
        A new dictionary holding the interpolated values.

    Raises:
        InterpolationError: If any variable cannot be resolved.
    """

    def _convert(value: Any) -> Any:
        if isinstance(value, str):
            return interpolate(value, ctx)
        if isinstance(value, dict):
            return interpolate_dict(value, ctx)
        if isinstance(value, list):
            return _interpolate_list(value, ctx)
        return value

    return {key: _convert(value) for key, value in obj.items()}

237 

238 

239def _interpolate_list(items: list[Any], ctx: InterpolationContext) -> list[Any]: 

240 """Interpolate string values in a list. 

241 

242 Args: 

243 items: List to process 

244 ctx: Runtime context for resolution 

245 

246 Returns: 

247 New list with interpolated string values 

248 """ 

249 result: list[Any] = [] 

250 for item in items: 

251 if isinstance(item, str): 

252 result.append(interpolate(item, ctx)) 

253 elif isinstance(item, dict): 

254 result.append(interpolate_dict(item, ctx)) 

255 elif isinstance(item, list): 

256 result.append(_interpolate_list(item, ctx)) 

257 else: 

258 result.append(item) 

259 return result 

260 

261 

262def _format_duration(ms: int) -> str: 

263 """Format milliseconds as human-readable duration. 

264 

265 Args: 

266 ms: Duration in milliseconds 

267 

268 Returns: 

269 Formatted string like "500ms", "30s", or "2m 15s" 

270 """ 

271 if ms < 1000: 

272 return f"{ms}ms" 

273 seconds = ms // 1000 

274 if seconds < 60: 

275 return f"{seconds}s" 

276 minutes = seconds // 60 

277 remaining_seconds = seconds % 60 

278 if remaining_seconds == 0: 

279 return f"{minutes}m" 

280 return f"{minutes}m {remaining_seconds}s"