Coverage for src / domain / lifecycle.py: 80%
237 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Implementer lifecycle state machine for orchestrator control flow.
3Extracts the retry/gate/review policy as a pure state machine that can be
4tested without Claude SDK or subprocess dependencies.
6The state machine is data-in/data-out: it receives events and returns
7effects (actions the orchestrator should take). This separation allows
8the orchestrator to remain responsible for I/O while the lifecycle
9handles all policy decisions.
11This module provides the canonical RetryState and lifecycle state machine:
131. **Testing policy in isolation**: Use ImplementerLifecycle directly to verify
14 retry/gate/review transitions without mocking SDK or subprocesses.
162. **Integration with orchestrator**: The orchestrator imports RetryState from
17 this module and uses lifecycle_ctx.retry_state directly for gate checks.
18"""
20from __future__ import annotations
22import logging
23from dataclasses import dataclass, field
24from enum import Enum, auto
25from typing import TYPE_CHECKING, Protocol, runtime_checkable
27from .validation.spec import ResolutionOutcome
29logger = logging.getLogger(__name__)
31if TYPE_CHECKING:
32 from .validation.spec import IssueResolution
34# Resolution outcomes that skip review (no new code to review)
35_SKIP_REVIEW_OUTCOMES = frozenset(
36 {
37 ResolutionOutcome.NO_CHANGE,
38 ResolutionOutcome.OBSOLETE,
39 ResolutionOutcome.ALREADY_COMPLETE,
40 }
41)
44# ---------------------------------------------------------------------------
45# Local outcome protocols: define the interface lifecycle needs from infra
46# ---------------------------------------------------------------------------
49@runtime_checkable
50class GateOutcome(Protocol):
51 """Protocol defining what lifecycle needs from a gate result.
53 Callers (orchestrator) pass infra GateResult objects that satisfy this
54 protocol. Lifecycle only accesses these fields.
55 """
57 @property
58 def passed(self) -> bool:
59 """Whether the gate check passed."""
60 ...
62 @property
63 def failure_reasons(self) -> list[str]:
64 """Reasons for failure (empty if passed)."""
65 ...
67 @property
68 def commit_hash(self) -> str | None:
69 """Commit hash if a commit was found."""
70 ...
72 @property
73 def no_progress(self) -> bool:
74 """Whether no progress was detected since last attempt."""
75 ...
77 @property
78 def resolution(self) -> IssueResolution | None:
79 """Issue resolution if a resolution marker was found."""
80 ...
83class ReviewIssue(Protocol):
84 """Protocol for a single issue from an external review.
86 Matches the fields lifecycle needs from review issues for building
87 failure messages. Uses Protocol to allow structural subtyping with
88 cerberus_review.ReviewIssue.
89 """
91 @property
92 def file(self) -> str:
93 """File path where the issue was found."""
94 ...
96 @property
97 def line_start(self) -> int:
98 """Starting line number of the issue."""
99 ...
101 @property
102 def line_end(self) -> int:
103 """Ending line number of the issue."""
104 ...
106 @property
107 def priority(self) -> int | None:
108 """Priority level: 0=P0, 1=P1, 2=P2, 3=P3, or None if unknown."""
109 ...
111 @property
112 def title(self) -> str:
113 """Short title of the issue."""
114 ...
116 @property
117 def body(self) -> str:
118 """Detailed description of the issue."""
119 ...
121 @property
122 def reviewer(self) -> str:
123 """Identifier of the reviewer that found this issue."""
124 ...
127@runtime_checkable
128class ReviewOutcome(Protocol):
129 """Protocol defining what lifecycle needs from an external review result.
131 Callers (orchestrator) pass infra review result objects (e.g., ReviewResult
132 from cerberus_review) that satisfy this protocol. Lifecycle only accesses
133 these fields.
134 """
136 @property
137 def passed(self) -> bool:
138 """Whether the review passed."""
139 ...
141 @property
142 def parse_error(self) -> str | None:
143 """Parse error message if JSON parsing failed."""
144 ...
146 @property
147 def fatal_error(self) -> bool:
148 """Whether the review failure is unrecoverable."""
149 ...
151 @property
152 def issues(self) -> list[ReviewIssue]:
153 """List of issues found during review."""
154 ...
157class LifecycleState(Enum):
158 """States in the implementer lifecycle."""
160 # Initial state - agent session starting
161 INITIAL = auto()
162 # Processing messages from SDK stream
163 PROCESSING = auto()
164 # Waiting for log file to appear
165 AWAITING_LOG = auto()
166 # Running quality gate check
167 RUNNING_GATE = auto()
168 # Running external review (after gate passed)
169 RUNNING_REVIEW = auto()
170 # Terminal: success
171 SUCCESS = auto()
172 # Terminal: failed
173 FAILED = auto()
176class Effect(Enum):
177 """Effects/actions the orchestrator should perform.
179 These are returned by state transitions to tell the orchestrator
180 what I/O actions to take.
181 """
183 # Continue processing SDK messages
184 CONTINUE = auto()
185 # Wait for log file to appear
186 WAIT_FOR_LOG = auto()
187 # Run quality gate check
188 RUN_GATE = auto()
189 # Run external review
190 RUN_REVIEW = auto()
191 # Send gate retry follow-up prompt to SDK
192 SEND_GATE_RETRY = auto()
193 # Send review retry follow-up prompt to SDK
194 SEND_REVIEW_RETRY = auto()
195 # Complete with success
196 COMPLETE_SUCCESS = auto()
197 # Complete with failure
198 COMPLETE_FAILURE = auto()
201@dataclass
202class LifecycleConfig:
203 """Configuration for lifecycle behavior."""
205 max_gate_retries: int = 3
206 max_review_retries: int = 3
207 review_enabled: bool = True
210@dataclass
211class RetryState:
212 """Tracks retry attempts for gate and review.
214 This is mutable state that the lifecycle updates during transitions.
215 The orchestrator can read it to format follow-up prompts.
216 """
218 gate_attempt: int = 1
219 review_attempt: int = 0
220 log_offset: int = 0
221 previous_commit_hash: str | None = None
222 baseline_timestamp: int = 0
225# Sentinel value indicating usage tracking is disabled (e.g., SDK returned no usage)
226TRACKING_DISABLED: int = -1
229@dataclass
230class ContextUsage:
231 """Tracks token usage for context exhaustion detection.
233 The SDK provides cumulative input_tokens in ResultMessage.usage.
234 We track usage to detect when approaching the 200K context limit.
236 When usage is unavailable (SDK doesn't provide it), input_tokens is set
237 to TRACKING_DISABLED (-1) to distinguish from zero usage.
238 """
240 input_tokens: int = 0
241 output_tokens: int = 0
242 cache_read_tokens: int = 0
244 @property
245 def tracking_disabled(self) -> bool:
246 """Return True if usage tracking is disabled."""
247 return self.input_tokens == TRACKING_DISABLED
249 def disable_tracking(self) -> None:
250 """Mark tracking as disabled by setting sentinel value."""
251 self.input_tokens = TRACKING_DISABLED
253 def pressure_ratio(self, limit: int) -> float:
254 """Return ratio of total tokens used to the limit.
256 Note: cache_read_tokens are already included in input_tokens
257 per the Anthropic API, so we only sum input + output.
259 Args:
260 limit: Maximum context tokens (e.g., 200_000)
262 Returns:
263 Ratio from 0.0 to 1.0+ (e.g., 90000/200000 = 0.45)
264 Returns 0.0 if limit is 0, or if tracking is disabled.
265 """
266 if limit <= 0 or self.tracking_disabled:
267 return 0.0
268 return (self.input_tokens + self.output_tokens) / limit
271@dataclass
272class LifecycleContext:
273 """Context passed to and updated by state transitions.
275 This bundles the mutable state and accumulated results that
276 the lifecycle tracks across transitions.
277 """
279 retry_state: RetryState = field(default_factory=RetryState)
280 session_id: str | None = None
281 final_result: str = ""
282 success: bool = False
283 # Last gate result for building failure messages
284 last_gate_result: GateOutcome | None = None
285 # Last review result for building follow-up prompts
286 last_review_result: ReviewOutcome | None = None
287 # Resolution for issue (no-op, obsolete, etc.)
288 resolution: IssueResolution | None = None
289 # P2/P3 issues from review to create as tracking issues
290 # These are low-priority issues that don't block the review but should be tracked
291 low_priority_review_issues: list[ReviewIssue] = field(default_factory=list)
292 # Token usage for context exhaustion detection
293 context_usage: ContextUsage = field(default_factory=ContextUsage)
296@dataclass
297class TransitionResult:
298 """Result of a state transition.
300 Contains the new state and the effect the orchestrator should perform.
301 """
303 state: LifecycleState
304 effect: Effect
305 # Optional message explaining the transition (for logging)
306 message: str | None = None
309class ImplementerLifecycle:
310 """Pure state machine for implementer agent lifecycle.
312 This class encapsulates all policy decisions about:
313 - When to run gate vs review
314 - When to retry vs fail
315 - How to track attempt counts
317 It does NOT perform any I/O - the orchestrator handles that based
318 on the Effect returned by each transition.
319 """
321 def __init__(self, config: LifecycleConfig):
322 self.config = config
323 self._state = LifecycleState.INITIAL
325 @property
326 def state(self) -> LifecycleState:
327 """Current lifecycle state."""
328 return self._state
330 @property
331 def is_terminal(self) -> bool:
332 """Whether the lifecycle has reached a terminal state."""
333 return self._state in (LifecycleState.SUCCESS, LifecycleState.FAILED)
335 def start(self) -> TransitionResult:
336 """Begin the lifecycle - transition from INITIAL to PROCESSING."""
337 if self._state != LifecycleState.INITIAL:
338 raise ValueError(f"Cannot start from state {self._state}")
339 self._state = LifecycleState.PROCESSING
340 logger.info("Lifecycle started: state=%s", self._state.name)
341 return TransitionResult(
342 state=self._state,
343 effect=Effect.CONTINUE,
344 message="Agent session started",
345 )
347 def on_messages_complete(
348 self, ctx: LifecycleContext, has_session_id: bool
349 ) -> TransitionResult:
350 """Handle completion of message processing.
352 Called when the SDK receive_response iterator completes.
353 Transitions to AWAITING_LOG if we have a session ID.
354 """
355 if self._state != LifecycleState.PROCESSING:
356 raise ValueError(f"Unexpected state for messages_complete: {self._state}")
358 if not has_session_id:
359 # No session ID means no log to check
360 ctx.final_result = "No session ID received from agent"
361 ctx.success = False
362 self._state = LifecycleState.FAILED
363 return TransitionResult(
364 state=self._state,
365 effect=Effect.COMPLETE_FAILURE,
366 message="No session ID",
367 )
369 self._state = LifecycleState.AWAITING_LOG
370 logger.debug("Messages complete: effect=%s", Effect.WAIT_FOR_LOG.name)
371 return TransitionResult(
372 state=self._state,
373 effect=Effect.WAIT_FOR_LOG,
374 )
376 def on_log_ready(self, ctx: LifecycleContext) -> TransitionResult:
377 """Handle log file becoming available.
379 Transitions to RUNNING_GATE.
380 """
381 if self._state != LifecycleState.AWAITING_LOG:
382 raise ValueError(f"Unexpected state for log_ready: {self._state}")
384 self._state = LifecycleState.RUNNING_GATE
385 return TransitionResult(
386 state=self._state,
387 effect=Effect.RUN_GATE,
388 )
390 def on_log_timeout(self, ctx: LifecycleContext, log_path: str) -> TransitionResult:
391 """Handle timeout waiting for log file.
393 Transitions to FAILED.
394 """
395 if self._state != LifecycleState.AWAITING_LOG:
396 raise ValueError(f"Unexpected state for log_timeout: {self._state}")
398 ctx.final_result = f"Session log missing after timeout: {log_path}"
399 ctx.success = False
400 self._state = LifecycleState.FAILED
401 return TransitionResult(
402 state=self._state,
403 effect=Effect.COMPLETE_FAILURE,
404 message="Log file timeout",
405 )
407 def on_gate_result(
408 self, ctx: LifecycleContext, gate_result: GateOutcome, new_log_offset: int
409 ) -> TransitionResult:
410 """Handle quality gate result.
412 Decides whether to:
413 - Proceed to review (if gate passed and review enabled)
414 - Complete with success (if gate passed and review disabled)
415 - Retry gate (if failed but retries remain)
416 - Fail (if no retries left or no progress)
417 """
418 if self._state != LifecycleState.RUNNING_GATE:
419 raise ValueError(f"Unexpected state for gate_result: {self._state}")
421 ctx.last_gate_result = gate_result
422 ctx.resolution = gate_result.resolution
424 logger.info(
425 "Gate result: outcome=%s attempt=%d state=%s",
426 "passed" if gate_result.passed else "failed",
427 ctx.retry_state.gate_attempt,
428 self._state.name,
429 )
431 if gate_result.passed:
432 # Gate passed - should we run review?
433 # Skip review for resolutions with no new code (no_change, obsolete, already_complete)
434 resolution_skips_review = (
435 gate_result.resolution is not None
436 and gate_result.resolution.outcome in _SKIP_REVIEW_OUTCOMES
437 )
438 if (
439 self.config.review_enabled
440 and gate_result.commit_hash
441 and not resolution_skips_review
442 ):
443 # Only initialize review_attempt if not already started
444 # (preserves count across gate re-runs after review retry)
445 if ctx.retry_state.review_attempt == 0:
446 ctx.retry_state.review_attempt = 1
447 self._state = LifecycleState.RUNNING_REVIEW
448 return TransitionResult(
449 state=self._state,
450 effect=Effect.RUN_REVIEW,
451 message=f"Gate passed, running review (attempt {ctx.retry_state.review_attempt}/{self.config.max_review_retries})",
452 )
453 else:
454 # No review needed - success!
455 ctx.success = True
456 self._state = LifecycleState.SUCCESS
457 return TransitionResult(
458 state=self._state,
459 effect=Effect.COMPLETE_SUCCESS,
460 message="Gate passed, no review required",
461 )
463 # Gate failed - can we retry?
464 can_retry = (
465 ctx.retry_state.gate_attempt < self.config.max_gate_retries
466 and not gate_result.no_progress
467 )
469 if can_retry:
470 # Prepare for retry
471 ctx.retry_state.gate_attempt += 1
472 ctx.retry_state.log_offset = new_log_offset
473 ctx.retry_state.previous_commit_hash = gate_result.commit_hash
474 self._state = LifecycleState.PROCESSING
475 logger.debug(
476 "Retry triggered: reason=gate_failed attempt=%d/%d",
477 ctx.retry_state.gate_attempt,
478 self.config.max_gate_retries,
479 )
480 return TransitionResult(
481 state=self._state,
482 effect=Effect.SEND_GATE_RETRY,
483 message=f"Gate retry {ctx.retry_state.gate_attempt}/{self.config.max_gate_retries}",
484 )
486 # No retries left or no progress - fail
487 ctx.final_result = (
488 f"Quality gate failed: {'; '.join(gate_result.failure_reasons)}"
489 )
490 ctx.success = False
491 self._state = LifecycleState.FAILED
492 logger.info(
493 "Lifecycle terminal: state=%s message=%s",
494 self._state.name,
495 "Gate failed, no retries left",
496 )
497 return TransitionResult(
498 state=self._state,
499 effect=Effect.COMPLETE_FAILURE,
500 message="Gate failed, no retries left",
501 )
503 def on_review_result(
504 self,
505 ctx: LifecycleContext,
506 review_result: ReviewOutcome,
507 new_log_offset: int,
508 no_progress: bool = False,
509 ) -> TransitionResult:
510 """Handle external review result.
512 Decides whether to:
513 - Complete with success (if review passed)
514 - Re-run review (if parse_error and retries remain)
515 - Retry via agent prompt (if failed with issues but retries remain)
516 - Fail (if no retries left or no progress)
518 Args:
519 ctx: Lifecycle context with retry state.
520 review_result: The review outcome to process.
521 new_log_offset: Updated log offset for next attempt.
522 no_progress: If True, the agent made no progress since last attempt
523 (same commit, no new validation evidence). Triggers fail-fast.
524 """
525 if self._state != LifecycleState.RUNNING_REVIEW:
526 raise ValueError(f"Unexpected state for review_result: {self._state}")
528 ctx.last_review_result = review_result
530 logger.info(
531 "Review result: outcome=%s attempt=%d state=%s",
532 "passed" if review_result.passed else "failed",
533 ctx.retry_state.review_attempt,
534 self._state.name,
535 )
537 # Check for blocking issues (P0/P1 only). P2/P3 issues are acceptable
538 # and can be tracked as beads issues later.
539 # Issues with None priority are treated as non-blocking (default to P3).
540 blocking_issues = [
541 i
542 for i in review_result.issues
543 if i.priority is not None and i.priority <= 1
544 ]
546 # Parse errors are always blocking - we can't determine if there are issues
547 has_parse_error = review_result.parse_error is not None
549 if review_result.passed or (not blocking_issues and not has_parse_error):
550 ctx.success = True
551 self._state = LifecycleState.SUCCESS
552 # Collect P2/P3 issues for tracking - include issues with priority > 1
553 # or issues with None priority (treated as P3 for tracking purposes).
554 # This ensures no review feedback is lost.
555 low_pri_issues = [
556 i for i in review_result.issues if i.priority is None or i.priority > 1
557 ]
558 ctx.low_priority_review_issues = low_pri_issues
559 # Include P2/P3 count in message if any exist
560 low_pri_count = len(low_pri_issues)
561 if low_pri_count > 0:
562 msg = f"Review passed ({low_pri_count} P2/P3 issues noted for later)"
563 else:
564 msg = "Review passed"
565 return TransitionResult(
566 state=self._state,
567 effect=Effect.COMPLETE_SUCCESS,
568 message=msg,
569 )
571 if review_result.parse_error and review_result.fatal_error:
572 ctx.final_result = f"External review failed: {review_result.parse_error}"
573 ctx.success = False
574 self._state = LifecycleState.FAILED
575 return TransitionResult(
576 state=self._state,
577 effect=Effect.COMPLETE_FAILURE,
578 message="Review failed, unrecoverable error",
579 )
581 # Parse error (non-fatal): re-run review tool directly, not agent prompt.
582 # This handles infrastructure issues with the reviewer (e.g., malformed JSON).
583 if review_result.parse_error:
584 can_retry = ctx.retry_state.review_attempt < self.config.max_review_retries
585 if can_retry:
586 ctx.retry_state.review_attempt += 1
587 # Stay in RUNNING_REVIEW state - orchestrator re-runs external review
588 return TransitionResult(
589 state=self._state,
590 effect=Effect.RUN_REVIEW,
591 message=f"Review parse error, re-running review (attempt {ctx.retry_state.review_attempt}/{self.config.max_review_retries})",
592 )
593 # No retries left
594 ctx.final_result = f"External review failed: {review_result.parse_error}"
595 ctx.success = False
596 self._state = LifecycleState.FAILED
597 return TransitionResult(
598 state=self._state,
599 effect=Effect.COMPLETE_FAILURE,
600 message="Review failed, no retries left",
601 )
603 # Review failed with blocking issues - can we retry via agent prompt?
604 can_retry = (
605 ctx.retry_state.review_attempt < self.config.max_review_retries
606 and not no_progress
607 )
609 if can_retry:
610 # Prepare for retry - update offset and increment counter
611 ctx.retry_state.log_offset = new_log_offset
612 if ctx.last_gate_result:
613 ctx.retry_state.previous_commit_hash = ctx.last_gate_result.commit_hash
614 ctx.retry_state.review_attempt += 1
615 self._state = LifecycleState.PROCESSING
616 return TransitionResult(
617 state=self._state,
618 effect=Effect.SEND_REVIEW_RETRY,
619 message=f"Review retry {ctx.retry_state.review_attempt}/{self.config.max_review_retries}",
620 )
622 # No retries left - fail with review error details
623 if no_progress:
624 ctx.final_result = "External review failed: No progress (commit unchanged, no working tree changes)"
625 failure_message = "Review failed, no progress detected"
626 else:
627 # Format P0/P1 issues (these are blocking)
628 critical_msgs = [
629 f"{i.file}:{i.line_start}: {i.title}" for i in blocking_issues[:3]
630 ]
631 if critical_msgs:
632 ctx.final_result = f"External review failed: {'; '.join(critical_msgs)}"
633 else:
634 ctx.final_result = "External review failed: Unknown reason"
635 failure_message = "Review failed, no retries left"
636 ctx.success = False
637 self._state = LifecycleState.FAILED
638 return TransitionResult(
639 state=self._state,
640 effect=Effect.COMPLETE_FAILURE,
641 message=failure_message,
642 )
644 def on_timeout(
645 self, ctx: LifecycleContext, timeout_minutes: int
646 ) -> TransitionResult:
647 """Handle session timeout.
649 This can be called from any non-terminal state.
650 """
651 if self.is_terminal:
652 return TransitionResult(
653 state=self._state,
654 effect=Effect.COMPLETE_FAILURE,
655 message="Timeout after terminal state",
656 )
658 ctx.final_result = f"Timeout after {timeout_minutes} minutes"
659 ctx.success = False
660 self._state = LifecycleState.FAILED
661 return TransitionResult(
662 state=self._state,
663 effect=Effect.COMPLETE_FAILURE,
664 message=f"Timeout after {timeout_minutes} minutes",
665 )
667 def on_error(self, ctx: LifecycleContext, error: Exception) -> TransitionResult:
668 """Handle unexpected error.
670 This can be called from any non-terminal state.
671 """
672 if self.is_terminal:
673 return TransitionResult(
674 state=self._state,
675 effect=Effect.COMPLETE_FAILURE,
676 message=f"Error after terminal state: {error}",
677 )
679 ctx.final_result = str(error)
680 ctx.success = False
681 self._state = LifecycleState.FAILED
682 return TransitionResult(
683 state=self._state,
684 effect=Effect.COMPLETE_FAILURE,
685 message=f"Error: {error}",
686 )