Coverage for src / domain / lifecycle.py: 80%

237 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Implementer lifecycle state machine for orchestrator control flow. 

2 

3Extracts the retry/gate/review policy as a pure state machine that can be 

4tested without Claude SDK or subprocess dependencies. 

5 

6The state machine is data-in/data-out: it receives events and returns 

7effects (actions the orchestrator should take). This separation allows 

8the orchestrator to remain responsible for I/O while the lifecycle 

9handles all policy decisions. 

10 

This module provides the canonical RetryState and lifecycle state machine, which serve two purposes:

12 

131. **Testing policy in isolation**: Use ImplementerLifecycle directly to verify 

14 retry/gate/review transitions without mocking SDK or subprocesses. 

15 

162. **Integration with orchestrator**: The orchestrator imports RetryState from 

17 this module and uses lifecycle_ctx.retry_state directly for gate checks. 

18""" 

19 

20from __future__ import annotations 

21 

22import logging 

23from dataclasses import dataclass, field 

24from enum import Enum, auto 

25from typing import TYPE_CHECKING, Protocol, runtime_checkable 

26 

27from .validation.spec import ResolutionOutcome 

28 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

30 

31if TYPE_CHECKING: 

32 from .validation.spec import IssueResolution 

33 

# Outcomes for which the external review is bypassed: none of them
# produces new code, so there is nothing for a reviewer to look at.
_SKIP_REVIEW_OUTCOMES = frozenset(
    (
        ResolutionOutcome.NO_CHANGE,
        ResolutionOutcome.OBSOLETE,
        ResolutionOutcome.ALREADY_COMPLETE,
    )
)

42 

43 

44# --------------------------------------------------------------------------- 

45# Local outcome protocols: define the interface lifecycle needs from infra 

46# --------------------------------------------------------------------------- 

47 

48 

@runtime_checkable
class GateOutcome(Protocol):
    """Structural view of a quality-gate result, as consumed by the lifecycle.

    The orchestrator hands over infra GateResult objects; any object that
    exposes the read-only attributes below satisfies this protocol, and the
    lifecycle reads nothing else from it.
    """

    @property
    def passed(self) -> bool:
        """True when the gate check succeeded."""
        ...

    @property
    def failure_reasons(self) -> list[str]:
        """Why the gate failed; empty when it passed."""
        ...

    @property
    def commit_hash(self) -> str | None:
        """Hash of the commit that was found, or None if there was none."""
        ...

    @property
    def no_progress(self) -> bool:
        """True when nothing changed since the previous attempt."""
        ...

    @property
    def resolution(self) -> IssueResolution | None:
        """Issue resolution, present only if a resolution marker was found."""
        ...

81 

82 

class ReviewIssue(Protocol):
    """Structural view of one finding reported by an external review.

    Declares only the fields the lifecycle needs when it builds failure
    messages. Being a Protocol, it is satisfied structurally, which lets
    cerberus_review.ReviewIssue be passed in without inheriting from it.
    """

    @property
    def file(self) -> str:
        """Path of the file the finding refers to."""
        ...

    @property
    def line_start(self) -> int:
        """First line number covered by the finding."""
        ...

    @property
    def line_end(self) -> int:
        """Last line number covered by the finding."""
        ...

    @property
    def priority(self) -> int | None:
        """Severity as 0=P0, 1=P1, 2=P2, 3=P3; None when unknown."""
        ...

    @property
    def title(self) -> str:
        """One-line summary of the finding."""
        ...

    @property
    def body(self) -> str:
        """Full description of the finding."""
        ...

    @property
    def reviewer(self) -> str:
        """Identifier of the reviewer that reported the finding."""
        ...

125 

126 

@runtime_checkable
class ReviewOutcome(Protocol):
    """Structural view of an external review result, as consumed by the lifecycle.

    The orchestrator supplies infra review results (for example ReviewResult
    from cerberus_review); any object exposing the read-only attributes below
    satisfies this protocol, and the lifecycle reads nothing else from it.
    """

    @property
    def passed(self) -> bool:
        """True when the review succeeded."""
        ...

    @property
    def parse_error(self) -> str | None:
        """Message describing a JSON parsing failure, if one occurred."""
        ...

    @property
    def fatal_error(self) -> bool:
        """True when the review failure cannot be recovered from."""
        ...

    @property
    def issues(self) -> list[ReviewIssue]:
        """Findings reported by the review."""
        ...

155 

156 

class LifecycleState(Enum):
    """Every state the implementer lifecycle can be in."""

    INITIAL = auto()  # agent session has not started yet
    PROCESSING = auto()  # consuming messages from the SDK stream
    AWAITING_LOG = auto()  # waiting for the log file to show up
    RUNNING_GATE = auto()  # quality gate check in progress
    RUNNING_REVIEW = auto()  # external review in progress (gate already passed)
    SUCCESS = auto()  # terminal: finished successfully
    FAILED = auto()  # terminal: finished with a failure

174 

175 

class Effect(Enum):
    """I/O actions the orchestrator is asked to carry out.

    Each state transition returns one of these members; the lifecycle itself
    never performs the action.
    """

    CONTINUE = auto()  # keep processing SDK messages
    WAIT_FOR_LOG = auto()  # wait until the log file appears
    RUN_GATE = auto()  # run the quality gate check
    RUN_REVIEW = auto()  # run the external review
    SEND_GATE_RETRY = auto()  # send the gate-retry follow-up prompt to the SDK
    SEND_REVIEW_RETRY = auto()  # send the review-retry follow-up prompt to the SDK
    COMPLETE_SUCCESS = auto()  # finish, reporting success
    COMPLETE_FAILURE = auto()  # finish, reporting failure

199 

200 

@dataclass
class LifecycleConfig:
    """Tunable limits and switches that steer the lifecycle's decisions."""

    # Upper bound compared against RetryState.gate_attempt.
    max_gate_retries: int = 3
    # Upper bound compared against RetryState.review_attempt.
    max_review_retries: int = 3
    # When False, the external review step is never requested.
    review_enabled: bool = True

208 

209 

@dataclass
class RetryState:
    """Mutable counters and markers describing gate/review retry progress.

    The lifecycle mutates this object as it transitions; the orchestrator may
    read it when formatting follow-up prompts.
    """

    # Number of the current gate attempt (the first attempt is 1).
    gate_attempt: int = 1
    # Number of the current review attempt; 0 until a review has been started.
    review_attempt: int = 0
    # Log offset to use for the next attempt.
    log_offset: int = 0
    # Commit hash recorded when a retry was scheduled, if any.
    previous_commit_hash: str | None = None
    # NOTE(review): not written anywhere in this module - presumably maintained
    # by the orchestrator; confirm its meaning against the caller.
    baseline_timestamp: int = 0

223 

224 

# Sentinel stored in ContextUsage.input_tokens when usage tracking is switched
# off (e.g., the SDK returned no usage data).
TRACKING_DISABLED: int = -1


@dataclass
class ContextUsage:
    """Token counters used to detect context exhaustion.

    The SDK reports cumulative input_tokens in ResultMessage.usage; the
    counters are kept here to notice when the 200K context limit is getting
    close.

    If the SDK supplies no usage data, input_tokens holds TRACKING_DISABLED
    (-1) so that "unknown" can be told apart from "zero tokens used".
    """

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0

    @property
    def tracking_disabled(self) -> bool:
        """True when input_tokens carries the TRACKING_DISABLED sentinel."""
        return self.input_tokens == TRACKING_DISABLED

    def disable_tracking(self) -> None:
        """Switch tracking off by storing the sentinel in input_tokens."""
        self.input_tokens = TRACKING_DISABLED

    def pressure_ratio(self, limit: int) -> float:
        """Return the fraction of the context limit consumed so far.

        Only input and output tokens are summed: per the Anthropic API,
        cache_read_tokens are already part of input_tokens.

        Args:
            limit: Maximum context tokens (e.g., 200_000)

        Returns:
            Ratio from 0.0 to 1.0+ (e.g., 90000/200000 = 0.45).
            0.0 when limit is zero or negative, or when tracking is disabled.
        """
        if self.tracking_disabled:
            return 0.0
        if limit <= 0:
            return 0.0
        consumed = self.input_tokens + self.output_tokens
        return consumed / limit

269 

270 

@dataclass
class LifecycleContext:
    """Mutable bundle handed to every transition and updated by it.

    Holds the retry bookkeeping together with the results accumulated while
    the lifecycle moves from state to state.
    """

    retry_state: RetryState = field(default_factory=RetryState)
    session_id: str | None = None
    final_result: str = ""
    success: bool = False
    # Most recent gate result; kept so failure messages can be built from it.
    last_gate_result: GateOutcome | None = None
    # Most recent review result; kept so follow-up prompts can be built from it.
    last_review_result: ReviewOutcome | None = None
    # Issue resolution (no-op, obsolete, etc.), if any.
    resolution: IssueResolution | None = None
    # P2/P3 findings from the review. They do not block the review, but are
    # collected here so they can be filed as tracking issues.
    low_priority_review_issues: list[ReviewIssue] = field(default_factory=list)
    # Token counters for context exhaustion detection.
    context_usage: ContextUsage = field(default_factory=ContextUsage)

294 

295 

@dataclass
class TransitionResult:
    """Outcome of one state transition.

    Pairs the state the lifecycle ended up in with the effect the
    orchestrator is expected to perform next.
    """

    state: LifecycleState
    effect: Effect
    # Human-readable explanation of the transition, intended for logging.
    message: str | None = None

307 

308 

class ImplementerLifecycle:
    """Pure state machine for the implementer agent lifecycle.

    All policy decisions are made here:
    - when to run the gate and when to run the review
    - when to retry and when to give up
    - how attempt counters are advanced

    No I/O happens in this class. Each transition returns a TransitionResult
    whose Effect tells the orchestrator which action to perform.
    """

    def __init__(self, config: LifecycleConfig):
        self.config = config
        self._state = LifecycleState.INITIAL

    @property
    def state(self) -> LifecycleState:
        """Current lifecycle state."""
        return self._state

    @property
    def is_terminal(self) -> bool:
        """Whether the lifecycle has reached SUCCESS or FAILED."""
        return self._state in (LifecycleState.SUCCESS, LifecycleState.FAILED)

    def _require_state(self, expected: LifecycleState, event: str) -> None:
        """Raise ValueError unless the machine is currently in ``expected``."""
        if self._state != expected:
            raise ValueError(f"Unexpected state for {event}: {self._state}")

    def _fail(
        self, ctx: LifecycleContext, final_result: str, message: str
    ) -> TransitionResult:
        """Record a failure on ``ctx``, enter FAILED and build the result."""
        ctx.final_result = final_result
        ctx.success = False
        self._state = LifecycleState.FAILED
        return TransitionResult(
            state=self._state,
            effect=Effect.COMPLETE_FAILURE,
            message=message,
        )

    def _succeed(self, ctx: LifecycleContext, message: str) -> TransitionResult:
        """Record success on ``ctx``, enter SUCCESS and build the result."""
        ctx.success = True
        self._state = LifecycleState.SUCCESS
        return TransitionResult(
            state=self._state,
            effect=Effect.COMPLETE_SUCCESS,
            message=message,
        )

    def start(self) -> TransitionResult:
        """Begin the lifecycle: move from INITIAL to PROCESSING.

        Raises:
            ValueError: If the machine is not in the INITIAL state.
        """
        if self._state != LifecycleState.INITIAL:
            raise ValueError(f"Cannot start from state {self._state}")
        self._state = LifecycleState.PROCESSING
        logger.info("Lifecycle started: state=%s", self._state.name)
        return TransitionResult(
            state=self._state,
            effect=Effect.CONTINUE,
            message="Agent session started",
        )

    def on_messages_complete(
        self, ctx: LifecycleContext, has_session_id: bool
    ) -> TransitionResult:
        """Handle completion of message processing.

        Called when the SDK receive_response iterator completes. With a
        session ID the machine moves to AWAITING_LOG; without one it fails,
        because there is no log to check.

        Raises:
            ValueError: If the machine is not in the PROCESSING state.
        """
        self._require_state(LifecycleState.PROCESSING, "messages_complete")

        if not has_session_id:
            return self._fail(
                ctx, "No session ID received from agent", "No session ID"
            )

        self._state = LifecycleState.AWAITING_LOG
        logger.debug("Messages complete: effect=%s", Effect.WAIT_FOR_LOG.name)
        return TransitionResult(
            state=self._state,
            effect=Effect.WAIT_FOR_LOG,
        )

    def on_log_ready(self, ctx: LifecycleContext) -> TransitionResult:
        """Handle the log file becoming available: move to RUNNING_GATE.

        Raises:
            ValueError: If the machine is not in the AWAITING_LOG state.
        """
        self._require_state(LifecycleState.AWAITING_LOG, "log_ready")

        self._state = LifecycleState.RUNNING_GATE
        return TransitionResult(
            state=self._state,
            effect=Effect.RUN_GATE,
        )

    def on_log_timeout(self, ctx: LifecycleContext, log_path: str) -> TransitionResult:
        """Handle a timeout while waiting for the log file: move to FAILED.

        Raises:
            ValueError: If the machine is not in the AWAITING_LOG state.
        """
        self._require_state(LifecycleState.AWAITING_LOG, "log_timeout")

        return self._fail(
            ctx,
            f"Session log missing after timeout: {log_path}",
            "Log file timeout",
        )

    def on_gate_result(
        self, ctx: LifecycleContext, gate_result: GateOutcome, new_log_offset: int
    ) -> TransitionResult:
        """Handle a quality gate result.

        Decides whether to:
        - Proceed to review (gate passed, review enabled, a commit exists and
          the resolution does not skip review)
        - Complete with success (gate passed and no review is needed)
        - Retry the gate (gate failed but retries remain and progress was made)
        - Fail (no retries left or no progress)

        Args:
            ctx: Lifecycle context; receives the gate result, the resolution
                and, on retry, the updated retry state.
            gate_result: The gate outcome to process.
            new_log_offset: Log offset stored for the next attempt on retry.

        Raises:
            ValueError: If the machine is not in the RUNNING_GATE state.
        """
        self._require_state(LifecycleState.RUNNING_GATE, "gate_result")

        ctx.last_gate_result = gate_result
        ctx.resolution = gate_result.resolution
        retry = ctx.retry_state

        logger.info(
            "Gate result: outcome=%s attempt=%d state=%s",
            "passed" if gate_result.passed else "failed",
            retry.gate_attempt,
            self._state.name,
        )

        if gate_result.passed:
            # Resolutions with no new code (no_change, obsolete,
            # already_complete) leave nothing to review.
            resolution = gate_result.resolution
            resolution_skips_review = (
                resolution is not None
                and resolution.outcome in _SKIP_REVIEW_OUTCOMES
            )
            needs_review = (
                self.config.review_enabled
                and gate_result.commit_hash
                and not resolution_skips_review
            )
            if not needs_review:
                return self._succeed(ctx, "Gate passed, no review required")

            # Initialize the counter only on the first review; it must survive
            # gate re-runs that follow a review retry.
            if retry.review_attempt == 0:
                retry.review_attempt = 1
            self._state = LifecycleState.RUNNING_REVIEW
            return TransitionResult(
                state=self._state,
                effect=Effect.RUN_REVIEW,
                message=f"Gate passed, running review (attempt {ctx.retry_state.review_attempt}/{self.config.max_review_retries})",
            )

        # Gate failed: retry only while attempts remain and progress was made.
        can_retry = (
            retry.gate_attempt < self.config.max_gate_retries
            and not gate_result.no_progress
        )

        if can_retry:
            retry.gate_attempt += 1
            retry.log_offset = new_log_offset
            retry.previous_commit_hash = gate_result.commit_hash
            self._state = LifecycleState.PROCESSING
            logger.debug(
                "Retry triggered: reason=gate_failed attempt=%d/%d",
                retry.gate_attempt,
                self.config.max_gate_retries,
            )
            return TransitionResult(
                state=self._state,
                effect=Effect.SEND_GATE_RETRY,
                message=f"Gate retry {ctx.retry_state.gate_attempt}/{self.config.max_gate_retries}",
            )

        # No retries left or no progress.
        result = self._fail(
            ctx,
            f"Quality gate failed: {'; '.join(gate_result.failure_reasons)}",
            "Gate failed, no retries left",
        )
        logger.info(
            "Lifecycle terminal: state=%s message=%s",
            self._state.name,
            result.message,
        )
        return result

    def on_review_result(
        self,
        ctx: LifecycleContext,
        review_result: ReviewOutcome,
        new_log_offset: int,
        no_progress: bool = False,
    ) -> TransitionResult:
        """Handle an external review result.

        Decides whether to:
        - Complete with success (review passed, or nothing blocking was found)
        - Fail immediately (parse error flagged as fatal)
        - Re-run the review (non-fatal parse error and retries remain)
        - Retry via agent prompt (blocking issues and retries remain)
        - Fail (no retries left or no progress)

        Args:
            ctx: Lifecycle context with retry state.
            review_result: The review outcome to process.
            new_log_offset: Updated log offset for the next attempt.
            no_progress: If True, the agent made no progress since the last
                attempt (same commit, no new validation evidence). Triggers
                fail-fast.

        Raises:
            ValueError: If the machine is not in the RUNNING_REVIEW state.
        """
        self._require_state(LifecycleState.RUNNING_REVIEW, "review_result")

        ctx.last_review_result = review_result
        retry = ctx.retry_state
        max_retries = self.config.max_review_retries

        logger.info(
            "Review result: outcome=%s attempt=%d state=%s",
            "passed" if review_result.passed else "failed",
            retry.review_attempt,
            self._state.name,
        )

        # Only P0/P1 issues block. P2/P3 are acceptable and can be tracked
        # later; an unknown (None) priority is treated as non-blocking.
        issues = review_result.issues
        blocking_issues = [
            issue
            for issue in issues
            if issue.priority is not None and issue.priority <= 1
        ]

        # A parse error always blocks: the issue list cannot be trusted.
        # "is not None" (not truthiness) is used for every parse-error decision
        # below, so an empty-string error is handled as a parse error too
        # instead of falling through to the agent-retry path.
        parse_error = review_result.parse_error
        has_parse_error = parse_error is not None

        if review_result.passed or (not blocking_issues and not has_parse_error):
            # Keep every non-blocking finding (priority > 1 or unknown) so no
            # review feedback is lost.
            low_pri_issues = [
                issue
                for issue in issues
                if issue.priority is None or issue.priority > 1
            ]
            ctx.low_priority_review_issues = low_pri_issues
            low_pri_count = len(low_pri_issues)
            if low_pri_count > 0:
                msg = f"Review passed ({low_pri_count} P2/P3 issues noted for later)"
            else:
                msg = "Review passed"
            return self._succeed(ctx, msg)

        if has_parse_error:
            parse_failure = f"External review failed: {parse_error}"
            if review_result.fatal_error:
                return self._fail(
                    ctx, parse_failure, "Review failed, unrecoverable error"
                )
            # Non-fatal parse error: a reviewer infrastructure problem (e.g.
            # malformed JSON), so the review tool is re-run directly rather
            # than prompting the agent.
            if retry.review_attempt < max_retries:
                retry.review_attempt += 1
                # State stays RUNNING_REVIEW; the orchestrator re-runs the review.
                return TransitionResult(
                    state=self._state,
                    effect=Effect.RUN_REVIEW,
                    message=f"Review parse error, re-running review (attempt {ctx.retry_state.review_attempt}/{self.config.max_review_retries})",
                )
            return self._fail(ctx, parse_failure, "Review failed, no retries left")

        # Blocking issues: retry via agent prompt while attempts remain and
        # the agent is making progress.
        if retry.review_attempt < max_retries and not no_progress:
            retry.log_offset = new_log_offset
            if ctx.last_gate_result:
                retry.previous_commit_hash = ctx.last_gate_result.commit_hash
            retry.review_attempt += 1
            self._state = LifecycleState.PROCESSING
            return TransitionResult(
                state=self._state,
                effect=Effect.SEND_REVIEW_RETRY,
                message=f"Review retry {ctx.retry_state.review_attempt}/{self.config.max_review_retries}",
            )

        if no_progress:
            return self._fail(
                ctx,
                "External review failed: No progress (commit unchanged, no working tree changes)",
                "Review failed, no progress detected",
            )

        # Retries exhausted: report at most the first three blocking issues.
        critical_msgs = [
            f"{issue.file}:{issue.line_start}: {issue.title}"
            for issue in blocking_issues[:3]
        ]
        reason = "; ".join(critical_msgs) if critical_msgs else "Unknown reason"
        return self._fail(
            ctx,
            f"External review failed: {reason}",
            "Review failed, no retries left",
        )

    def on_timeout(
        self, ctx: LifecycleContext, timeout_minutes: int
    ) -> TransitionResult:
        """Handle a session timeout.

        From any non-terminal state this records the timeout on ``ctx`` and
        moves to FAILED. If the machine is already terminal, neither the state
        nor ``ctx`` is modified; a COMPLETE_FAILURE result carrying the
        current state is returned.
        """
        if self.is_terminal:
            return TransitionResult(
                state=self._state,
                effect=Effect.COMPLETE_FAILURE,
                message="Timeout after terminal state",
            )

        timeout_text = f"Timeout after {timeout_minutes} minutes"
        return self._fail(ctx, timeout_text, timeout_text)

    def on_error(self, ctx: LifecycleContext, error: Exception) -> TransitionResult:
        """Handle an unexpected error.

        From any non-terminal state this stores ``str(error)`` on ``ctx`` and
        moves to FAILED. If the machine is already terminal, neither the state
        nor ``ctx`` is modified; a COMPLETE_FAILURE result carrying the
        current state is returned.
        """
        if self.is_terminal:
            return TransitionResult(
                state=self._state,
                effect=Effect.COMPLETE_FAILURE,
                message=f"Error after terminal state: {error}",
            )

        return self._fail(ctx, str(error), f"Error: {error}")