Coverage for src / invariant / expressions.py: 93.57%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 10:21 +0100

1"""CEL expression engine for evaluating ${...} expressions in node parameters. 

2 

3This module provides expression evaluation using the Common Expression Language (CEL). 

4Expressions are embedded in parameter values using ${...} delimiters and are evaluated 

5with dependency artifacts exposed as variables. 

6 

7Expression Syntax 

8----------------- 

9 

10Variable References: 

11 - `${x}` - References the entire artifact. For artifacts with a `.value` attribute, 

12 this returns the value directly. 

13 - `${x.value}` - Accesses the `value` field of artifact `x`. This is the most common 

14 pattern for accessing numeric or string values from artifacts. 

15 

16Integer Arithmetic: 

17 - `${x.value + 1}` - Add integers 

18 - `${x.value + y.value}` - Add two artifact values 

19 - `${x.value * 2}` - Multiply 

20 - `${x.value - y.value}` - Subtract 

21 

22Decimal Arithmetic (Avoiding Floats): 

23 Per the Strict Numeric Policy, native float types are forbidden in cacheable data 

24 because IEEE 754 floats are non-deterministic across architectures. Use the `decimal()` 

25 function for fractional arithmetic: 

26 

27 - `${decimal("3.14")}` - Create a Decimal from a string 

28 - `${decimal("1.5") + decimal("2.5")}` - Decimal arithmetic (returns Decimal) 

29 - `${decimal(x.value)}` - Convert an integer to Decimal 

30 - `${decimal("3.14") * 2}` - Decimal multiplication 

31 

32Built-in Functions: 

33 - `min(a, b)` - Returns the minimum of two comparable values 

34 - `max(a, b)` - Returns the maximum of two comparable values 

35 - `decimal(value)` - Converts a value (int, string, or Decimal) to Decimal 

36 

37 Examples: 

38 - `${min(x, y)}` - Returns the artifact with the smaller value 

39 - `${min(x.value, y.value)}` - Returns the smaller numeric value 

40 - `${max(x.value, 10)}` - Returns the larger of x.value or 10 

41 

42String Interpolation: 

43 - `"prefix_${x.value}_suffix"` - Mixed text and expressions 

44 - `"Result: ${x.value + y.value}"` - String concatenation with expressions 

45 

46Error Cases: 

47 - Expressions that return float (double) values raise ValueError. Use `decimal()` 

48 for fractional arithmetic. 

49 - References to undeclared dependencies raise ValueError. 

50 - Invalid expression syntax raises ValueError with details. 

51 

52Examples 

53-------- 

54 

55Basic variable access: 

56 >>> params = {"width": "${background}"} 

57 >>> deps = {"background": 100} 

58 >>> resolve_params(params, deps) 

59 {"width": 100} 

60 

61Arithmetic with multiple dependencies: 

62 >>> params = {"sum": "${x + y}"} 

63 >>> deps = {"x": 3, "y": 7} 

64 >>> resolve_params(params, deps) 

65 {"sum": 10} 

66 

67Decimal arithmetic (avoiding floats): 

68 >>> params = {"result": "${decimal(\"3.14\") + decimal(\"2.86\")}"} 

69 >>> deps = {} 

70 >>> resolve_params(params, deps) 

71 {"result": Decimal("6.00")} 

72 

73Using min/max for canonicalization: 

74 >>> params = {"a": "${min(x, y)}", "b": "${max(x, y)}"} 

75 >>> deps = {"x": 7, "y": 3} 

76 >>> resolved = resolve_params(params, deps) 

77 >>> resolved["a"] # Returns 3 (the smaller value) 

78 3 

79 >>> resolved["b"] # Returns 7 (the larger value) 

80 7 

81 

82String interpolation: 

83 >>> params = {"message": "Width is ${width}px"} 

84 >>> deps = {"width": 200} 

85 >>> resolve_params(params, deps) 

86 {"message": "Width is 200px"} 

87""" 

88 

89import re 

90from decimal import Decimal 

91from typing import Any 

92 

93import celpy 

94import celpy.celparser 

95import celpy.celtypes as celtypes 

96import celpy.evaluation 

97 

98from invariant.params import cel, ref 

99from invariant.protocol import ICacheable 

100 

101 

def resolve_params(
    params: dict[str, Any], dependencies: dict[str, Any]
) -> dict[str, Any]:
    """Return a copy of ``params`` with every marker and expression resolved.

    Each top-level value is handed to ``_resolve_value``, which deals with
    ref()/cel() markers, ``${...}`` strings and nested dicts/lists. The input
    dictionary itself is not modified; a new dictionary is built.

    Args:
        params: Mapping of parameter name -> raw value. Values may contain
            ``${...}`` expressions or marker objects.
        dependencies: Mapping of dependency ID -> artifact. These are the
            variables visible to the expressions.

    Returns:
        A new dictionary with the same keys and resolved values.

    Raises:
        ValueError: Propagated from value resolution (e.g. a ref() to an
            undeclared dependency, or a failing/float-valued expression).
    """
    return {
        name: _resolve_value(raw, dependencies) for name, raw in params.items()
    }

127 

128 

def _resolve_value(value: Any, dependencies: dict[str, Any]) -> Any:
    """Resolve a single parameter value, descending into containers.

    Dispatch, in order:
      - ``ref`` marker: returns the dependency artifact unchanged.
      - ``cel`` marker: evaluates its expression via ``_evaluate_cel``.
      - ``str``: evaluated via ``_evaluate_expression`` only when it contains
        both ``${`` and ``}``; otherwise returned untouched.
      - ``dict`` / ``list``: rebuilt with every value/item resolved recursively
        (dict keys are kept as they are).
      - anything else: returned as-is.

    Args:
        value: The raw value to resolve.
        dependencies: Mapping of dependency ID -> artifact.

    Returns:
        The resolved value.

    Raises:
        ValueError: If a ``ref`` marker names a dependency that is not present
            in ``dependencies``.
    """
    # Artifact passthrough: no evaluation, just a lookup with a clear error.
    if isinstance(value, ref):
        if value.dep in dependencies:
            return dependencies[value.dep]
        raise ValueError(
            f"ref('{value.dep}') references undeclared dependency '{value.dep}'. "
            f"Available dependencies: {list(dependencies.keys())}"
        )

    # Explicit CEL marker: the expression is stored without ${...} delimiters.
    if isinstance(value, cel):
        return _evaluate_cel(value.expr, dependencies)

    if isinstance(value, str):
        # Cheap pre-filter; the regex in _evaluate_expression does the real work.
        looks_templated = "${" in value and "}" in value
        return _evaluate_expression(value, dependencies) if looks_templated else value

    if isinstance(value, dict):
        return {
            name: _resolve_value(item, dependencies) for name, item in value.items()
        }

    if isinstance(value, list):
        return [_resolve_value(element, dependencies) for element in value]

    # Primitive (or otherwise unrecognised) value: nothing to resolve.
    return value

178 

179 

def _evaluate_expression(expr_string: str, dependencies: dict[str, Any]) -> Any:
    """Evaluate the ``${...}`` expressions embedded in a string.

    Two modes:
      - The whole string (ignoring surrounding whitespace) is exactly one
        ``${...}`` expression: the evaluated value is returned with its native
        type (int, Decimal, dict, ...).
      - Otherwise: every ``${...}`` occurrence is replaced by ``str()`` of its
        evaluated value and the interpolated string is returned.

    Substitution is done in a single left-to-right pass, so text produced by
    one expression is never re-scanned. Previously, chained ``str.replace``
    calls would rewrite an already-substituted result that happened to contain
    the text of a later placeholder.

    Note: the placeholder pattern stops at the first ``}``, so an expression
    that itself contains ``}`` cannot be written in ``${...}`` form.

    Args:
        expr_string: String potentially containing ``${...}`` expressions.
        dependencies: Mapping of dependency ID -> artifact.

    Returns:
        The native evaluated value (single full-string expression), the
        interpolated string (mixed text), or ``expr_string`` unchanged when no
        placeholder is found.

    Raises:
        ValueError: Propagated from ``_evaluate_cel`` when an expression cannot
            be parsed or evaluated, or yields a float.
    """
    placeholder = re.compile(r"\$\{([^}]+)\}")
    matches = placeholder.findall(expr_string)

    if not matches:
        # No expressions found, return as-is
        return expr_string

    if len(matches) == 1 and expr_string.strip() == f"${{{matches[0]}}}":
        # Single expression covering the entire string - keep the native type
        return _evaluate_cel(matches[0].strip(), dependencies)

    def _substitute(found: "re.Match[str]") -> str:
        # A callable replacement is inserted literally (no backslash
        # processing), matching the literal semantics of str.replace.
        return str(_evaluate_cel(found.group(1).strip(), dependencies))

    # One pass over the ORIGINAL text: results are never re-interpreted.
    return placeholder.sub(_substitute, expr_string)

214 

215 

def _evaluate_cel(expression: str, dependencies: dict[str, Any]) -> Any:
    """Compile and run one CEL expression against the dependency artifacts.

    Every dependency is converted with ``_value_to_cel`` and exposed as a
    variable named after its dependency ID. The custom functions ``decimal``,
    ``min`` and ``max`` are registered on the program.

    Post-processing of the result:
      - a float result is rejected;
      - a ``MapType`` result that has a ``"value"`` key is replaced by that
        entry (this applies to any expression yielding such a map, not only
        to bare variable references);
      - the outcome is converted to plain Python via ``_cel_to_python``.

    Args:
        expression: The CEL source text, without ``${...}`` delimiters.
        dependencies: Mapping of dependency ID -> artifact.

    Returns:
        The evaluated result converted to a Python value.

    Raises:
        ValueError: If the expression fails to parse, fails to evaluate, or
            returns a float.
    """
    env = celpy.Environment()

    # Variables visible to the expression: one per dependency.
    activation: dict[str, Any] = {
        dep_id: _value_to_cel(artifact) for dep_id, artifact in dependencies.items()
    }

    try:
        ast = env.compile(expression)
    except celpy.celparser.CELParseError as e:
        raise ValueError(f"Failed to parse CEL expression '{expression}': {e}") from e

    program = env.program(
        ast,
        functions={
            "decimal": _decimal_function,
            "min": _min_function,
            "max": _max_function,
        },
    )

    try:
        result = program.evaluate(activation)
    except celpy.evaluation.CELEvalError as e:
        raise ValueError(
            f"Failed to evaluate CEL expression '{expression}': {e}"
        ) from e

    # Floats are forbidden per the Strict Numeric Policy.
    if isinstance(result, float):
        raise ValueError(
            f"Expression '{expression}' returned a float (double), which is forbidden. "
            f"Use decimal() for fractional arithmetic."
        )

    # Unwrap artifact-like maps to their 'value' entry; maps without that key
    # are converted whole.
    if isinstance(result, celtypes.MapType):
        value_key = celtypes.StringType("value")
        if value_key in result:
            result = result[value_key]

    return _cel_to_python(result, expression)

288 

289 

def _value_to_cel(value: Any) -> Any:
    """Convert a value (native type or ICacheable) to a CEL-compatible type.

    Conversion rules, checked in this order:
      - ``bool``  -> ``BoolType`` (checked BEFORE ``int``: ``bool`` is a
        subclass of ``int``, so testing ``int`` first would turn ``True`` /
        ``False`` into ``IntType`` and make the bool branch unreachable)
      - ``int``   -> ``IntType``
      - ``str``   -> ``StringType``
      - ``Decimal`` -> returned unchanged
      - ``dict``  -> ``MapType`` with ``StringType`` keys, values converted
        recursively
      - ``list`` / ``tuple`` -> ``ListType``, items converted recursively
      - ``None``  -> ``None``
      - ``ICacheable`` -> ``MapType`` of its public attributes
      - anything else (including ``float``) -> ``StringType(str(value))``

    Args:
        value: The value to convert.

    Returns:
        The CEL-compatible representation of ``value``.
    """
    # bool must be tested before int (bool is an int subclass).
    if isinstance(value, bool):
        return celtypes.BoolType(value)
    if isinstance(value, int):
        return celtypes.IntType(value)
    if isinstance(value, str):
        return celtypes.StringType(value)
    if isinstance(value, Decimal):
        # Decimal is passed through as-is rather than mapped to a CEL type.
        return value
    if isinstance(value, dict):
        # Keys are coerced to StringType; values are converted recursively.
        return celtypes.MapType(
            {celtypes.StringType(key): _value_to_cel(val) for key, val in value.items()}
        )
    if isinstance(value, (list, tuple)):
        return celtypes.ListType([_value_to_cel(item) for item in value])
    if value is None:
        return None

    # ICacheable domain types - convert to MapType for field access
    if isinstance(value, ICacheable):
        return _icacheable_to_cel_map(value)

    # Fallback: expose the string form of the value
    return celtypes.StringType(str(value))

331 

332 

def _icacheable_to_cel_map(artifact: ICacheable) -> celtypes.MapType:
    """Build a CEL map from the public, non-callable attributes of an artifact.

    Every name reported by ``dir(artifact)`` that does not start with an
    underscore and whose value is not callable becomes a ``StringType`` key;
    the attribute value is converted with ``_value_to_cel``. Attributes whose
    access or conversion raises ``AttributeError`` are silently left out.

    Args:
        artifact: The ICacheable artifact to expose to CEL.

    Returns:
        A ``MapType`` keyed by attribute name.
    """
    fields: dict[celtypes.StringType, Any] = {}

    for name in dir(artifact):
        # Private/dunder names are never exposed.
        if name.startswith("_"):
            continue
        # Methods and other callables are not data fields.
        if callable(getattr(artifact, name, None)):
            continue
        try:
            fields[celtypes.StringType(name)] = _value_to_cel(getattr(artifact, name))
        except AttributeError:
            # Best-effort: skip attributes that cannot be read.
            continue

    return celtypes.MapType(fields)

359 

360 

def _cel_to_python(cel_value: Any, expression: str) -> Any:
    """Turn a CEL evaluation result into a plain Python value.

    ``IntType`` / ``StringType`` / ``BoolType`` become ``int`` / ``str`` /
    ``bool``. ``MapType`` becomes a ``dict`` with ``str`` keys and recursively
    converted values. Plain dicts and lists are converted element-wise (dict
    keys untouched). ``Decimal`` and native ``int`` / ``str`` / ``bool`` values
    are returned unchanged, as is any type not recognised here.

    Args:
        cel_value: The value returned from CEL evaluation.
        expression: The original expression, used only in error messages.

    Returns:
        The Python representation of ``cel_value``.

    Raises:
        ValueError: If ``cel_value`` is a float.
    """
    # CEL scalar wrappers -> matching Python builtin.
    scalar_conversions = (
        (celtypes.IntType, int),
        (celtypes.StringType, str),
        (celtypes.BoolType, bool),
    )
    for cel_type, python_type in scalar_conversions:
        if isinstance(cel_value, cel_type):
            return python_type(cel_value)

    if isinstance(cel_value, float):
        # Normally rejected by the caller already; kept as a second line of defence.
        raise ValueError(
            f"Expression '{expression}' returned float, which is forbidden. "
            f"Use decimal() for fractional values."
        )

    # Values that are already acceptable Python types.
    if isinstance(cel_value, (Decimal, int, str, bool)):
        return cel_value

    # MapType is checked before the generic dict case so its keys get stringified.
    if isinstance(cel_value, celtypes.MapType):
        return {
            str(key): _cel_to_python(item, expression)
            for key, item in cel_value.items()
        }

    if isinstance(cel_value, dict):
        return {key: _cel_to_python(item, expression) for key, item in cel_value.items()}

    if isinstance(cel_value, list):
        return [_cel_to_python(element, expression) for element in cel_value]

    # Unknown type - hand it back untouched.
    return cel_value

412 

413 

414# Custom CEL functions 

def _decimal_function(value: Any) -> Decimal:
    """CEL function: decimal(value) - constructs a Decimal from int, string, or Decimal.

    Accepted inputs:
      - ``IntType`` / ``int``: converted through their decimal string form.
      - ``StringType`` / ``str``: parsed as a decimal literal.
      - ``MapType``: its ``"value"`` entry is converted recursively.
      - ``Decimal``: returned unchanged.
      - ``ICacheable`` with a ``value`` attribute: ``str(value.value)`` is parsed.

    Args:
        value: The value to convert.

    Returns:
        Decimal value.

    Raises:
        ValueError: If the type is unsupported, a MapType has no ``"value"``
            entry, or the text is not a valid decimal literal.
    """
    # Local import: the module only imports ``Decimal`` at file level.
    from decimal import InvalidOperation

    # Handle CEL types
    if isinstance(value, celtypes.IntType):
        text = str(int(value))
    elif isinstance(value, celtypes.StringType):
        text = str(value)
    elif isinstance(value, celtypes.MapType):
        # Extract value from MapType
        value_key = celtypes.StringType("value")
        if value_key not in value:
            raise ValueError("Cannot extract value from MapType for decimal conversion")
        return _decimal_function(value[value_key])
    # Handle Python types
    elif isinstance(value, Decimal):
        return value
    elif isinstance(value, int):
        text = str(value)
    elif isinstance(value, str):
        text = value
    elif isinstance(value, ICacheable) and hasattr(value, "value"):
        # Extract value from ICacheable
        text = str(value.value)
    else:
        raise ValueError(f"Cannot convert {type(value)} to Decimal")

    try:
        return Decimal(text)
    except InvalidOperation as e:
        # Decimal() signals malformed text with InvalidOperation, which is not
        # a ValueError; translate it so callers get the documented exception.
        raise ValueError(f"Cannot convert {text!r} to Decimal") from e

451 

452 

def _min_function(a: Any, b: Any) -> Any:
    """CEL function: min(a, b) - pick the smaller of two values.

    Each argument is reduced to a comparison key with
    ``_extract_comparison_value`` (which unwraps the ``value`` entry of a
    MapType). The ORIGINAL argument is returned, not the key. When the keys
    are equal, ``b`` is returned.

    Args:
        a: First value (IntType, StringType, MapType, ...).
        b: Second value (IntType, StringType, MapType, ...).

    Returns:
        ``a`` if its comparison key is strictly smaller than ``b``'s,
        otherwise ``b``.
    """
    return a if _extract_comparison_value(a) < _extract_comparison_value(b) else b

473 

474 

def _max_function(a: Any, b: Any) -> Any:
    """CEL function: max(a, b) - pick the larger of two values.

    Each argument is reduced to a comparison key with
    ``_extract_comparison_value`` (which unwraps the ``value`` entry of a
    MapType). The ORIGINAL argument is returned, not the key. When the keys
    are equal, ``b`` is returned.

    Args:
        a: First value (IntType, StringType, MapType, ...).
        b: Second value (IntType, StringType, MapType, ...).

    Returns:
        ``a`` if its comparison key is strictly greater than ``b``'s,
        otherwise ``b``.
    """
    return a if _extract_comparison_value(a) > _extract_comparison_value(b) else b

495 

496 

def _extract_comparison_value(value: Any) -> Any:
    """Reduce a CEL value to something usable with ``<`` and ``>``.

    A MapType is unwrapped through its ``"value"`` entry, repeatedly if that
    entry is itself such a map. A MapType without a ``"value"`` entry is
    returned unchanged. ``IntType`` / ``StringType`` / ``BoolType`` are
    converted to ``int`` / ``str`` / ``bool``; any other value is returned
    as-is.

    Args:
        value: MapType, IntType, StringType, BoolType or another value.

    Returns:
        The comparison key derived from ``value``.
    """
    value_key = celtypes.StringType("value")

    # Peel nested artifact maps down to their innermost 'value'.
    current = value
    while isinstance(current, celtypes.MapType) and value_key in current:
        current = current[value_key]

    if isinstance(current, celtypes.MapType):
        # No 'value' field: fall back to the map itself.
        return current
    if isinstance(current, celtypes.IntType):
        return int(current)
    if isinstance(current, celtypes.StringType):
        return str(current)
    if isinstance(current, celtypes.BoolType):
        return bool(current)

    # Anything else is used for comparison unchanged.
    return current