Coverage for little_loops / fsm / validation.py: 19%

149 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:20 -0500

1"""FSM loop validation logic. 

2 

3This module provides validation for FSM loop definitions, ensuring 

4structural correctness and catching common configuration errors. 

5 

6Validation checks: 

7- Initial state exists in states dict 

8- All referenced states exist 

9- At least one terminal state 

10- Evaluator configs have required fields for their type 

11- No conflicting routing (shorthand vs full route) 

12- Numeric fields in valid ranges (max_iterations > 0, backoff >= 0, timeout > 0) 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18from collections import deque 

19from dataclasses import dataclass 

20from enum import Enum 

21from pathlib import Path 

22from typing import Any 

23 

24import yaml 

25 

26from little_loops.fsm.schema import EvaluateConfig, FSMLoop, StateConfig 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class ValidationSeverity(Enum):
    """How serious a validation finding is.

    Members:
        ERROR: Finding whose string value is "error".
        WARNING: Finding whose string value is "warning".
    """

    ERROR = "error"
    WARNING = "warning"

36 

37 

@dataclass
class ValidationError:
    """A single finding produced while validating an FSM loop.

    Attributes:
        message: Human-readable description of the problem
        path: Dotted location of the offending element
            (e.g., "states.check.route"), or None when there is none
        severity: Whether the finding is an error or a warning
            (defaults to error)
    """

    message: str
    path: str | None = None
    severity: ValidationSeverity = ValidationSeverity.ERROR

    def __str__(self) -> str:
        """Render as "[SEVERITY] path: message"; the path part is dropped when empty."""
        tag = f"[{self.severity.value.upper()}]"
        # A missing (None) or empty path yields the short form.
        if not self.path:
            return f"{tag} {self.message}"
        return f"{tag} {self.path}: {self.message}"

58 

59 

# Fields that must be set (non-None) on an evaluator, keyed by evaluator type.
# A type mapped to an empty list has no mandatory fields.
EVALUATOR_REQUIRED_FIELDS: dict[str, list[str]] = {
    # Types with mandatory fields
    "output_numeric": ["operator", "target"],
    "output_json": ["path", "operator", "target"],
    "output_contains": ["pattern"],
    "convergence": ["target"],
    # Types without mandatory fields
    "exit_code": [],
    "diff_stall": [],
    "llm_structured": [],
    "mcp_result": [],
}

# Comparison operators accepted in an evaluator's 'operator' field
VALID_OPERATORS = {"eq", "ne", "lt", "le", "gt", "ge"}

# All top-level keys recognized by FSMLoop.from_dict()
KNOWN_TOP_LEVEL_KEYS: frozenset[str] = frozenset(
    (
        "name",
        "description",
        "initial",
        "states",
        "context",
        "scope",
        "max_iterations",
        "backoff",
        "timeout",
        "maintain",
        "llm",
        "on_handoff",
        "input_key",
    )
)

93 

94 

def _validate_evaluator(state_name: str, evaluate: EvaluateConfig) -> list[ValidationError]:
    """Check an evaluator config against the rules for its declared type.

    Covers required fields per type (EVALUATOR_REQUIRED_FIELDS), operator
    membership in VALID_OPERATORS, and the type-specific range checks for
    convergence, llm_structured and diff_stall evaluators.

    Args:
        state_name: Name of the state that owns the evaluator; used to build
            the reported path
        evaluate: Evaluator configuration to inspect

    Returns:
        Every problem found, in the order the checks run (empty if none)
    """
    base_path = f"states.{state_name}.evaluate"
    kind = evaluate.type

    # A required field counts as missing when it is absent or None.
    # Evaluator types not listed in the table have no required fields.
    issues: list[ValidationError] = [
        ValidationError(
            message=f"Evaluator type '{kind}' requires '{missing}' field",
            path=base_path,
        )
        for missing in EVALUATOR_REQUIRED_FIELDS.get(kind, [])
        if getattr(evaluate, missing, None) is None
    ]

    # The operator check applies to every evaluator type, but only when set.
    operator = evaluate.operator
    if not (operator is None or operator in VALID_OPERATORS):
        issues.append(
            ValidationError(
                message=f"Invalid operator '{operator}'. "
                f"Must be one of: {', '.join(sorted(VALID_OPERATORS))}",
                path=f"{base_path}.operator",
            )
        )

    if kind == "convergence":
        direction = evaluate.direction
        if direction not in ("minimize", "maximize"):
            issues.append(
                ValidationError(
                    message=f"Invalid direction '{direction}'. "
                    "Must be 'minimize' or 'maximize'",
                    path=f"{base_path}.direction",
                )
            )
        # Tolerance is range-checked only when numeric; other values
        # (None, interpolation strings) are skipped here.
        tolerance = evaluate.tolerance
        if isinstance(tolerance, (int, float)) and tolerance < 0:
            issues.append(
                ValidationError(
                    message="Tolerance cannot be negative",
                    path=f"{base_path}.tolerance",
                )
            )
    elif kind == "llm_structured":
        confidence = evaluate.min_confidence
        if confidence < 0 or confidence > 1:
            issues.append(
                ValidationError(
                    message="min_confidence must be between 0 and 1",
                    path=f"{base_path}.min_confidence",
                )
            )
    elif kind == "diff_stall":
        if evaluate.max_stall < 1:
            issues.append(
                ValidationError(
                    message="max_stall must be >= 1",
                    path=f"{base_path}.max_stall",
                )
            )

    return issues

174 

175 

def _validate_state_action(state_name: str, state: StateConfig) -> list[ValidationError]:
    """Check a state's action-related fields for invalid combinations.

    Two rules are enforced: 'params' may only be non-empty when the state's
    action_type is 'mcp_tool', and a state may not set both 'loop' and
    'action'.

    Args:
        state_name: Name of the state being checked; used in reported paths
        state: Configuration of that state

    Returns:
        Every problem found (empty if none)
    """
    state_path = f"states.{state_name}"
    problems: list[ValidationError] = []

    def report(message: str, where: str) -> None:
        # Record one error-severity finding at the given path.
        problems.append(ValidationError(message=message, path=where))

    # Empty or missing params are fine for any action type.
    if state.params and state.action_type != "mcp_tool":
        report(
            "'params' field is only valid when action_type is 'mcp_tool'",
            f"{state_path}.params",
        )

    # A sub-loop state delegates its work, so it cannot also carry an action.
    if state.loop is not None and state.action is not None:
        report(
            "'loop' and 'action' are mutually exclusive — "
            "a sub-loop state cannot also have an action",
            state_path,
        )

    return problems

209 

210 

def _validate_state_routing(state_name: str, state: StateConfig) -> list[ValidationError]:
    """Check how a state declares its outgoing transitions.

    Reports a warning when shorthand routing and a full route table are both
    present, an error when the state declares no way to leave it and is not
    terminal, and errors for inconsistent retry settings.

    Args:
        state_name: Name of the state being checked; used in the reported path
        state: Configuration of that state

    Returns:
        Errors and warnings found, in the order the checks run (empty if none)
    """
    where = f"states.{state_name}"
    findings: list[ValidationError] = []

    shorthand_targets = (state.on_yes, state.on_no, state.on_error, state.on_partial)
    uses_shorthand = any(target is not None for target in shorthand_targets)
    uses_route = state.route is not None

    # Both styles at once is legal but ambiguous, so it is only a warning.
    if uses_shorthand and uses_route:
        findings.append(
            ValidationError(
                message="Both shorthand routing (on_yes/on_no/on_error) "
                "and full route table defined. Route table will take precedence.",
                path=where,
                severity=ValidationSeverity.WARNING,
            )
        )

    # Routing, 'next', a terminal flag, or a sub-loop each count as a
    # transition definition; having none of them is an error.
    has_exit = (
        uses_shorthand
        or uses_route
        or state.next is not None
        or state.terminal
        or state.loop is not None
    )
    if not has_exit:
        findings.append(
            ValidationError(
                message="State has no transition defined. Add routing, 'next', "
                "or mark as 'terminal: true'",
                path=where,
            )
        )

    # max_retries and on_retry_exhausted must be set together.
    retries = state.max_retries
    exhausted_target = state.on_retry_exhausted
    if retries is not None and exhausted_target is None:
        findings.append(
            ValidationError(
                message="'max_retries' requires 'on_retry_exhausted' to also be set",
                path=where,
            )
        )
    if exhausted_target is not None and retries is None:
        findings.append(
            ValidationError(
                message="'on_retry_exhausted' requires 'max_retries' to also be set",
                path=where,
            )
        )
    if retries is not None and retries < 1:
        findings.append(
            ValidationError(
                message=f"'max_retries' must be >= 1, got {retries}",
                path=where,
            )
        )

    return findings

283 

284 

def validate_fsm(fsm: FSMLoop) -> list[ValidationError]:
    """Validate FSM structure and return list of errors.

    Performs comprehensive validation:
    - Initial state exists
    - All referenced states exist
    - At least one terminal state
    - Action, evaluator and routing configurations of every state are valid
    - Numeric fields are in valid ranges (max_iterations > 0, backoff >= 0,
      timeout > 0, llm.max_tokens > 0, llm.timeout > 0)
    - States unreachable from the initial state (reported as warnings)

    Args:
        fsm: The FSM loop to validate

    Returns:
        List of validation findings (empty if valid). Unreachable-state
        warnings come last and are ordered by state name so that repeated
        runs produce identical output.
    """
    errors: list[ValidationError] = []
    defined_states = fsm.get_all_state_names()

    # Check initial state exists
    if fsm.initial not in defined_states:
        errors.append(
            ValidationError(
                message=f"Initial state '{fsm.initial}' not found in states",
                path="initial",
            )
        )

    # Check at least one terminal state
    if not fsm.get_terminal_states():
        errors.append(
            ValidationError(
                message="No terminal state defined. At least one state must have 'terminal: true'",
                path="states",
            )
        )

    # Validate each state
    for state_name, state in fsm.states.items():
        # Every referenced state must exist; "$current" is a special token
        # for retry and never names a real state.
        errors.extend(
            ValidationError(
                message=f"References unknown state '{ref}'",
                path=f"states.{state_name}",
            )
            for ref in state.get_referenced_states()
            if ref != "$current" and ref not in defined_states
        )

        # Validate action configuration
        errors.extend(_validate_state_action(state_name, state))

        # Validate evaluator if present
        if state.evaluate is not None:
            errors.extend(_validate_evaluator(state_name, state.evaluate))

        # Validate routing configuration
        errors.extend(_validate_state_routing(state_name, state))

    # Check numeric field ranges
    if fsm.max_iterations <= 0:
        errors.append(
            ValidationError(
                message=f"max_iterations must be > 0, got {fsm.max_iterations}",
                path="max_iterations",
            )
        )
    if fsm.backoff is not None and fsm.backoff < 0:
        errors.append(
            ValidationError(
                message=f"backoff must be >= 0, got {fsm.backoff}",
                path="backoff",
            )
        )
    if fsm.timeout is not None and fsm.timeout <= 0:
        errors.append(
            ValidationError(
                message=f"timeout must be > 0, got {fsm.timeout}",
                path="timeout",
            )
        )
    if fsm.llm.max_tokens <= 0:
        errors.append(
            ValidationError(
                message=f"llm.max_tokens must be > 0, got {fsm.llm.max_tokens}",
                path="llm.max_tokens",
            )
        )
    if fsm.llm.timeout <= 0:
        errors.append(
            ValidationError(
                message=f"llm.timeout must be > 0, got {fsm.llm.timeout}",
                path="llm.timeout",
            )
        )

    # Check for unreachable states (warning only). The set difference has no
    # stable iteration order (string hashing is randomized per process), so
    # sort it; key=str keeps the sort safe even for non-string state names.
    unreachable = defined_states - _find_reachable_states(fsm)
    for state_name in sorted(unreachable, key=str):
        errors.append(
            ValidationError(
                message="State is not reachable from initial state",
                path=f"states.{state_name}",
                severity=ValidationSeverity.WARNING,
            )
        )

    return errors

398 

399 

def _find_reachable_states(fsm: FSMLoop) -> set[str]:
    """Collect the states that can be reached by following references from the initial state.

    Walks the state graph breadth-first, starting at fsm.initial. Names that
    are not keys of fsm.states (including a missing initial state) are
    ignored, and the "$current" token is never followed.

    Args:
        fsm: The FSM loop to analyze

    Returns:
        Set of reachable state names
    """
    seen: set[str] = set()
    frontier: deque[str] = deque([fsm.initial])

    while frontier:
        name = frontier.popleft()
        # Skip dangling references and states that were already expanded.
        if name not in fsm.states or name in seen:
            continue

        seen.add(name)
        frontier.extend(
            target
            for target in fsm.states[name].get_referenced_states()
            if target != "$current" and target not in seen
        )

    return seen

428 

429 

def load_and_validate(path: Path) -> tuple[FSMLoop, list[ValidationError]]:
    """Load YAML file and validate FSM structure.

    The file is parsed with yaml.safe_load, checked for the required
    top-level fields, converted with FSMLoop.from_dict and then passed to
    validate_fsm. Warnings (unknown top-level keys plus warning-severity
    findings) are logged and returned; they are only reported when no
    error-severity finding exists.

    Args:
        path: Path to the YAML file to load

    Returns:
        Tuple of (validated FSMLoop instance, list of WARNING-severity ValidationErrors)

    Raises:
        FileNotFoundError: If the file doesn't exist
        yaml.YAMLError: If the file is not valid YAML
        ValueError: If the document is not a mapping, required fields are
            missing, or validation fails (contains error details)
    """
    if not path.exists():
        raise FileNotFoundError(f"FSM file not found: {path}")

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(path, encoding="utf-8") as f:
        data: dict[str, Any] = yaml.safe_load(f)

    # An empty document parses to None and a list/scalar document is not an
    # FSM definition either.
    if not isinstance(data, dict):
        raise ValueError(f"FSM file must contain a YAML mapping, got {type(data)}")

    # Check required fields before parsing
    missing = [field for field in ("name", "initial", "states") if field not in data]
    if missing:
        raise ValueError(f"FSM file missing required fields: {', '.join(missing)}")

    # Check for unknown top-level keys before parsing. YAML keys are not
    # necessarily strings (e.g. `1: foo`), so stringify them before sorting
    # and joining to avoid a TypeError while building the warning.
    unknown_key_warnings: list[ValidationError] = []
    unknown = set(data.keys()) - KNOWN_TOP_LEVEL_KEYS
    if unknown:
        unknown_names = sorted(str(key) for key in unknown)
        unknown_key_warnings.append(
            ValidationError(
                path="<root>",
                message=f"Unknown top-level keys: {', '.join(unknown_names)}",
                severity=ValidationSeverity.WARNING,
            )
        )

    # Parse into dataclass
    fsm = FSMLoop.from_dict(data)

    # Validate
    errors = validate_fsm(fsm)

    # Only error-severity findings abort loading
    error_list = [e for e in errors if e.severity == ValidationSeverity.ERROR]
    if error_list:
        error_messages = "\n ".join(str(e) for e in error_list)
        raise ValueError(f"FSM validation failed:\n {error_messages}")

    # Collect all warnings (unknown-key warnings + structural warnings)
    struct_warnings = [e for e in errors if e.severity == ValidationSeverity.WARNING]
    all_warnings = unknown_key_warnings + struct_warnings
    for warning in all_warnings:
        logger.warning(str(warning))

    return fsm, all_warnings