Coverage for src / invariant / expressions.py: 93.57%
171 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 10:21 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 10:21 +0100
1"""CEL expression engine for evaluating ${...} expressions in node parameters.
3This module provides expression evaluation using the Common Expression Language (CEL).
4Expressions are embedded in parameter values using ${...} delimiters and are evaluated
5with dependency artifacts exposed as variables.
7Expression Syntax
8-----------------
10Variable References:
11 - `${x}` - References the entire artifact. For artifacts with a `.value` attribute,
12 this returns the value directly.
13 - `${x.value}` - Accesses the `value` field of artifact `x`. This is the most common
14 pattern for accessing numeric or string values from artifacts.
16Integer Arithmetic:
17 - `${x.value + 1}` - Add integers
18 - `${x.value + y.value}` - Add two artifact values
19 - `${x.value * 2}` - Multiply
20 - `${x.value - y.value}` - Subtract
22Decimal Arithmetic (Avoiding Floats):
23 Per the Strict Numeric Policy, native float types are forbidden in cacheable data
24 because IEEE 754 floats are non-deterministic across architectures. Use the `decimal()`
25 function for fractional arithmetic:
27 - `${decimal("3.14")}` - Create a Decimal from a string
28 - `${decimal("1.5") + decimal("2.5")}` - Decimal arithmetic (returns Decimal)
29 - `${decimal(x.value)}` - Convert an integer to Decimal
30 - `${decimal("3.14") * 2}` - Decimal multiplication
32Built-in Functions:
33 - `min(a, b)` - Returns the minimum of two comparable values
34 - `max(a, b)` - Returns the maximum of two comparable values
35 - `decimal(value)` - Converts a value (int, string, or Decimal) to Decimal
37 Examples:
38 - `${min(x, y)}` - Returns the artifact with the smaller value
39 - `${min(x.value, y.value)}` - Returns the smaller numeric value
40 - `${max(x.value, 10)}` - Returns the larger of x.value or 10
42String Interpolation:
43 - `"prefix_${x.value}_suffix"` - Mixed text and expressions
44 - `"Result: ${x.value + y.value}"` - String concatenation with expressions
46Error Cases:
47 - Expressions that return float (double) values raise ValueError. Use `decimal()`
48 for fractional arithmetic.
49 - References to undeclared dependencies raise ValueError.
50 - Invalid expression syntax raises ValueError with details.
52Examples
53--------
55Basic variable access:
56 >>> params = {"width": "${background}"}
57 >>> deps = {"background": 100}
58 >>> resolve_params(params, deps)
59 {"width": 100}
61Arithmetic with multiple dependencies:
62 >>> params = {"sum": "${x + y}"}
63 >>> deps = {"x": 3, "y": 7}
64 >>> resolve_params(params, deps)
65 {"sum": 10}
67Decimal arithmetic (avoiding floats):
68 >>> params = {"result": "${decimal(\"3.14\") + decimal(\"2.86\")}"}
69 >>> deps = {}
70 >>> resolve_params(params, deps)
71 {"result": Decimal("6.00")}
73Using min/max for canonicalization:
74 >>> params = {"a": "${min(x, y)}", "b": "${max(x, y)}"}
75 >>> deps = {"x": 7, "y": 3}
76 >>> resolved = resolve_params(params, deps)
77 >>> resolved["a"] # Returns 3 (the smaller value)
78 3
79 >>> resolved["b"] # Returns 7 (the larger value)
80 7
82String interpolation:
83 >>> params = {"message": "Width is ${width}px"}
84 >>> deps = {"width": 200}
85 >>> resolve_params(params, deps)
86 {"message": "Width is 200px"}
87"""
89import re
90from decimal import Decimal
91from typing import Any
93import celpy
94import celpy.celparser
95import celpy.celtypes as celtypes
96import celpy.evaluation
98from invariant.params import cel, ref
99from invariant.protocol import ICacheable
102def resolve_params(
103 params: dict[str, Any], dependencies: dict[str, Any]
104) -> dict[str, Any]:
105 """Resolve ${...} CEL expressions in parameter values.
107 Walks through the params dictionary, finds values containing ${...} delimiters,
108 evaluates the CEL expression with dependency artifacts exposed as variables,
109 and replaces the expression with the resolved value.
111 Args:
112 params: Dictionary of parameter name -> value. Values may contain ${...} expressions.
113 dependencies: Dictionary mapping dependency IDs to their artifacts (native types or ICacheable).
114 These are exposed as variables in CEL expressions.
116 Returns:
117 Dictionary with all ${...} expressions resolved to their evaluated values.
119 Raises:
120 ValueError: If expression evaluation fails, undeclared dependencies are referenced,
121 or expression result is a float (double).
122 """
123 resolved = {}
124 for key, value in params.items():
125 resolved[key] = _resolve_value(value, dependencies)
126 return resolved
129def _resolve_value(value: Any, dependencies: dict[str, Any]) -> Any:
130 """Recursively resolve expressions in a value.
132 Handles ref() markers (artifact passthrough), cel() markers (CEL expressions),
133 ${...} string interpolation, and nested structures (dicts, lists).
135 Args:
136 value: The value to resolve. May be:
137 - ref(dep): Resolves to the artifact from dependency (native type or ICacheable)
138 - cel(expr): Evaluates CEL expression and returns computed value
139 - str with ${...}: String interpolation with CEL expressions
140 - dict/list: Recursively resolves nested values
141 - Other: Returns as-is
142 dependencies: Dictionary of dependency artifacts (native types or ICacheable).
144 Returns:
145 Resolved value with all markers and expressions evaluated.
147 Raises:
148 ValueError: If ref() references undeclared dependency.
149 """
150 # Handle ref() marker - artifact passthrough
151 if isinstance(value, ref):
152 if value.dep not in dependencies:
153 raise ValueError(
154 f"ref('{value.dep}') references undeclared dependency '{value.dep}'. "
155 f"Available dependencies: {list(dependencies.keys())}"
156 )
157 return dependencies[value.dep]
159 # Handle cel() marker - CEL expression evaluation
160 if isinstance(value, cel):
161 return _evaluate_cel(value.expr, dependencies)
163 # Handle string with ${...} interpolation
164 if isinstance(value, str):
165 if "${" in value and "}" in value:
166 # Extract and evaluate CEL expression
167 return _evaluate_expression(value, dependencies)
168 return value
170 # Handle nested structures
171 if isinstance(value, dict):
172 return {k: _resolve_value(v, dependencies) for k, v in value.items()}
173 if isinstance(value, list):
174 return [_resolve_value(item, dependencies) for item in value]
176 # Primitive value, return as-is
177 return value
180def _evaluate_expression(expr_string: str, dependencies: dict[str, Any]) -> Any:
181 """Evaluate a CEL expression string.
183 Args:
184 expr_string: String potentially containing ${...} expressions.
185 dependencies: Dictionary of dependency artifacts.
187 Returns:
188 Resolved value after evaluating all expressions.
190 Raises:
191 ValueError: If expression is invalid or references undeclared dependencies.
192 """
193 # Find all ${...} expressions
194 pattern = r"\$\{([^}]+)\}"
195 matches = re.findall(pattern, expr_string)
197 if not matches:
198 # No expressions found, return as-is
199 return expr_string
201 if len(matches) == 1 and expr_string.strip() == f"${{{matches[0]}}}":
202 # Single expression covering the entire string - evaluate it
203 cel_expr = matches[0].strip()
204 return _evaluate_cel(cel_expr, dependencies)
205 else:
206 # Multiple expressions or mixed with text - evaluate each and substitute
207 result = expr_string
208 for match in matches:
209 cel_expr = match.strip()
210 evaluated = _evaluate_cel(cel_expr, dependencies)
211 # Convert to string for substitution
212 result = result.replace(f"${{{match}}}", str(evaluated))
213 return result
216def _evaluate_cel(expression: str, dependencies: dict[str, Any]) -> Any:
217 """Evaluate a single CEL expression.
219 Args:
220 expression: The CEL expression to evaluate (without ${...} delimiters).
221 dependencies: Dictionary of dependency artifacts (native types or ICacheable).
223 Returns:
224 The evaluated result. Must be a type that can be hashed (no floats).
226 Raises:
227 ValueError: If expression is invalid, references undeclared deps, or returns float.
228 """
229 # Build CEL environment
230 env = celpy.Environment()
232 # Convert dependencies to CEL-compatible types
233 var_declarations: dict[str, Any] = {}
234 for dep_id, artifact in dependencies.items():
235 # Convert artifact to CEL-compatible type
236 cel_value = _value_to_cel(artifact)
237 var_declarations[dep_id] = cel_value
239 # Compile the expression
240 try:
241 ast = env.compile(expression)
242 except celpy.celparser.CELParseError as e:
243 raise ValueError(f"Failed to parse CEL expression '{expression}': {e}") from e
245 # Register custom functions: decimal(), min(), max()
246 custom_functions = {
247 "decimal": _decimal_function,
248 "min": _min_function,
249 "max": _max_function,
250 }
252 # Create program with custom functions
253 program = env.program(ast, functions=custom_functions)
255 # Create activation with variables
256 activation: dict[str, Any] = {}
257 for var_name, var_value in var_declarations.items():
258 activation[var_name] = var_value
260 # Evaluate
261 try:
262 result = program.evaluate(activation)
263 except celpy.evaluation.CELEvalError as e:
264 raise ValueError(
265 f"Failed to evaluate CEL expression '{expression}': {e}"
266 ) from e
268 # Check for float results (forbidden per Strict Numeric Policy)
269 if isinstance(result, float):
270 raise ValueError(
271 f"Expression '{expression}' returned a float (double), which is forbidden. "
272 f"Use decimal() for fractional arithmetic."
273 )
275 # If result is a MapType (from ICacheable domain type), extract the value for simple variable references
276 if isinstance(result, celtypes.MapType):
277 # Check if this is a simple variable reference like ${x}
278 # If the map has a 'value' key, extract it
279 value_key = celtypes.StringType("value")
280 if value_key in result:
281 result = result[value_key]
282 else:
283 # Return the map as-is (might be used in further expressions)
284 return _cel_to_python(result, expression)
286 # Convert result to appropriate Python type
287 return _cel_to_python(result, expression)
290def _value_to_cel(value: Any) -> Any:
291 """Convert a value (native type or ICacheable) to a CEL-compatible type.
293 Native types are exposed directly. ICacheable domain types are converted to MapType
294 for field access.
296 Args:
297 value: The value to convert (int, str, Decimal, dict, list, ICacheable, etc.).
299 Returns:
300 CEL-compatible value (IntType, StringType, MapType, etc.).
301 """
302 # Native types - expose directly
303 if isinstance(value, int):
304 return celtypes.IntType(value)
305 if isinstance(value, str):
306 return celtypes.StringType(value)
307 if isinstance(value, bool):
308 return celtypes.BoolType(value)
309 if isinstance(value, Decimal):
310 # Decimal is passed through as-is (CEL doesn't have native Decimal)
311 return value
312 if isinstance(value, dict):
313 # Convert dict to CEL MapType recursively
314 result: dict[celtypes.StringType, Any] = {}
315 for key, val in value.items():
316 result[celtypes.StringType(key)] = _value_to_cel(val)
317 return celtypes.MapType(result)
318 if isinstance(value, (list, tuple)):
319 # Convert list/tuple to CEL ListType recursively
320 return celtypes.ListType([_value_to_cel(item) for item in value])
321 if value is None:
322 # None is represented as null in CEL
323 return None
325 # ICacheable domain types - convert to MapType for field access
326 if isinstance(value, ICacheable):
327 return _icacheable_to_cel_map(value)
329 # Fallback: convert to string
330 return celtypes.StringType(str(value))
333def _icacheable_to_cel_map(artifact: ICacheable) -> celtypes.MapType:
334 """Convert an ICacheable domain type to a CEL-compatible MapType.
336 Exposes all public attributes of the artifact for field access.
338 Args:
339 artifact: The ICacheable artifact to convert.
341 Returns:
342 MapType representation suitable for CEL evaluation.
343 """
344 result: dict[celtypes.StringType, Any] = {}
346 # Expose all public attributes
347 for attr_name in dir(artifact):
348 if not attr_name.startswith("_") and not callable(
349 getattr(artifact, attr_name, None)
350 ):
351 try:
352 attr_value = getattr(artifact, attr_name)
353 # Convert attribute value to CEL type recursively
354 result[celtypes.StringType(attr_name)] = _value_to_cel(attr_value)
355 except AttributeError:
356 pass
358 return celtypes.MapType(result)
361def _cel_to_python(cel_value: Any, expression: str) -> Any:
362 """Convert a CEL value to a Python type.
364 Handles conversion of CEL types (IntType, StringType, etc.) to Python types.
366 Args:
367 cel_value: The value returned from CEL evaluation.
368 expression: The original expression (for error messages).
370 Returns:
371 Python value (int, str, Decimal, ICacheable, etc.)
373 Raises:
374 ValueError: If value is a float (forbidden).
375 """
376 # Handle CEL types
377 if isinstance(cel_value, celtypes.IntType):
378 return int(cel_value)
379 elif isinstance(cel_value, celtypes.StringType):
380 return str(cel_value)
381 elif isinstance(cel_value, celtypes.BoolType):
382 return bool(cel_value)
383 elif isinstance(cel_value, float):
384 # This should have been caught earlier, but double-check
385 raise ValueError(
386 f"Expression '{expression}' returned float, which is forbidden. "
387 f"Use decimal() for fractional values."
388 )
389 elif isinstance(cel_value, Decimal):
390 return cel_value
391 elif isinstance(cel_value, celtypes.MapType):
392 # Convert MapType to dict
393 result_dict: dict[str, Any] = {}
394 for key, value in cel_value.items():
395 if isinstance(key, celtypes.StringType):
396 result_dict[str(key)] = _cel_to_python(value, expression)
397 else:
398 result_dict[str(key)] = _cel_to_python(value, expression)
399 return result_dict
400 elif isinstance(cel_value, (int, str, bool)):
401 # Already Python types
402 return cel_value
403 elif isinstance(cel_value, dict):
404 # Plain dict - convert recursively
405 return {k: _cel_to_python(v, expression) for k, v in cel_value.items()}
406 elif isinstance(cel_value, list):
407 # Plain list - convert recursively
408 return [_cel_to_python(item, expression) for item in cel_value]
409 else:
410 # Unknown type - return as-is (might be a CEL-specific type or already Python)
411 return cel_value
414# Custom CEL functions
415def _decimal_function(value: Any) -> Decimal:
416 """CEL function: decimal(value) - constructs a Decimal from int, string, or Decimal.
418 Args:
419 value: Can be IntType, StringType, int, str, Decimal, or MapType with value field.
421 Returns:
422 Decimal value.
424 Raises:
425 ValueError: If value cannot be converted to Decimal.
426 """
427 # Handle CEL types
428 if isinstance(value, celtypes.IntType):
429 return Decimal(str(int(value)))
430 elif isinstance(value, celtypes.StringType):
431 return Decimal(str(value))
432 elif isinstance(value, celtypes.MapType):
433 # Extract value from MapType
434 value_key = celtypes.StringType("value")
435 if value_key in value:
436 return _decimal_function(value[value_key])
437 else:
438 raise ValueError("Cannot extract value from MapType for decimal conversion")
439 # Handle Python types
440 elif isinstance(value, Decimal):
441 return value
442 elif isinstance(value, int):
443 return Decimal(str(value))
444 elif isinstance(value, str):
445 return Decimal(value)
446 elif isinstance(value, ICacheable) and hasattr(value, "value"):
447 # Extract value from ICacheable
448 return Decimal(str(value.value))
449 else:
450 raise ValueError(f"Cannot convert {type(value)} to Decimal")
453def _min_function(a: Any, b: Any) -> Any:
454 """CEL function: min(a, b) - returns the minimum of two comparable values.
456 Handles both direct values and MapType artifacts (extracts .value for comparison).
458 Args:
459 a: First value (can be IntType, StringType, MapType, etc.)
460 b: Second value (can be IntType, StringType, MapType, etc.)
462 Returns:
463 The minimum of a and b. If both are MapType, extracts .value for comparison.
464 """
465 # Handle MapType arguments - extract value for comparison
466 a_val = _extract_comparison_value(a)
467 b_val = _extract_comparison_value(b)
469 # Compare and return the original type (not the extracted value)
470 if a_val < b_val:
471 return a
472 return b
475def _max_function(a: Any, b: Any) -> Any:
476 """CEL function: max(a, b) - returns the maximum of two comparable values.
478 Handles both direct values and MapType artifacts (extracts .value for comparison).
480 Args:
481 a: First value (can be IntType, StringType, MapType, etc.)
482 b: Second value (can be IntType, StringType, MapType, etc.)
484 Returns:
485 The maximum of a and b. If both are MapType, extracts .value for comparison.
486 """
487 # Handle MapType arguments - extract value for comparison
488 a_val = _extract_comparison_value(a)
489 b_val = _extract_comparison_value(b)
491 # Compare and return the original type (not the extracted value)
492 if a_val > b_val:
493 return a
494 return b
497def _extract_comparison_value(value: Any) -> Any:
498 """Extract a comparable value from a CEL type for min/max comparison.
500 Args:
501 value: Can be MapType, IntType, StringType, or other comparable type.
503 Returns:
504 A value suitable for comparison (<, >, etc.)
505 """
506 if isinstance(value, celtypes.MapType):
507 # Extract .value field from MapType
508 value_key = celtypes.StringType("value")
509 if value_key in value:
510 return _extract_comparison_value(value[value_key])
511 else:
512 # If no value field, try to compare the map itself (not ideal but works)
513 return value
514 elif isinstance(value, celtypes.IntType):
515 return int(value)
516 elif isinstance(value, celtypes.StringType):
517 return str(value)
518 elif isinstance(value, celtypes.BoolType):
519 return bool(value)
520 else:
521 # Already a comparable Python type
522 return value