Coverage for src/pipeline/agent_session_runner.py: 87%
433 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""AgentSessionRunner: Agent session execution pipeline stage.
3Extracted from MalaOrchestrator to separate SDK-specific session handling from
4orchestration logic. This module handles:
5- SDK streaming and message processing
6- Session lifecycle transitions via ImplementerLifecycle
7- Session metadata tracking (session ID, log paths)
9The AgentSessionRunner receives explicit inputs and returns explicit outputs,
10making it testable without SDK dependencies when using the SDKClientProtocol.
12Design principles:
13- Protocol-based SDK client for testability
14- Explicit input/output types for clarity
15- Lifecycle state machine drives policy decisions
16- Callbacks for external operations (gate checks, reviews)
17"""
19from __future__ import annotations
21import asyncio
22import logging
23import time
24import uuid
25from collections.abc import Callable, Coroutine
26from dataclasses import dataclass, field
27from pathlib import Path
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 cast,
32)
34from src.infra.agent_runtime import AgentRuntimeBuilder
35from src.domain.lifecycle import (
36 Effect,
37 ImplementerLifecycle,
38 LifecycleConfig,
39 LifecycleContext,
40 LifecycleState,
41)
42from src.domain.prompts import (
43 get_default_validation_commands as _get_default_validation_commands,
44)
45from src.pipeline.context_pressure_handler import (
46 ContextPressureConfig,
47 ContextPressureHandler,
48)
49from src.infra.clients.cerberus_review import format_review_issues
50from src.infra.clients.review_output_parser import ReviewResult
51from src.pipeline.message_stream_processor import (
52 ContextPressureError,
53 IdleTimeoutError,
54 IdleTimeoutStream,
55 MessageIterationResult,
56 MessageIterationState,
57 MessageStreamProcessor,
58 StreamProcessorCallbacks,
59 StreamProcessorConfig,
60)
62if TYPE_CHECKING:
63 from collections.abc import AsyncIterator
65 from src.core.models import IssueResolution
66 from src.core.protocols import (
67 MalaEventSink,
68 ReviewIssueProtocol,
69 SDKClientFactoryProtocol,
70 SDKClientProtocol,
71 )
72 from src.domain.deadlock import DeadlockMonitor
73 from src.domain.lifecycle import (
74 GateOutcome,
75 RetryState,
76 ReviewIssue,
77 ReviewOutcome,
78 TransitionResult,
79 )
80 from src.domain.validation.config import PromptValidationCommands
81 from src.infra.hooks import LintCache
82 from src.infra.telemetry import TelemetrySpan
83 from src.pipeline.message_stream_processor import (
84 AgentTextCallback,
85 ToolUseCallback,
86 )
89# Module-level logger for idle retry messages
90logger = logging.getLogger(__name__)
92# Timeout for disconnect() call
93DISCONNECT_TIMEOUT = 10.0
96# Type aliases for callbacks
97GateCheckCallback = Callable[
98 [str, Path, "RetryState"],
99 Coroutine[Any, Any, tuple["GateOutcome", int]],
100]
101ReviewCheckCallback = Callable[
102 [str, str | None, str | None, str | None, "RetryState"],
103 Coroutine[Any, Any, "ReviewOutcome"],
104]
105ReviewNoProgressCallback = Callable[
106 [Path, int, str | None, str | None],
107 bool,
108]
109LogOffsetCallback = Callable[[Path, int], int]
@dataclass
class SessionConfig:
    """Per-session settings derived once when a session starts.

    ``AgentSessionRunner._initialize_session`` builds this from the runner's
    ``AgentSessionConfig`` together with the freshly built agent runtime.

    Attributes:
        agent_id: Identifier of the agent driving this session.
        options: SDK client options, taken from the built runtime.
        lint_cache: Lint command result cache, taken from the built runtime.
        log_file_wait_timeout: Seconds to wait for the session log to appear.
        log_file_poll_interval: Seconds between checks for the session log.
        idle_timeout_seconds: Idle timeout for the SDK stream, or None when
            idle timeouts are disabled.
    """

    # Identity and SDK wiring
    agent_id: str
    options: object
    lint_cache: LintCache
    # Log-file polling
    log_file_wait_timeout: float
    log_file_poll_interval: float
    # Stream idle handling (None disables it)
    idle_timeout_seconds: float | None
@dataclass
class SessionExecutionState:
    """State that changes while a single agent session runs.

    Groups the lifecycle state machine and its context with the identifiers
    and paths discovered as the session progresses.

    Attributes:
        lifecycle: The implementer lifecycle state machine.
        lifecycle_ctx: Lifecycle context carrying retry state and results.
        session_id: SDK session ID; None until one has been received.
        log_path: Session log file path; None until it has been resolved.
        final_result: Final result text reported for the session.
        cerberus_review_log_path: Cerberus review log path, if one was captured.
        msg_state: Message iteration state; each instance gets its own fresh
            ``MessageIterationState``.
    """

    # Lifecycle machinery (required)
    lifecycle: ImplementerLifecycle
    lifecycle_ctx: LifecycleContext
    # Discovered during the run
    session_id: str | None = None
    log_path: Path | None = None
    final_result: str = ""
    cerberus_review_log_path: str | None = None
    # default_factory so instances never share iteration state
    msg_state: MessageIterationState = field(default_factory=MessageIterationState)
@dataclass
class SessionPrompts:
    """Prompt templates used while driving an agent session.

    The caller supplies the templates (typically loaded from files at the
    orchestration boundary), which lets tests inject their own prompts.

    Attributes:
        gate_followup: Template sent after a failed quality gate. It is
            formatted with attempt, max_attempts, failure_reasons, issue_id,
            lint_command, format_command, typecheck_command and test_command.
        review_followup: Template sent when the review reports issues. It is
            formatted with attempt, max_attempts, review_issues and issue_id.
        idle_resume: Template used to resume after an idle timeout.
        checkpoint_request: Prompt asking the agent for a checkpoint
            (empty string by default).
        continuation: Template for continuing from a checkpoint
            (empty string by default).
    """

    # Required templates
    gate_followup: str
    review_followup: str
    idle_resume: str
    # Context-pressure prompts (optional)
    checkpoint_request: str = ""
    continuation: str = ""
@dataclass
class AgentSessionConfig:
    """Configuration for agent session execution.

    Bundles all configuration needed to run an agent session.

    Attributes:
        repo_path: Path to the repository.
        timeout_seconds: Session timeout in seconds.
        prompts: Prompt templates for the session (gate, review, idle resume,
            checkpoint/continuation).
        max_gate_retries: Maximum gate retry attempts.
        max_review_retries: Maximum review retry attempts.
        review_enabled: Whether Cerberus external review is enabled.
            When enabled, code changes are reviewed after the gate passes.
        log_file_wait_timeout: Seconds to wait for the log file after the
            session completes. The default of 60s gives the SDK time to flush
            logs under load.
        idle_timeout_seconds: Seconds to wait for an SDK message when not
            waiting for tool execution. If None, defaults to
            min(900, max(300, timeout_seconds * 0.2)), which scales with the
            session timeout (300-900s range). During tool execution (after
            ToolUseBlock, before its result) the timeout is disabled. A value
            <= 0 disables the timeout entirely.
        max_idle_retries: Maximum number of idle-timeout retries.
            NOTE(review): consumed outside this view; confirm semantics.
        idle_retry_backoff: Backoff delays in seconds between idle retries,
            presumably indexed by retry count. TODO confirm against the idle
            retry handler.
        lint_tools: Set of lint tool names for LintCache. If None, the default
            lint tools are used. Populated from ValidationSpec commands.
        prompt_validation_commands: Validation commands substituted into the
            gate follow-up prompt. If None, the default commands are used.
        context_restart_threshold: Ratio (0.0-1.0) at which to raise
            ContextPressureError. Default 0.90 (90% of context_limit).
        context_limit: Maximum context tokens. Default 200K for Claude.
        deadlock_monitor: Optional DeadlockMonitor forwarded to the agent
            runtime builder's hook setup; None disables it.
    """

    repo_path: Path
    timeout_seconds: int
    prompts: SessionPrompts
    max_gate_retries: int = 3
    max_review_retries: int = 3
    review_enabled: bool = True
    log_file_wait_timeout: float = 60.0
    idle_timeout_seconds: float | None = None
    max_idle_retries: int = 2
    # Tuple (immutable) so the default can be shared safely across instances.
    idle_retry_backoff: tuple[float, ...] = (0.0, 5.0, 15.0)
    lint_tools: frozenset[str] | None = None
    prompt_validation_commands: PromptValidationCommands | None = None
    context_restart_threshold: float = 0.90
    context_limit: int = 200_000
    deadlock_monitor: DeadlockMonitor | None = None
230@dataclass
231class AgentSessionInput:
232 """Input for running an agent session.
234 Bundles all data needed to start a session for an issue.
236 Attributes:
237 issue_id: The issue ID being worked on.
238 prompt: The initial prompt to send to the agent.
239 baseline_commit: Optional baseline commit for diff comparison.
240 issue_description: Issue description for scope verification.
241 agent_id: Optional pre-generated agent ID for lock management.
242 """
244 issue_id: str
245 prompt: str
246 baseline_commit: str | None = None
247 issue_description: str | None = None
248 agent_id: str | None = None
@dataclass
class AgentSessionOutput:
    """Results and metadata produced by a finished agent session.

    Attributes:
        success: True when the session completed successfully.
        summary: Human-readable description of the outcome.
        session_id: Claude SDK session ID, when one was received.
        log_path: Session log file path, when known.
        gate_attempts: Gate attempt count from the retry state.
        review_attempts: Review attempt count from the retry state.
        resolution: Issue resolution outcome, if any.
        duration_seconds: Wall-clock duration of the session.
        agent_id: Agent ID used for the session.
        review_log_path: Cerberus review session log path, if any.
        low_priority_review_issues: P2/P3 review issues to be tracked as
            beads issues, or None when there are none.
    """

    # Outcome
    success: bool
    summary: str
    # Session identity and logs
    session_id: str | None = None
    log_path: Path | None = None
    # Retry accounting
    gate_attempts: int = 1
    review_attempts: int = 0
    # Extra metadata
    resolution: IssueResolution | None = None
    duration_seconds: float = 0.0
    agent_id: str = ""
    review_log_path: str | None = None
    low_priority_review_issues: list[ReviewIssueProtocol] | None = None
@dataclass
class SessionCallbacks:
    """Hooks the orchestrator injects for actions needing external state.

    They let AgentSessionRunner trigger gate checks, reviews, log lookups and
    similar actions without depending on their implementations. Every hook
    defaults to None.

    Attributes:
        on_gate_check: Async quality-gate check.
            (issue_id, log_path, retry_state) -> (GateOutcome, new_offset)
        on_review_check: Async external (Cerberus) review.
            (issue_id, issue_description, baseline_commit, session_id,
            retry_state) -> ReviewOutcome
        on_review_no_progress: Sync check for "no progress" on a review retry.
            (log_path, log_offset, prev_commit, curr_commit) -> bool
        get_log_path: Maps a session ID to its log path.
            (session_id) -> Path
        get_log_offset: Returns the log end offset.
            (log_path, start_offset) -> int
        on_abort: Invoked when a fatal error requires aborting the run.
            (reason) -> None
        on_tool_use: SDK tool-use event hook.
            (agent_id, tool_name, arguments) -> None
        on_agent_text: SDK text-output event hook.
            (agent_id, text) -> None
    """

    # Gate / review
    on_gate_check: GateCheckCallback | None = None
    on_review_check: ReviewCheckCallback | None = None
    on_review_no_progress: ReviewNoProgressCallback | None = None
    # Log access
    get_log_path: Callable[[str], Path] | None = None
    get_log_offset: LogOffsetCallback | None = None
    # Run control and SDK event forwarding
    on_abort: Callable[[str], None] | None = None
    on_tool_use: ToolUseCallback | None = None
    on_agent_text: AgentTextCallback | None = None
@dataclass
class ReviewEffectResult:
    """Named bundle of the values produced by review effect handling.

    Attributes:
        pending_query: Prompt to send for a review retry; None when no retry
            is needed.
        should_break: True when the caller should leave the message
            iteration loop.
        cerberus_log_path: Cerberus review log path, if one was captured.
        transition_result: The lifecycle transition that was applied.
    """

    # Retry prompt, or None when no retry is needed.
    pending_query: str | None
    # Whether the caller should break out of the message iteration loop.
    should_break: bool
    # Captured Cerberus review log path, if any.
    cerberus_log_path: str | None
    # Lifecycle transition result.
    transition_result: TransitionResult
def _emit_review_result_events(
    event_sink: MalaEventSink | None,
    input: AgentSessionInput,
    result: TransitionResult,
    review_result: ReviewOutcome,
    lifecycle_ctx: LifecycleContext,
    max_review_retries: int,
    blocking_count: int,
) -> None:
    """Emit events describing the lifecycle transition that followed a review.

    Handles:
    - COMPLETE_SUCCESS: ``on_review_passed``.
    - RUN_REVIEW (review re-run, e.g. after a parse error): ``on_warning``
      with the parse error detail.
    - SEND_REVIEW_RETRY: a debug log line plus ``on_review_retry`` with the
      blocking issue count.
    Any other effect emits nothing.

    The SEND_REVIEW_RETRY debug log is written even when ``event_sink`` is
    None, so the retry decision stays visible in logs without an event sink.

    Args:
        event_sink: Sink to emit events to, or None to skip event emission.
        input: Session input; ``issue_id`` identifies the session.
        result: Lifecycle transition produced by the review result.
        review_result: The review outcome (used for ``parse_error``).
        lifecycle_ctx: Lifecycle context (used for the review attempt number).
        max_review_retries: Maximum review retries, reported with retries.
        blocking_count: Number of blocking (P0/P1) issues in the review.
    """
    effect = result.effect

    if effect == Effect.SEND_REVIEW_RETRY:
        # Logging is independent of the event sink.
        logger.debug(
            "Session %s: SEND_REVIEW_RETRY triggered "
            "(attempt %d/%d, %d blocking issues)",
            input.issue_id,
            lifecycle_ctx.retry_state.review_attempt,
            max_review_retries,
            blocking_count,
        )

    if event_sink is None:
        return

    if effect == Effect.COMPLETE_SUCCESS:
        event_sink.on_review_passed(
            input.issue_id,
            issue_id=input.issue_id,
        )
    elif effect == Effect.RUN_REVIEW:
        error_detail = review_result.parse_error or "unknown error"
        event_sink.on_warning(
            f"Review tool error: {error_detail}; retrying",
            agent_id=input.issue_id,
        )
    elif effect == Effect.SEND_REVIEW_RETRY:
        event_sink.on_review_retry(
            input.issue_id,
            lifecycle_ctx.retry_state.review_attempt,
            max_review_retries,
            # 0 blocking issues is reported as None (no count).
            error_count=blocking_count or None,
            parse_error=review_result.parse_error,
            issue_id=input.issue_id,
        )
395def _emit_gate_passed_events(
396 event_sink: MalaEventSink | None,
397 issue_id: str,
398 review_attempt: int,
399) -> None:
400 """Emit gate passed events when first entering review.
402 Only emits events on the first review attempt (review_attempt == 1),
403 indicating the gate has just passed.
405 Args:
406 event_sink: Event sink to emit to, or None to skip emission.
407 issue_id: The issue identifier.
408 review_attempt: Current review attempt number (1-based).
409 """
410 if review_attempt != 1 or event_sink is None:
411 return
413 event_sink.on_gate_passed(
414 issue_id,
415 issue_id=issue_id,
416 )
417 event_sink.on_validation_result(
418 issue_id,
419 passed=True,
420 issue_id=issue_id,
421 )
424def _count_blocking_issues(issues: list[ReviewIssue] | None) -> int:
425 """Count issues with priority <= 1 (P0 or P1).
427 Args:
428 issues: List of review issues, or None.
430 Returns:
431 Number of blocking (high-priority) issues.
432 """
433 if not issues:
434 return 0
435 return sum(1 for i in issues if i.priority is not None and i.priority <= 1)
def _make_review_effect_result(
    effect: Effect,
    cerberus_log_path: str | None,
    transition_result: TransitionResult,
    pending_query: str | None = None,
) -> ReviewEffectResult:
    """Package a review effect into a ReviewEffectResult.

    ``should_break`` is set for the two terminal effects, COMPLETE_SUCCESS
    and COMPLETE_FAILURE; every other effect lets the caller keep looping.

    Args:
        effect: Lifecycle effect being handled.
        cerberus_log_path: Cerberus review log path, if any.
        transition_result: The lifecycle transition result.
        pending_query: Retry prompt (used with SEND_REVIEW_RETRY).

    Returns:
        The populated ReviewEffectResult.
    """
    terminal_effects = (Effect.COMPLETE_SUCCESS, Effect.COMPLETE_FAILURE)
    return ReviewEffectResult(
        pending_query=pending_query,
        should_break=effect in terminal_effects,
        cerberus_log_path=cerberus_log_path,
        transition_result=transition_result,
    )
def _build_review_retry_prompt(
    review_result: ReviewOutcome,
    lifecycle_ctx: LifecycleContext,
    issue_id: str,
    repo_path: Path,
    max_review_retries: int,
    review_followup_template: str,
) -> str:
    """Render the follow-up prompt asking the agent to fix review issues.

    Args:
        review_result: Review outcome whose issues must be addressed.
        lifecycle_ctx: Lifecycle context (supplies the review attempt number).
        issue_id: Identifier of the issue.
        repo_path: Repository root used when formatting issue paths.
        max_review_retries: Maximum number of review retries.
        review_followup_template: Template with ``attempt``, ``max_attempts``,
            ``review_issues`` and ``issue_id`` placeholders.

    Returns:
        The formatted prompt text.
    """
    formatted_issues = format_review_issues(
        review_result.issues,  # type: ignore[arg-type]
        base_path=repo_path,
    )
    placeholders = {
        "attempt": lifecycle_ctx.retry_state.review_attempt,
        "max_attempts": max_review_retries,
        "review_issues": formatted_issues,
        "issue_id": issue_id,
    }
    return review_followup_template.format(**placeholders)
497@dataclass
498class AgentSessionRunner:
499 """Runs agent sessions with lifecycle management.
501 This class encapsulates the SDK session execution logic that was previously
502 inline in MalaOrchestrator.run_implementer. It manages:
503 - SDK client creation and message streaming
504 - Lifecycle state transitions
505 - Hook setup (lock enforcement, lint cache, etc.)
506 - Message logging and telemetry
508 The runner uses callbacks for external operations (gate checks, reviews)
509 to avoid tight coupling with the orchestrator's dependencies.
511 Usage:
512 runner = AgentSessionRunner(
513 config=AgentSessionConfig(repo_path=repo_path, ...),
514 callbacks=SessionCallbacks(on_gate_check=..., ...),
515 sdk_client_factory=SDKClientFactory(),
516 )
517 output = await runner.run_session(input)
519 Attributes:
520 config: Session configuration.
521 callbacks: Callbacks for external operations.
522 sdk_client_factory: Factory for creating SDK clients (required).
523 event_sink: Optional event sink for structured logging.
524 """
526 config: AgentSessionConfig
527 sdk_client_factory: SDKClientFactoryProtocol
528 callbacks: SessionCallbacks = field(default_factory=SessionCallbacks)
529 event_sink: MalaEventSink | None = None
530 _context_pressure_handler: ContextPressureHandler = field(init=False, repr=False)
532 def __post_init__(self) -> None:
533 """Initialize derived components."""
534 self._context_pressure_handler = ContextPressureHandler(
535 config=ContextPressureConfig(
536 checkpoint_request_prompt=self.config.prompts.checkpoint_request,
537 continuation_template=self.config.prompts.continuation,
538 ),
539 sdk_client_factory=self.sdk_client_factory,
540 )
542 def _initialize_session(
543 self,
544 input: AgentSessionInput,
545 agent_id: str | None = None,
546 ) -> tuple[SessionConfig, SessionExecutionState]:
547 """Initialize session components and state.
549 Creates lifecycle, hooks, SDK options, and mutable state for session.
551 Args:
552 input: Session input with issue_id, prompt, etc.
553 agent_id: Optional agent ID. If None, generates a new one.
554 Pass an existing ID to preserve lock continuity across restarts.
556 Returns:
557 Tuple of (SessionConfig, SessionExecutionState).
558 """
559 # Generate agent ID if not provided
560 if agent_id is None:
561 agent_id = f"{input.issue_id}-{uuid.uuid4().hex[:8]}"
563 # Initialize lifecycle
564 lifecycle_config = LifecycleConfig(
565 max_gate_retries=self.config.max_gate_retries,
566 max_review_retries=self.config.max_review_retries,
567 review_enabled=self.config.review_enabled,
568 )
569 lifecycle = ImplementerLifecycle(lifecycle_config)
570 lifecycle_ctx = LifecycleContext()
571 lifecycle_ctx.retry_state.baseline_timestamp = int(time.time())
573 # Build session components using AgentRuntimeBuilder
574 runtime = (
575 AgentRuntimeBuilder(
576 self.config.repo_path, agent_id, self.sdk_client_factory
577 )
578 .with_hooks(deadlock_monitor=self.config.deadlock_monitor)
579 .with_env()
580 .with_mcp()
581 .with_disallowed_tools()
582 .with_lint_tools(self.config.lint_tools)
583 .build()
584 )
586 # Calculate idle timeout
587 idle_timeout_seconds = self.config.idle_timeout_seconds
588 if idle_timeout_seconds is None:
589 derived = self.config.timeout_seconds * 0.2
590 idle_timeout_seconds = min(900.0, max(300.0, derived))
591 if idle_timeout_seconds <= 0:
592 idle_timeout_seconds = None
594 session_config = SessionConfig(
595 agent_id=agent_id,
596 options=runtime.options,
597 lint_cache=runtime.lint_cache,
598 log_file_wait_timeout=self.config.log_file_wait_timeout,
599 log_file_poll_interval=0.5,
600 idle_timeout_seconds=idle_timeout_seconds,
601 )
603 exec_state = SessionExecutionState(
604 lifecycle=lifecycle,
605 lifecycle_ctx=lifecycle_ctx,
606 )
608 return session_config, exec_state
    async def _run_lifecycle_loop(
        self,
        input: AgentSessionInput,
        session_cfg: SessionConfig,
        state: SessionExecutionState,
        tracer: TelemetrySpan | None = None,
    ) -> None:
        """Run the main lifecycle loop.

        Starts the lifecycle, then repeatedly runs message iteration (whenever
        a query is pending), log waiting, gate and review handling until the
        lifecycle reaches a terminal state or a failure breaks the loop.
        ``state.final_result`` is set from ``lifecycle_ctx.final_result`` on
        every exit path.

        Args:
            input: Session input with issue_id, prompt, etc.
            session_cfg: Derived session configuration.
            state: Mutable session execution state (updated in place).
            tracer: Optional telemetry span context.

        Raises:
            Propagates exceptions from the handlers it awaits, e.g.
            IdleTimeoutError from gate/review retry handling when no SDK
            session ID is available.
        """
        lifecycle = state.lifecycle
        lifecycle_ctx = state.lifecycle_ctx

        # Start lifecycle
        lifecycle.start()
        if self.event_sink is not None:
            self.event_sink.on_lifecycle_state(input.issue_id, lifecycle.state.name)

        # The initial prompt is the first pending query; later queries are
        # gate/review retry prompts returned by the handlers below.
        pending_query: str | None = input.prompt
        result: TransitionResult | None = None

        while not lifecycle.is_terminal:
            # === QUERY + MESSAGE ITERATION ===
            if pending_query is not None:
                iter_result = await self._run_message_iteration(
                    query=pending_query,
                    issue_id=input.issue_id,
                    options=session_cfg.options,
                    state=state.msg_state,
                    lifecycle_ctx=lifecycle_ctx,
                    lint_cache=session_cfg.lint_cache,
                    idle_timeout_seconds=session_cfg.idle_timeout_seconds,
                    tracer=tracer,
                )
                # Keep the previous session ID if this iteration did not
                # report one.
                if iter_result.session_id is not None:
                    state.session_id = iter_result.session_id
                pending_query = None

                result = lifecycle.on_messages_complete(
                    lifecycle_ctx, has_session_id=bool(state.session_id)
                )
                if self.event_sink is not None:
                    self.event_sink.on_lifecycle_state(
                        input.issue_id, lifecycle.state.name
                    )
                if result.effect == Effect.COMPLETE_FAILURE:
                    state.final_result = lifecycle_ctx.final_result
                    break
            else:
                # No new query: re-dispatch on the transition left over from
                # the previous iteration (e.g. a review re-run).
                assert result is not None, (
                    "Bug: entered loop without pending_query but result not set"
                )

            # The handlers below fall through in order: each may replace
            # `result` so the next one sees the updated effect.

            # Handle WAIT_FOR_LOG
            if result.effect == Effect.WAIT_FOR_LOG:
                state.log_path, result = await self._handle_log_waiting(
                    state.session_id,
                    input.issue_id,
                    state.log_path,
                    lifecycle,
                    lifecycle_ctx,
                    session_cfg.log_file_wait_timeout,
                    session_cfg.log_file_poll_interval,
                )
                if lifecycle.state == LifecycleState.FAILED:
                    state.final_result = lifecycle_ctx.final_result
                    break

            # Handle RUN_GATE
            if result.effect == Effect.RUN_GATE:
                pending_query, gate_trans = await self._handle_gate_check(
                    input, state, lifecycle, lifecycle_ctx
                )
                if pending_query is not None:
                    # Gate retry: resume the same SDK session and reset the
                    # idle retry counter before sending the follow-up prompt.
                    state.msg_state.pending_session_id = state.session_id
                    state.msg_state.idle_retry_count = 0
                    continue
                if lifecycle.is_terminal:
                    state.final_result = lifecycle_ctx.final_result
                    break
                # Update result with gate transition for next effect check
                result = gate_trans

            # Handle RUN_REVIEW
            if result.effect == Effect.RUN_REVIEW:
                pending_query, review_result = await self._handle_review_check(
                    input, state, lifecycle, lifecycle_ctx
                )
                if review_result is not None:
                    result = review_result
                if pending_query is not None:
                    # Review retry: resume the same SDK session with a fresh
                    # idle retry counter.
                    state.msg_state.pending_session_id = state.session_id
                    state.msg_state.idle_retry_count = 0
                if lifecycle.is_terminal:
                    state.final_result = lifecycle_ctx.final_result
                    break
                continue
            # NOTE(review): reaching this point means the final result.effect
            # matched none of the handlers above. If the lifecycle is still
            # non-terminal and pending_query is None, the next iteration sees
            # the same result and may repeat without awaiting anything. Confirm
            # that ImplementerLifecycle never yields such an effect here.

        state.final_result = lifecycle_ctx.final_result
718 async def _handle_gate_check(
719 self,
720 input: AgentSessionInput,
721 state: SessionExecutionState,
722 lifecycle: ImplementerLifecycle,
723 lifecycle_ctx: LifecycleContext,
724 ) -> tuple[str | None, TransitionResult]:
725 """Handle RUN_GATE effect - emit events and run gate check.
727 Args:
728 input: Session input with issue_id.
729 state: Session execution state.
730 lifecycle: Lifecycle state machine.
731 lifecycle_ctx: Lifecycle context.
733 Returns:
734 Tuple of (pending query for retry or None, transition result).
735 """
736 # Emit validation started BEFORE the gate check
737 if self.event_sink is not None:
738 self.event_sink.on_validation_started(
739 input.issue_id, issue_id=input.issue_id
740 )
741 self.event_sink.on_gate_started(
742 input.issue_id,
743 lifecycle_ctx.retry_state.gate_attempt,
744 self.config.max_gate_retries,
745 issue_id=input.issue_id,
746 )
748 if self.callbacks.on_gate_check is None:
749 raise ValueError("on_gate_check callback must be set")
751 assert state.log_path is not None
752 gate_result, new_offset = await self.callbacks.on_gate_check(
753 input.issue_id, state.log_path, lifecycle_ctx.retry_state
754 )
756 retry_query, should_break, trans_result = self._handle_gate_effect(
757 input, gate_result, lifecycle, lifecycle_ctx, new_offset
758 )
759 if should_break:
760 return None, trans_result
761 if retry_query is not None:
762 if state.session_id is None:
763 raise IdleTimeoutError(
764 "Cannot retry gate: session_id not received from SDK"
765 )
766 logger.debug(
767 "Session %s: queueing gate retry prompt (%d chars, session_id=%s)",
768 input.issue_id,
769 len(retry_query),
770 state.session_id[:8],
771 )
772 return retry_query, trans_result
773 return None, trans_result
775 async def _handle_review_check(
776 self,
777 input: AgentSessionInput,
778 state: SessionExecutionState,
779 lifecycle: ImplementerLifecycle,
780 lifecycle_ctx: LifecycleContext,
781 ) -> tuple[str | None, TransitionResult | None]:
782 """Handle RUN_REVIEW effect - run review and process result.
784 Args:
785 input: Session input with issue_id, description, baseline.
786 state: Session execution state.
787 lifecycle: Lifecycle state machine.
788 lifecycle_ctx: Lifecycle context.
790 Returns:
791 Tuple of (pending_query for retry, transition_result).
792 """
793 assert state.log_path is not None
794 review_effect = await self._handle_review_effect(
795 input, state.log_path, state.session_id, lifecycle, lifecycle_ctx
796 )
797 if review_effect.cerberus_log_path is not None:
798 state.cerberus_review_log_path = review_effect.cerberus_log_path
799 if review_effect.should_break:
800 return None, review_effect.transition_result
801 if review_effect.pending_query is not None:
802 if state.session_id is None:
803 raise IdleTimeoutError(
804 "Cannot retry review: session_id not received from SDK"
805 )
806 logger.debug(
807 "Session %s: queueing review retry prompt (%d chars, session_id=%s)",
808 input.issue_id,
809 len(review_effect.pending_query),
810 state.session_id[:8],
811 )
812 return review_effect.pending_query, review_effect.transition_result
813 return None, review_effect.transition_result
815 def _build_session_output(
816 self,
817 session_cfg: SessionConfig,
818 state: SessionExecutionState,
819 duration: float,
820 ) -> AgentSessionOutput:
821 """Build session output from execution state.
823 Args:
824 session_cfg: Session configuration with agent_id.
825 state: Session execution state.
826 duration: Total session duration in seconds.
828 Returns:
829 AgentSessionOutput with all results and metadata.
830 """
831 return AgentSessionOutput(
832 success=state.lifecycle_ctx.success,
833 summary=state.final_result,
834 session_id=state.session_id,
835 log_path=state.log_path,
836 gate_attempts=state.lifecycle_ctx.retry_state.gate_attempt,
837 review_attempts=state.lifecycle_ctx.retry_state.review_attempt,
838 resolution=state.lifecycle_ctx.resolution,
839 duration_seconds=duration,
840 agent_id=session_cfg.agent_id,
841 review_log_path=state.cerberus_review_log_path,
842 low_priority_review_issues=cast(
843 "list[ReviewIssueProtocol] | None",
844 state.lifecycle_ctx.low_priority_review_issues or None,
845 ),
846 )
848 async def _handle_log_waiting(
849 self,
850 session_id: str | None,
851 issue_id: str,
852 log_path: Path | None,
853 lifecycle: ImplementerLifecycle,
854 lifecycle_ctx: LifecycleContext,
855 log_file_wait_timeout: float,
856 log_file_poll_interval: float = 0.5,
857 ) -> tuple[Path | None, TransitionResult]:
858 """Handle WAIT_FOR_LOG effect - wait for log file to become available.
860 Args:
861 session_id: Current SDK session ID.
862 issue_id: Issue ID for logging.
863 log_path: Current log path (may be None).
864 lifecycle: Lifecycle state machine.
865 lifecycle_ctx: Lifecycle context.
866 log_file_wait_timeout: Max seconds to wait for log file.
867 log_file_poll_interval: Seconds between poll attempts.
869 Returns:
870 Tuple of (updated log_path, TransitionResult from log ready/timeout).
871 """
872 if self.callbacks.get_log_path is None:
873 raise ValueError("get_log_path callback must be set")
874 if session_id is None:
875 raise ValueError("session_id must be set before waiting for log")
876 new_log_path = self.callbacks.get_log_path(session_id)
878 # Reset log_offset if log file changed (new session started)
879 # This prevents using a stale offset from a previous session's
880 # larger log file when parsing a new, smaller log file
881 if log_path is not None and new_log_path != log_path:
882 logger.info(
883 "Session %s: log path changed from %s to %s, "
884 "resetting log_offset from %d to 0",
885 issue_id,
886 log_path.name,
887 new_log_path.name,
888 lifecycle_ctx.retry_state.log_offset,
889 )
890 lifecycle_ctx.retry_state.log_offset = 0
892 log_path = new_log_path
893 if self.event_sink is not None:
894 self.event_sink.on_log_waiting(issue_id)
896 # Wait for log file
897 wait_elapsed = 0.0
898 while not log_path.exists():
899 if wait_elapsed >= log_file_wait_timeout:
900 result = lifecycle.on_log_timeout(lifecycle_ctx, str(log_path))
901 if self.event_sink is not None:
902 self.event_sink.on_log_timeout(issue_id, str(log_path))
903 return log_path, result
904 await asyncio.sleep(log_file_poll_interval)
905 wait_elapsed += log_file_poll_interval
907 if log_path.exists():
908 if self.event_sink is not None:
909 self.event_sink.on_log_ready(issue_id)
910 result = lifecycle.on_log_ready(lifecycle_ctx)
911 return log_path, result
913 def _handle_gate_effect(
914 self,
915 input: AgentSessionInput,
916 gate_result: GateOutcome,
917 lifecycle: ImplementerLifecycle,
918 lifecycle_ctx: LifecycleContext,
919 new_offset: int,
920 ) -> tuple[str | None, bool, TransitionResult]:
921 """Handle RUN_GATE effect - process gate result and emit events.
923 Args:
924 input: Session input with issue_id.
925 gate_result: Result from gate check callback.
926 lifecycle: Lifecycle state machine.
927 lifecycle_ctx: Lifecycle context.
928 new_offset: New log offset after gate check.
930 Returns:
931 Tuple of (pending_query for retry or None, should_break, transition_result).
932 """
933 result = lifecycle.on_gate_result(lifecycle_ctx, gate_result, new_offset)
935 if result.effect == Effect.COMPLETE_SUCCESS:
936 _emit_gate_passed_events(self.event_sink, input.issue_id, review_attempt=1)
937 return None, True, result # break
939 if result.effect == Effect.COMPLETE_FAILURE:
940 if self.event_sink is not None:
941 self.event_sink.on_gate_failed(
942 input.issue_id,
943 lifecycle_ctx.retry_state.gate_attempt,
944 self.config.max_gate_retries,
945 issue_id=input.issue_id,
946 )
947 self.event_sink.on_gate_result(
948 input.issue_id,
949 passed=False,
950 failure_reasons=list(gate_result.failure_reasons),
951 issue_id=input.issue_id,
952 )
953 self.event_sink.on_validation_result(
954 input.issue_id,
955 passed=False,
956 issue_id=input.issue_id,
957 )
958 return None, True, result # break
960 if result.effect == Effect.SEND_GATE_RETRY:
961 if self.event_sink is not None:
962 self.event_sink.on_gate_retry(
963 input.issue_id,
964 lifecycle_ctx.retry_state.gate_attempt,
965 self.config.max_gate_retries,
966 issue_id=input.issue_id,
967 )
968 self.event_sink.on_gate_result(
969 input.issue_id,
970 passed=False,
971 failure_reasons=list(gate_result.failure_reasons),
972 issue_id=input.issue_id,
973 )
974 # Emit validation_result before retry so every
975 # on_validation_started has a corresponding result
976 self.event_sink.on_validation_result(
977 input.issue_id,
978 passed=False,
979 issue_id=input.issue_id,
980 )
981 # Build follow-up prompt
982 failure_text = "\n".join(f"- {r}" for r in gate_result.failure_reasons)
983 # Get validation commands or use defaults
984 cmds = (
985 self.config.prompt_validation_commands
986 or _get_default_validation_commands()
987 )
988 pending_query = self.config.prompts.gate_followup.format(
989 attempt=lifecycle_ctx.retry_state.gate_attempt,
990 max_attempts=self.config.max_gate_retries,
991 failure_reasons=failure_text,
992 issue_id=input.issue_id,
993 lint_command=cmds.lint,
994 format_command=cmds.format,
995 typecheck_command=cmds.typecheck,
996 test_command=cmds.test,
997 )
998 return pending_query, False, result # continue with retry
1000 # RUN_REVIEW or other effects - pass through
1001 return None, False, result
def _check_review_no_progress(
    self,
    input: AgentSessionInput,
    log_path: Path,
    lifecycle: ImplementerLifecycle,
    lifecycle_ctx: LifecycleContext,
    cerberus_review_log_path: str | None,
) -> ReviewEffectResult | None:
    """Short-circuit a review retry when the agent made no progress.

    The check only applies to retry attempts (``review_attempt > 1``) and only
    when ``callbacks.on_review_no_progress`` is configured. That callback is
    given the log path, the current log offset, the previous commit hash and
    the commit hash from the last gate result (or None). When it reports no
    progress:

    * ``on_review_skipped_no_progress`` is emitted (if an event sink exists),
    * a synthetic failing ``ReviewResult`` is passed to
      ``lifecycle.on_review_result`` with ``no_progress=True``,
    * a ``ReviewEffectResult`` telling the caller to stop is returned.

    Args:
        input: Session input with issue_id.
        log_path: Path to log file.
        lifecycle: Lifecycle state machine.
        lifecycle_ctx: Lifecycle context.
        cerberus_review_log_path: Path to Cerberus review log, if any.

    Returns:
        A ReviewEffectResult with ``should_break=True`` when no progress was
        detected (the caller should return it as-is), otherwise None.
    """
    retry = lifecycle_ctx.retry_state
    # The first review attempt has nothing earlier to compare against.
    if retry.review_attempt <= 1:
        return None
    detector = self.callbacks.on_review_no_progress
    if detector is None:
        return None

    gate = lifecycle_ctx.last_gate_result
    head_commit = gate.commit_hash if gate else None
    stalled = detector(
        log_path,
        retry.log_offset,
        retry.previous_commit_hash,
        head_commit,
    )
    if not stalled:
        return None

    sink = self.event_sink
    if sink is not None:
        sink.on_review_skipped_no_progress(input.issue_id)

    # Record the skipped review in the lifecycle as a failed one.
    failed_review = ReviewResult(passed=False, issues=[], parse_error=None)
    offset_fn = self.callbacks.get_log_offset
    next_offset = offset_fn(log_path, retry.log_offset) if offset_fn else 0
    transition = lifecycle.on_review_result(
        lifecycle_ctx,
        failed_review,
        next_offset,
        no_progress=True,
    )
    return ReviewEffectResult(
        pending_query=None,
        should_break=True,
        cerberus_log_path=cerberus_review_log_path,
        transition_result=transition,
    )
async def _handle_review_effect(
    self,
    input: AgentSessionInput,
    log_path: Path,
    session_id: str | None,
    lifecycle: ImplementerLifecycle,
    lifecycle_ctx: LifecycleContext,
) -> ReviewEffectResult:
    """Handle the RUN_REVIEW effect: run a review and apply its outcome.

    Sequence:
    1. Emit gate-passed events via ``_emit_gate_passed_events`` with the
       current review attempt (attempt 1 means the gate has just passed).
    2. On retry attempts, return early if ``_check_review_no_progress``
       reports that the agent made no progress.
    3. Emit ``on_review_started`` and await ``callbacks.on_review_check``.
    4. If the review reports ``fatal_error``, notify ``callbacks.on_abort``.
       Processing continues afterwards, so the lifecycle still sees the result.
    5. Advance the log offset, transition the lifecycle, emit result events,
       and build a follow-up prompt when the effect is SEND_REVIEW_RETRY.

    Args:
        input: Session input with issue_id, description, baseline.
        log_path: Path to log file.
        session_id: Current SDK session ID. Not read by this method; the
            session ID that is logged and passed to the review callback is
            ``lifecycle_ctx.session_id``.
        lifecycle: Lifecycle state machine.
        lifecycle_ctx: Lifecycle context.

    Returns:
        ReviewEffectResult with pending_query, should_break, cerberus_log_path,
        and transition_result.

    Raises:
        ValueError: If a review must actually run but
            ``callbacks.on_review_check`` is not set. Raised before
            ``on_review_started`` is emitted.
    """
    cerberus_review_log_path: str | None = None

    # Emit gate passed events when first entering review
    # (review_attempt == 1 means gate just passed)
    _emit_gate_passed_events(
        self.event_sink, input.issue_id, lifecycle_ctx.retry_state.review_attempt
    )

    # Check no-progress before running review
    if no_progress_result := self._check_review_no_progress(
        input, log_path, lifecycle, lifecycle_ctx, cerberus_review_log_path
    ):
        return no_progress_result

    # Validate configuration before announcing the review, so a misconfigured
    # runner does not emit a review_started event that never gets a result.
    if self.callbacks.on_review_check is None:
        raise ValueError("on_review_check callback must be set")

    if self.event_sink is not None:
        self.event_sink.on_review_started(
            input.issue_id,
            lifecycle_ctx.retry_state.review_attempt,
            self.config.max_review_retries,
            issue_id=input.issue_id,
        )

    logger.debug(
        "Session %s: starting review (attempt %d/%d, session_id=%s)",
        input.issue_id,
        lifecycle_ctx.retry_state.review_attempt,
        self.config.max_review_retries,
        lifecycle_ctx.session_id[:8] if lifecycle_ctx.session_id else None,
    )
    # Use a monotonic clock: an elapsed duration must not jump when the
    # wall clock is adjusted (NTP sync, manual changes).
    review_start = time.monotonic()
    review_result = await self.callbacks.on_review_check(
        input.issue_id,
        input.issue_description,
        input.baseline_commit,
        lifecycle_ctx.session_id,
        lifecycle_ctx.retry_state,
    )
    review_duration = time.monotonic() - review_start
    issue_count = len(review_result.issues) if review_result.issues else 0
    blocking = _count_blocking_issues(review_result.issues)
    logger.debug(
        "Session %s: review completed in %.1fs "
        "(passed=%s, issues=%d, blocking=%d, parse_error=%s)",
        input.issue_id,
        review_duration,
        review_result.passed,
        issue_count,
        blocking,
        review_result.parse_error,
    )

    # Fatal review error: notify the abort hook. The result still flows
    # through the lifecycle below.
    if review_result.fatal_error and self.callbacks.on_abort is not None:
        self.callbacks.on_abort(
            review_result.parse_error or "Unrecoverable review error"
        )

    # Capture Cerberus review log if available
    log_attr = getattr(review_result, "review_log_path", None)
    if log_attr is not None:
        cerberus_review_log_path = str(log_attr)

    # Advance the log offset (0 when no offset callback is configured).
    get_offset = self.callbacks.get_log_offset
    new_offset = (
        get_offset(log_path, lifecycle_ctx.retry_state.log_offset)
        if get_offset
        else 0
    )

    result = lifecycle.on_review_result(lifecycle_ctx, review_result, new_offset)

    # Emit appropriate events based on transition result
    _emit_review_result_events(
        self.event_sink,
        input,
        result,
        review_result,
        lifecycle_ctx,
        self.config.max_review_retries,
        blocking,
    )

    # Build pending_query only for SEND_REVIEW_RETRY
    pending_query = None
    if result.effect == Effect.SEND_REVIEW_RETRY:
        pending_query = _build_review_retry_prompt(
            review_result,
            lifecycle_ctx,
            input.issue_id,
            self.config.repo_path,
            self.config.max_review_retries,
            self.config.prompts.review_followup,
        )

    return _make_review_effect_result(
        result.effect,
        cerberus_review_log_path,
        result,
        pending_query,
    )
async def _apply_retry_backoff(self, retry_count: int) -> None:
    """Sleep for the configured backoff before an idle retry attempt.

    The delay is looked up by attempt number in
    ``config.idle_retry_backoff``. Attempts past the end of the schedule
    reuse its last entry. An empty schedule, or a non-positive entry, means
    no delay.

    Args:
        retry_count: Current retry count (1-based). Values below 1 are
            clamped to the first schedule entry, instead of wrapping around
            to the last entry through a negative index.
    """
    schedule = self.config.idle_retry_backoff
    if not schedule:
        return
    # Clamp into [0, len - 1]: a zero/negative count must not index from the end.
    backoff_idx = max(0, min(retry_count - 1, len(schedule) - 1))
    backoff = schedule[backoff_idx]
    if backoff > 0:
        logger.info("Idle retry %s: waiting %ss", retry_count, backoff)
        await asyncio.sleep(backoff)
async def _disconnect_client_safely(
    self, client: SDKClientProtocol, issue_id: str
) -> None:
    """Best-effort disconnect of an SDK client, bounded by ``DISCONNECT_TIMEOUT``.

    Ordinary failures are never raised. A timeout is logged as a warning
    (the subprocess is abandoned), and any other ``Exception`` is logged at
    debug level. ``BaseException`` subclasses such as
    ``asyncio.CancelledError`` are not caught and still propagate.

    Args:
        client: The SDK client to disconnect.
        issue_id: Issue the client belongs to. It is included in the log
            messages so a disconnect problem can be attributed to a session.
    """
    try:
        await asyncio.wait_for(
            client.disconnect(),
            timeout=DISCONNECT_TIMEOUT,
        )
    except TimeoutError:
        logger.warning(
            "Session %s: disconnect() timed out, subprocess abandoned", issue_id
        )
    except Exception as e:
        # Deliberately swallowed: the client is being torn down anyway.
        logger.debug("Session %s: error during disconnect: %s", issue_id, e)
def _prepare_idle_retry(
    self,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    issue_id: str,
) -> str:
    """Update *state* for another attempt after an idle timeout and pick the query.

    Changes made to *state*:

    * ``idle_retry_count`` is incremented,
    * ``pending_tool_ids`` is cleared (those IDs cannot resolve on a new stream),
    * ``first_message_received`` is reset to False,
    * ``pending_session_id`` is set,
    * on the resume path, ``tool_calls_this_turn`` is reset to 0.

    Possible outcomes:

    * A session ID is known (``state.session_id``, else
      ``lifecycle_ctx.session_id``): resume it with the ``idle_resume`` prompt.
    * No session ID and no tool calls this turn: start a fresh session and
      return ``""``, which tells the caller to re-send its original query.
    * No session ID but tool calls already happened: give up.

    Raises:
        IdleTimeoutError: If the retry budget (``config.max_idle_retries``)
            is exhausted, or if tool calls occurred without a session to
            resume.

    Returns:
        The resume prompt, or ``""`` to keep the caller's current query.
    """
    max_retries = self.config.max_idle_retries
    if state.idle_retry_count >= max_retries:
        logger.error(
            f"Session {issue_id}: max idle retries ({max_retries}) exceeded"
        )
        raise IdleTimeoutError(f"Max idle retries ({max_retries}) exceeded")

    state.idle_retry_count += 1
    # Tool IDs from the abandoned stream will never resolve on a new one.
    state.pending_tool_ids.clear()
    state.first_message_received = False

    resume_id = state.session_id or lifecycle_ctx.session_id

    if resume_id is None:
        tool_calls = state.tool_calls_this_turn
        if tool_calls:
            # Work already happened in a session we cannot resume; starting
            # over could repeat those tool calls.
            logger.error(
                f"Session {issue_id}: cannot retry - "
                f"{tool_calls} tool calls occurred without session_id"
            )
            raise IdleTimeoutError(
                f"Cannot retry: {tool_calls} tool calls "
                "occurred without session context"
            )
        state.pending_session_id = None
        logger.info(
            f"Session {issue_id}: retrying with fresh session "
            f"(no session_id, no side effects, attempt {state.idle_retry_count})"
        )
        # Empty string: the caller keeps its original query.
        return ""

    state.pending_session_id = resume_id
    resume_prompt = self.config.prompts.idle_resume.format(issue_id=issue_id)
    logger.info(
        f"Session {issue_id}: retrying with resume "
        f"(session_id={resume_id[:8]}..., attempt {state.idle_retry_count})"
    )
    # Reset only now, after the retry decision, so the tool-call safety check
    # above saw this turn's count.
    state.tool_calls_this_turn = 0
    return resume_prompt
def _get_stream_processor(self) -> MessageStreamProcessor:
    """Build a new MessageStreamProcessor from this runner's settings.

    Context-limit settings come from ``self.config``. The ``on_tool_use`` and
    ``on_agent_text`` hooks are forwarded from ``self.callbacks``. A new
    processor is constructed on every call.
    """
    cfg = self.config
    hooks = self.callbacks
    return MessageStreamProcessor(
        StreamProcessorConfig(
            context_limit=cfg.context_limit,
            context_restart_threshold=cfg.context_restart_threshold,
        ),
        StreamProcessorCallbacks(
            on_tool_use=hooks.on_tool_use,
            on_agent_text=hooks.on_agent_text,
        ),
    )
async def _process_message_stream(
    self,
    stream: AsyncIterator[Any],
    issue_id: str,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    lint_cache: LintCache,
    query_start: float,
    tracer: TelemetrySpan | None,
) -> MessageIterationResult:
    """Consume one SDK response stream, updating *state* as messages arrive.

    This is a thin delegation. A processor is obtained from
    ``_get_stream_processor()``, and every argument is forwarded positionally
    to its ``process_stream``.

    Args:
        stream: The message stream to process.
        issue_id: Issue ID for logging.
        state: Mutable per-iteration state.
        lifecycle_ctx: Lifecycle context for session state.
        lint_cache: Cache for lint command results.
        query_start: Timestamp at which the query was sent.
        tracer: Optional telemetry span context.

    Returns:
        Whatever ``process_stream`` returns (a MessageIterationResult).
    """
    return await self._get_stream_processor().process_stream(
        stream,
        issue_id,
        state,
        lifecycle_ctx,
        lint_cache,
        query_start,
        tracer,
    )
async def _run_message_iteration(
    self,
    query: str,
    issue_id: str,
    options: object,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    lint_cache: LintCache,
    idle_timeout_seconds: float | None,
    tracer: TelemetrySpan | None = None,
) -> MessageIterationResult:
    """Run a single message iteration with idle retry handling.

    Sends *query* to the SDK on a new client and processes the response
    stream. If the stream goes idle (IdleTimeoutError), the client is
    disconnected, ``_prepare_idle_retry`` decides how to retry, and the loop
    tries again after a backoff.

    Args:
        query: The query to send to the agent.
        issue_id: Issue ID for logging.
        options: SDK client options.
        state: Mutable state for the iteration. Its per-turn fields are reset
            here; ``idle_retry_count`` is not.
        lifecycle_ctx: Lifecycle context for session state.
        lint_cache: Cache for lint command results.
        idle_timeout_seconds: Idle timeout (None to disable).
        tracer: Optional telemetry span context.

    Returns:
        MessageIterationResult from the stream processor. If *query* is None,
        it returns ``MessageIterationResult(success=False)`` without
        contacting the SDK.

    Raises:
        IdleTimeoutError: If ``_prepare_idle_retry`` decides no further
            retry is possible.
    """
    pending_query: str | None = query
    state.tool_calls_this_turn = 0
    state.first_message_received = False
    state.pending_tool_ids.clear()

    while pending_query is not None:
        # Backoff before retry (not on first attempt)
        if state.idle_retry_count > 0:
            await self._apply_retry_backoff(state.idle_retry_count)

        # Each attempt gets its own client.
        client = self.sdk_client_factory.create(options)
        async with client:
            # Wall-clock timestamp: it is forwarded to the stream processor,
            # which may compare it against its own time.time() readings.
            query_start = time.time()
            if state.pending_session_id is not None:
                logger.debug(
                    "Session %s: sending query with session_id=%s...",
                    issue_id,
                    state.pending_session_id[:8],
                )
                await client.query(
                    pending_query, session_id=state.pending_session_id
                )
            else:
                logger.debug(
                    "Session %s: sending query (new session)",
                    issue_id,
                )
                await client.query(pending_query)

            # Wrap stream with idle timeout handling
            stream = IdleTimeoutStream(
                client.receive_response(),
                idle_timeout_seconds,
                state.pending_tool_ids,
            )

            try:
                return await self._process_message_stream(
                    stream,
                    issue_id,
                    state,
                    lifecycle_ctx,
                    lint_cache,
                    query_start,
                    tracer,
                )
            except IdleTimeoutError:
                idle_duration = time.time() - query_start
                logger.warning(
                    f"Session {issue_id}: idle timeout after "
                    f"{idle_duration:.1f}s, first_msg={state.first_message_received}, "
                    f"{state.tool_calls_this_turn} tool calls, disconnecting subprocess"
                )
                await self._disconnect_client_safely(client, issue_id)

                # May raise IdleTimeoutError when no retry is possible; it
                # propagates out of the `async with` to the caller.
                retry_query = self._prepare_idle_retry(
                    state, lifecycle_ctx, issue_id
                )
                # Empty string means keep the current query
                if retry_query:
                    pending_query = retry_query

    # pending_query is never set back to None inside the loop, so the loop
    # exits only via return or raise; this line runs only when *query* is None.
    return MessageIterationResult(success=False)
async def run_session(
    self,
    input: AgentSessionInput,
    tracer: TelemetrySpan | None = None,
) -> AgentSessionOutput:
    """Run an agent session for the given input.

    Each pass of the loop builds a fresh session via ``_initialize_session``
    and drives it with ``_run_lifecycle_loop``, which covers client creation,
    streaming, and gate/review/retry transitions.
    ``config.timeout_seconds`` is a single budget for the whole call, shared
    across restarts.

    Outcomes of a pass:
    * normal completion: the loop ends;
    * ContextPressureError: the context-pressure handler produces a
      continuation prompt, and another pass starts with a fresh lifecycle;
    * TimeoutError (budget exhausted): ``lifecycle.on_timeout`` is called;
    * IdleTimeoutError or any other Exception: ``lifecycle.on_error`` is called.

    Args:
        input: AgentSessionInput with issue_id, prompt, etc.
        tracer: Optional telemetry span context.

    Returns:
        AgentSessionOutput with success, summary, session_id, etc.

    Raises:
        Exceptions raised by ``_initialize_session`` or by the
        context-pressure handler are not caught here and propagate.
    """
    # Inside a coroutine a running loop always exists; get_running_loop() is
    # the non-legacy way to obtain it (and it is fetched only once).
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    continuation_count = 0
    current_prompt = input.prompt
    # Use provided agent_id or generate one to preserve lock continuity across restarts
    agent_id = input.agent_id or f"{input.issue_id}-{uuid.uuid4().hex[:8]}"

    while True:
        # Remaining share of the overall timeout, which spans all restarts.
        remaining = self.config.timeout_seconds - (loop.time() - start_time)

        # Create fresh input (and lifecycle) for each pass.
        # NOTE(review): only these four fields are copied from *input*;
        # agent_id reaches the session through _initialize_session instead.
        session_input = AgentSessionInput(
            issue_id=input.issue_id,
            prompt=current_prompt,
            baseline_commit=input.baseline_commit,
            issue_description=input.issue_description,
        )
        session_cfg, state = self._initialize_session(session_input, agent_id)

        try:
            # Check timeout inside try block so on_timeout cleanup runs
            if remaining <= 0:
                raise TimeoutError("Session timeout exceeded across restarts")
            async with asyncio.timeout(remaining):
                await self._run_lifecycle_loop(
                    session_input, session_cfg, state, tracer
                )
            # Normal completion - exit loop
            break
        except ContextPressureError as e:
            checkpoint_remaining = self.config.timeout_seconds - (
                loop.time() - start_time
            )
            # An exception raised by the handler is not caught by the sibling
            # except clauses below; it propagates out of run_session.
            (
                current_prompt,
                continuation_count,
            ) = await self._context_pressure_handler.handle_pressure_error(
                error=e,
                issue_id=input.issue_id,
                options=session_cfg.options,
                continuation_count=continuation_count,
                remaining_time=checkpoint_remaining,
            )
            # Loop continues with fresh lifecycle and continuation prompt
        except IdleTimeoutError as e:
            # NOTE(review): kept ahead of `except TimeoutError` so idle
            # timeouts go through on_error even if IdleTimeoutError subclasses
            # TimeoutError (its base class is not visible here).
            state.lifecycle.on_error(state.lifecycle_ctx, e)
            state.final_result = state.lifecycle_ctx.final_result
            break
        except TimeoutError:
            timeout_mins = self.config.timeout_seconds // 60
            state.lifecycle.on_timeout(state.lifecycle_ctx, timeout_mins)
            state.final_result = state.lifecycle_ctx.final_result
            break
        except Exception as e:
            state.lifecycle.on_error(state.lifecycle_ctx, e)
            state.final_result = state.lifecycle_ctx.final_result
            break

    duration = loop.time() - start_time
    return self._build_session_output(session_cfg, state, duration)