Coverage for little_loops/fsm/validation.py: 19%
149 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:20 -0500
"""FSM loop validation logic.

This module provides validation for FSM loop definitions, ensuring
structural correctness and catching common configuration errors.

Validation checks:
- Initial state exists in states dict
- All referenced states exist
- At least one terminal state
- Evaluator configs have required fields for their type
- No conflicting routing (shorthand vs full route)
- Numeric fields in valid ranges (max_iterations > 0, backoff >= 0, timeout > 0)
"""
15from __future__ import annotations
17import logging
18from collections import deque
19from dataclasses import dataclass
20from enum import Enum
21from pathlib import Path
22from typing import Any
24import yaml
26from little_loops.fsm.schema import EvaluateConfig, FSMLoop, StateConfig
28logger = logging.getLogger(__name__)
class ValidationSeverity(Enum):
    """How serious a validation finding is.

    Each member's value is the lowercase name of the level; it is
    upper-cased when a finding is rendered as text.
    """

    # Default severity of a finding.
    ERROR = "error"
    # Advisory finding.
    WARNING = "warning"
@dataclass
class ValidationError:
    """One finding produced while validating an FSM definition.

    This is a plain data record (a dataclass), not an exception class.

    Attributes:
        message: Human-readable description of the problem
        path: Dotted location of the offending element
            (e.g., "states.check.route"); None when no location applies
        severity: Whether the finding is an error or a warning
    """

    message: str
    path: str | None = None
    severity: ValidationSeverity = ValidationSeverity.ERROR

    def __str__(self) -> str:
        """Render as ``[SEVERITY] path: message``, omitting an empty path."""
        tag = f"[{self.severity.value.upper()}]"
        # A falsy path (None or "") contributes nothing to the output.
        location = f" {self.path}:" if self.path else ""
        return f"{tag}{location} {self.message}"
# For each evaluator type, the attribute names that must be set (not None)
# on its configuration. An empty list means nothing is mandatory.
EVALUATOR_REQUIRED_FIELDS: dict[str, list[str]] = {
    "exit_code": [],
    "output_numeric": [
        "operator",
        "target",
    ],
    "output_json": [
        "path",
        "operator",
        "target",
    ],
    "output_contains": [
        "pattern",
    ],
    "convergence": [
        "target",
    ],
    "diff_stall": [],
    "llm_structured": [],
    "mcp_result": [],
}
# Operator names accepted in an evaluator's ``operator`` field.
VALID_OPERATORS = {
    "eq",
    "ne",
    "lt",
    "le",
    "gt",
    "ge",
}
# All top-level keys recognized by FSMLoop.from_dict().
# Any other top-level key in a loaded file is reported as a warning.
KNOWN_TOP_LEVEL_KEYS: frozenset[str] = frozenset(
    (
        "backoff",
        "context",
        "description",
        "initial",
        "input_key",
        "llm",
        "maintain",
        "max_iterations",
        "name",
        "on_handoff",
        "scope",
        "states",
        "timeout",
    )
)
def _validate_evaluator(state_name: str, evaluate: EvaluateConfig) -> list[ValidationError]:
    """Check one state's evaluator block against the rules for its type.

    Args:
        state_name: Name of the state that owns the evaluator; used to build
            the ``states.<name>.evaluate`` path attached to each finding
        evaluate: The evaluator configuration to inspect

    Returns:
        Every problem found, in check order (empty when nothing is wrong)
    """
    base_path = f"states.{state_name}.evaluate"
    kind = evaluate.type

    # A required field counts as missing when the attribute is absent or None.
    # Types that are not listed in EVALUATOR_REQUIRED_FIELDS require nothing.
    found: list[ValidationError] = [
        ValidationError(
            message=f"Evaluator type '{kind}' requires '{name}' field",
            path=base_path,
        )
        for name in EVALUATOR_REQUIRED_FIELDS.get(kind, [])
        if getattr(evaluate, name, None) is None
    ]

    # The operator is checked for every evaluator type, whenever one is given.
    op = evaluate.operator
    if op is not None and op not in VALID_OPERATORS:
        allowed = ", ".join(sorted(VALID_OPERATORS))
        found.append(
            ValidationError(
                message=f"Invalid operator '{op}'. Must be one of: {allowed}",
                path=f"{base_path}.operator",
            )
        )

    # Type-specific checks; at most one branch applies to a given evaluator.
    if kind == "convergence":
        if evaluate.direction not in ("minimize", "maximize"):
            found.append(
                ValidationError(
                    message=f"Invalid direction '{evaluate.direction}'. "
                    "Must be 'minimize' or 'maximize'",
                    path=f"{base_path}.direction",
                )
            )
        # Only a numeric tolerance is range-checked; None and non-numeric
        # values (e.g. an interpolation string) are left alone.
        tolerance = evaluate.tolerance
        if isinstance(tolerance, (int, float)) and tolerance < 0:
            found.append(
                ValidationError(
                    message="Tolerance cannot be negative",
                    path=f"{base_path}.tolerance",
                )
            )
    elif kind == "llm_structured":
        confidence = evaluate.min_confidence
        if confidence < 0 or confidence > 1:
            found.append(
                ValidationError(
                    message="min_confidence must be between 0 and 1",
                    path=f"{base_path}.min_confidence",
                )
            )
    elif kind == "diff_stall":
        if evaluate.max_stall < 1:
            found.append(
                ValidationError(
                    message="max_stall must be >= 1",
                    path=f"{base_path}.max_stall",
                )
            )

    return found
176def _validate_state_action(state_name: str, state: StateConfig) -> list[ValidationError]:
177 """Validate state action configuration.
179 Args:
180 state_name: Name of the state to validate
181 state: The state configuration to validate
183 Returns:
184 List of validation errors found
185 """
186 errors: list[ValidationError] = []
187 path = f"states.{state_name}"
189 # params field is only valid for mcp_tool states
190 if state.params and state.action_type != "mcp_tool":
191 errors.append(
192 ValidationError(
193 message="'params' field is only valid when action_type is 'mcp_tool'",
194 path=f"{path}.params",
195 )
196 )
198 # loop and action are mutually exclusive
199 if state.loop is not None and state.action is not None:
200 errors.append(
201 ValidationError(
202 message="'loop' and 'action' are mutually exclusive — "
203 "a sub-loop state cannot also have an action",
204 path=f"{path}",
205 )
206 )
208 return errors
211def _validate_state_routing(state_name: str, state: StateConfig) -> list[ValidationError]:
212 """Validate state routing configuration.
214 Checks for conflicting routing definitions (shorthand vs full route).
216 Args:
217 state_name: Name of the state to validate
218 state: The state configuration to validate
220 Returns:
221 List of validation errors/warnings found
222 """
223 errors: list[ValidationError] = []
224 path = f"states.{state_name}"
226 has_shorthand = (
227 state.on_yes is not None
228 or state.on_no is not None
229 or state.on_error is not None
230 or state.on_partial is not None
231 )
232 has_route = state.route is not None
234 # Warn about conflicting definitions
235 if has_shorthand and has_route:
236 errors.append(
237 ValidationError(
238 message="Both shorthand routing (on_yes/on_no/on_error) "
239 "and full route table defined. Route table will take precedence.",
240 path=path,
241 severity=ValidationSeverity.WARNING,
242 )
243 )
245 # Check for no valid transition definition
246 has_next = state.next is not None
247 has_terminal = state.terminal
248 has_loop = state.loop is not None
250 if not has_shorthand and not has_route and not has_next and not has_terminal and not has_loop:
251 errors.append(
252 ValidationError(
253 message="State has no transition defined. Add routing, 'next', "
254 "or mark as 'terminal: true'",
255 path=path,
256 )
257 )
259 # Validate retry field pairing: max_retries requires on_retry_exhausted and vice versa
260 if state.max_retries is not None and state.on_retry_exhausted is None:
261 errors.append(
262 ValidationError(
263 message="'max_retries' requires 'on_retry_exhausted' to also be set",
264 path=path,
265 )
266 )
267 if state.on_retry_exhausted is not None and state.max_retries is None:
268 errors.append(
269 ValidationError(
270 message="'on_retry_exhausted' requires 'max_retries' to also be set",
271 path=path,
272 )
273 )
274 if state.max_retries is not None and state.max_retries < 1:
275 errors.append(
276 ValidationError(
277 message=f"'max_retries' must be >= 1, got {state.max_retries}",
278 path=path,
279 )
280 )
282 return errors
def validate_fsm(fsm: FSMLoop) -> list[ValidationError]:
    """Validate FSM structure and return list of findings.

    Checks performed, in this order:
    - Initial state exists
    - At least one terminal state
    - Per state: all referenced states exist, action configuration,
      evaluator configuration (when present), routing configuration
    - Numeric fields are in valid ranges (max_iterations > 0, backoff >= 0,
      timeout > 0, llm.max_tokens > 0, llm.timeout > 0)
    - Unreachable states (reported as warnings)

    Args:
        fsm: The FSM loop to validate

    Returns:
        List of findings with ERROR and WARNING severities mixed (empty if
        nothing was found). Callers that only care about hard failures must
        filter on ``severity``. Unreachable-state warnings come last and are
        ordered by state name so the output is the same on every run.
    """
    errors: list[ValidationError] = []
    defined_states = fsm.get_all_state_names()

    # Check initial state exists
    if fsm.initial not in defined_states:
        errors.append(
            ValidationError(
                message=f"Initial state '{fsm.initial}' not found in states",
                path="initial",
            )
        )

    # Check at least one terminal state
    if not fsm.get_terminal_states():
        errors.append(
            ValidationError(
                message="No terminal state defined. At least one state must have 'terminal: true'",
                path="states",
            )
        )

    # Validate each state
    for state_name, state in fsm.states.items():
        # Check all referenced states exist; "$current" is a special token
        # for retry and never names a real state.
        for ref in state.get_referenced_states():
            if ref != "$current" and ref not in defined_states:
                errors.append(
                    ValidationError(
                        message=f"References unknown state '{ref}'",
                        path=f"states.{state_name}",
                    )
                )

        # Validate action configuration
        errors.extend(_validate_state_action(state_name, state))

        # Validate evaluator if present
        if state.evaluate is not None:
            errors.extend(_validate_evaluator(state_name, state.evaluate))

        # Validate routing configuration
        errors.extend(_validate_state_routing(state_name, state))

    # Check numeric field ranges (backoff and timeout are optional)
    if fsm.max_iterations <= 0:
        errors.append(
            ValidationError(
                message=f"max_iterations must be > 0, got {fsm.max_iterations}",
                path="max_iterations",
            )
        )
    if fsm.backoff is not None and fsm.backoff < 0:
        errors.append(
            ValidationError(
                message=f"backoff must be >= 0, got {fsm.backoff}",
                path="backoff",
            )
        )
    if fsm.timeout is not None and fsm.timeout <= 0:
        errors.append(
            ValidationError(
                message=f"timeout must be > 0, got {fsm.timeout}",
                path="timeout",
            )
        )
    if fsm.llm.max_tokens <= 0:
        errors.append(
            ValidationError(
                message=f"llm.max_tokens must be > 0, got {fsm.llm.max_tokens}",
                path="llm.max_tokens",
            )
        )
    if fsm.llm.timeout <= 0:
        errors.append(
            ValidationError(
                message=f"llm.timeout must be > 0, got {fsm.llm.timeout}",
                path="llm.timeout",
            )
        )

    # Check for unreachable states (warning only).
    # The difference is a set, whose iteration order is arbitrary and can vary
    # between interpreter runs; sort it so the warnings come out in a stable
    # order. key=str keeps the sort safe in case state keys are not all
    # strings.
    unreachable = defined_states - _find_reachable_states(fsm)
    for state_name in sorted(unreachable, key=str):
        errors.append(
            ValidationError(
                message="State is not reachable from initial state",
                path=f"states.{state_name}",
                severity=ValidationSeverity.WARNING,
            )
        )

    return errors
400def _find_reachable_states(fsm: FSMLoop) -> set[str]:
401 """Find all states reachable from the initial state.
403 Uses breadth-first search to find all reachable states.
405 Args:
406 fsm: The FSM loop to analyze
408 Returns:
409 Set of reachable state names
410 """
411 reachable: set[str] = set()
412 to_visit: deque[str] = deque([fsm.initial])
414 while to_visit:
415 current = to_visit.popleft()
416 if current in reachable or current not in fsm.states:
417 continue
419 reachable.add(current)
420 state = fsm.states[current]
421 refs = state.get_referenced_states()
423 for ref in refs:
424 if ref != "$current" and ref not in reachable:
425 to_visit.append(ref)
427 return reachable
def load_and_validate(path: Path) -> tuple[FSMLoop, list[ValidationError]]:
    """Load YAML file and validate FSM structure.

    The file is read as UTF-8. Warnings (unknown top-level keys plus any
    WARNING-severity findings from ``validate_fsm``) are logged and returned;
    ERROR-severity findings abort the load.

    Args:
        path: Path to the YAML file to load

    Returns:
        Tuple of (validated FSMLoop instance, list of WARNING-severity ValidationErrors)

    Raises:
        FileNotFoundError: If the file doesn't exist
        yaml.YAMLError: If the file is not valid YAML
        ValueError: If the document is not a mapping, required fields are
            missing, or validation fails (contains error details)
    """
    if not path.exists():
        raise FileNotFoundError(f"FSM file not found: {path}")

    # Read as UTF-8 explicitly; the default text encoding depends on the
    # platform locale and would mis-decode non-ASCII content on some systems.
    with open(path, encoding="utf-8") as f:
        # safe_load can return any YAML value (None, list, scalar, ...), so
        # the result is untyped until the isinstance check below.
        data: Any = yaml.safe_load(f)

    if not isinstance(data, dict):
        raise ValueError(f"FSM file must contain a YAML mapping, got {type(data)}")

    # Check required fields before parsing
    missing = [key for key in ("name", "initial", "states") if key not in data]
    if missing:
        raise ValueError(f"FSM file missing required fields: {', '.join(missing)}")

    # Check for unknown top-level keys before parsing
    unknown_key_warnings: list[ValidationError] = []
    unknown = set(data) - KNOWN_TOP_LEVEL_KEYS
    if unknown:
        # YAML keys are not necessarily strings (e.g. `1:` or `yes:`), so
        # stringify before sorting/joining to avoid a TypeError here.
        unknown_names = sorted(str(key) for key in unknown)
        unknown_key_warnings.append(
            ValidationError(
                path="<root>",
                message=f"Unknown top-level keys: {', '.join(unknown_names)}",
                severity=ValidationSeverity.WARNING,
            )
        )

    # Parse into dataclass
    fsm = FSMLoop.from_dict(data)

    # Validate
    findings = validate_fsm(fsm)

    # Only ERROR-severity findings abort the load
    error_list = [e for e in findings if e.severity == ValidationSeverity.ERROR]
    if error_list:
        error_messages = "\n ".join(str(e) for e in error_list)
        raise ValueError(f"FSM validation failed:\n {error_messages}")

    # Collect all warnings (unknown-key warnings + structural warnings)
    struct_warnings = [e for e in findings if e.severity == ValidationSeverity.WARNING]
    all_warnings = unknown_key_warnings + struct_warnings
    for warning in all_warnings:
        logger.warning("%s", warning)

    return fsm, all_warnings