Coverage for src / pipeline / agent_session_runner.py: 87%

433 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""AgentSessionRunner: Agent session execution pipeline stage. 

2 

3Extracted from MalaOrchestrator to separate SDK-specific session handling from 

4orchestration logic. This module handles: 

5- SDK streaming and message processing 

6- Session lifecycle transitions via ImplementerLifecycle 

7- Session metadata tracking (session ID, log paths) 

8 

9The AgentSessionRunner receives explicit inputs and returns explicit outputs, 

10making it testable without SDK dependencies when using the SDKClientProtocol. 

11 

12Design principles: 

13- Protocol-based SDK client for testability 

14- Explicit input/output types for clarity 

15- Lifecycle state machine drives policy decisions 

16- Callbacks for external operations (gate checks, reviews) 

17""" 

18 

19from __future__ import annotations 

20 

21import asyncio 

22import logging 

23import time 

24import uuid 

25from collections.abc import Callable, Coroutine 

26from dataclasses import dataclass, field 

27from pathlib import Path 

28from typing import ( 

29 TYPE_CHECKING, 

30 Any, 

31 cast, 

32) 

33 

34from src.infra.agent_runtime import AgentRuntimeBuilder 

35from src.domain.lifecycle import ( 

36 Effect, 

37 ImplementerLifecycle, 

38 LifecycleConfig, 

39 LifecycleContext, 

40 LifecycleState, 

41) 

42from src.domain.prompts import ( 

43 get_default_validation_commands as _get_default_validation_commands, 

44) 

45from src.pipeline.context_pressure_handler import ( 

46 ContextPressureConfig, 

47 ContextPressureHandler, 

48) 

49from src.infra.clients.cerberus_review import format_review_issues 

50from src.infra.clients.review_output_parser import ReviewResult 

51from src.pipeline.message_stream_processor import ( 

52 ContextPressureError, 

53 IdleTimeoutError, 

54 IdleTimeoutStream, 

55 MessageIterationResult, 

56 MessageIterationState, 

57 MessageStreamProcessor, 

58 StreamProcessorCallbacks, 

59 StreamProcessorConfig, 

60) 

61 

62if TYPE_CHECKING: 

63 from collections.abc import AsyncIterator 

64 

65 from src.core.models import IssueResolution 

66 from src.core.protocols import ( 

67 MalaEventSink, 

68 ReviewIssueProtocol, 

69 SDKClientFactoryProtocol, 

70 SDKClientProtocol, 

71 ) 

72 from src.domain.deadlock import DeadlockMonitor 

73 from src.domain.lifecycle import ( 

74 GateOutcome, 

75 RetryState, 

76 ReviewIssue, 

77 ReviewOutcome, 

78 TransitionResult, 

79 ) 

80 from src.domain.validation.config import PromptValidationCommands 

81 from src.infra.hooks import LintCache 

82 from src.infra.telemetry import TelemetrySpan 

83 from src.pipeline.message_stream_processor import ( 

84 AgentTextCallback, 

85 ToolUseCallback, 

86 ) 

87 

88 

# Module-level logger shared by the helper functions and AgentSessionRunner
# methods in this module (retry diagnostics, log-path change notices).
logger = logging.getLogger(__name__)

# Timeout for the disconnect() call.
# NOTE(review): presumably seconds; the consumer is outside this section - confirm.
DISCONNECT_TIMEOUT = 10.0


# Type aliases for the callbacks carried by SessionCallbacks.

# Async quality-gate check:
# (issue_id, log_path, retry_state) -> (gate outcome, new log offset).
GateCheckCallback = Callable[
    [str, Path, "RetryState"],
    Coroutine[Any, Any, tuple["GateOutcome", int]],
]
# Async external review:
# (issue_id, issue_description, baseline_commit, session_id, retry_state)
# -> ReviewOutcome.
ReviewCheckCallback = Callable[
    [str, str | None, str | None, str | None, "RetryState"],
    Coroutine[Any, Any, "ReviewOutcome"],
]
# Sync "no progress" probe used on review retries:
# (log_path, log_offset, prev_commit, curr_commit) -> bool.
ReviewNoProgressCallback = Callable[
    [Path, int, str | None, str | None],
    bool,
]
# Log end-offset lookup: (log_path, start_offset) -> int.
LogOffsetCallback = Callable[[Path, int], int]

110 

111 

@dataclass
class SessionConfig:
    """Per-session settings derived from AgentSessionConfig.

    Built once by ``AgentSessionRunner._initialize_session`` and then passed
    to the lifecycle loop.

    Attributes:
        agent_id: Unique identifier of the agent running this session.
        options: SDK client options produced by the runtime builder.
        lint_cache: Cache of lint command results produced by the runtime
            builder.
        log_file_wait_timeout: Maximum time to wait for the session log file
            to appear.
        log_file_poll_interval: Delay between checks for the log file.
        idle_timeout_seconds: Idle timeout for the SDK stream, or None when
            the idle timeout is disabled.
    """

    agent_id: str
    options: object
    lint_cache: LintCache
    log_file_wait_timeout: float
    log_file_poll_interval: float
    idle_timeout_seconds: float | None

133 

134 

@dataclass
class SessionExecutionState:
    """Everything about a running session that changes over time.

    One instance is created per session and mutated in place by the
    lifecycle loop and its effect handlers.

    Attributes:
        lifecycle: The lifecycle state machine driving the session.
        lifecycle_ctx: Context handed to the state machine; carries the
            retry state.
        session_id: SDK session ID; stays None until the SDK reports one.
        log_path: Location of the session log file, once known.
        final_result: Final result text of the session.
        cerberus_review_log_path: Location of the Cerberus review log, if a
            review produced one.
        msg_state: Message iteration state; a fresh instance per session.
    """

    lifecycle: ImplementerLifecycle
    lifecycle_ctx: LifecycleContext
    session_id: str | None = None
    log_path: Path | None = None
    final_result: str = ""
    cerberus_review_log_path: str | None = None
    msg_state: MessageIterationState = field(default_factory=MessageIterationState)

159 

160 

@dataclass
class SessionPrompts:
    """Bundle of prompt templates used while driving an agent session.

    The templates are supplied ready-made, so file I/O stays at the
    orchestration boundary and tests can inject their own prompt text.

    Attributes:
        gate_followup: Template for the follow-up sent after a gate failure.
        review_followup: Template for the follow-up sent when review issues
            must be addressed.
        idle_resume: Template for the prompt sent when resuming after an
            idle timeout.
        checkpoint_request: Prompt asking the agent for a checkpoint.
            Defaults to the empty string.
        continuation: Template for the continuation prompt that carries a
            checkpoint. Defaults to the empty string.
    """

    gate_followup: str
    review_followup: str
    idle_resume: str
    checkpoint_request: str = ""
    continuation: str = ""

181 

182 

@dataclass
class AgentSessionConfig:
    """Static configuration shared by every session an AgentSessionRunner runs.

    Attributes:
        repo_path: Path to the repository the agent works in.
        timeout_seconds: Overall session timeout, in seconds.
        prompts: Prompt templates (gate follow-up, review follow-up, idle
            resume, checkpoint, continuation).
        max_gate_retries: Maximum number of gate retry attempts.
        max_review_retries: Maximum number of review retry attempts.
        review_enabled: Whether the Cerberus external review is enabled.
            When enabled, code changes are reviewed after the gate passes.
        log_file_wait_timeout: Seconds to wait for the log file once a
            session completes. The 60s default gives the SDK time to flush
            logs under load.
        idle_timeout_seconds: Seconds to wait for an SDK message while no
            tool is executing. None derives the value as
            min(900, max(300, timeout_seconds * 0.2)), i.e. it scales with
            the session timeout inside a 300-900s window. The timeout is
            disabled during tool execution (after ToolUseBlock, before the
            result). Set to 0 to disable it entirely.
        max_idle_retries: Idle retry limit.
            NOTE(review): not consumed in this section; meaning inferred
            from the name - confirm against the idle-retry code.
        idle_retry_backoff: Backoff values for idle retries.
            NOTE(review): presumably per-attempt delays in seconds; not
            consumed in this section - confirm.
        lint_tools: Lint tool names for the LintCache. None selects the
            default lint tools. Populated from ValidationSpec commands.
        prompt_validation_commands: Validation commands for prompt
            templates. None selects the default Python/uv commands.
        context_restart_threshold: Ratio (0.0-1.0) of context_limit at which
            ContextPressureError is raised. Default 0.90.
        context_limit: Maximum context tokens. Default 200K for Claude.
        deadlock_monitor: Optional deadlock monitor handed to the runtime
            builder's hooks when a session is initialized.
    """

    repo_path: Path
    timeout_seconds: int
    prompts: SessionPrompts
    max_gate_retries: int = 3
    max_review_retries: int = 3
    review_enabled: bool = True
    log_file_wait_timeout: float = 60.0
    idle_timeout_seconds: float | None = None
    max_idle_retries: int = 2
    idle_retry_backoff: tuple[float, ...] = (0.0, 5.0, 15.0)
    lint_tools: frozenset[str] | None = None
    prompt_validation_commands: PromptValidationCommands | None = None
    context_restart_threshold: float = 0.90
    context_limit: int = 200_000
    deadlock_monitor: DeadlockMonitor | None = None

228 

229 

@dataclass
class AgentSessionInput:
    """What a caller supplies to start a session for one issue.

    Attributes:
        issue_id: ID of the issue being worked on.
        prompt: First prompt sent to the agent.
        baseline_commit: Optional baseline commit for diff comparison.
        issue_description: Issue description, used for scope verification.
        agent_id: Optional pre-generated agent ID for lock management.
    """

    issue_id: str
    prompt: str
    baseline_commit: str | None = None
    issue_description: str | None = None
    agent_id: str | None = None

249 

250 

@dataclass
class AgentSessionOutput:
    """Results and metadata reported for a finished session.

    Attributes:
        success: True when the session completed successfully.
        summary: Human-readable description of the outcome.
        session_id: Claude SDK session ID, when one was received.
        log_path: Session log file location, when known.
        gate_attempts: Number of gate retry attempts.
        review_attempts: Number of review retry attempts.
        resolution: Issue resolution outcome, if any.
        duration_seconds: Total session duration.
        agent_id: ID of the agent that ran the session.
        review_log_path: Cerberus review session log location, if any.
        low_priority_review_issues: P2/P3 review issues to track as beads
            issues.
    """

    success: bool
    summary: str
    session_id: str | None = None
    log_path: Path | None = None
    gate_attempts: int = 1
    review_attempts: int = 0
    resolution: IssueResolution | None = None
    duration_seconds: float = 0.0
    agent_id: str = ""
    review_log_path: str | None = None
    low_priority_review_issues: list[ReviewIssueProtocol] | None = None

282 

283 

@dataclass
class SessionCallbacks:
    """Hooks through which the orchestrator supplies external behavior.

    Gate checks, reviews and similar actions depend on state the runner does
    not own. Injecting them as callbacks keeps AgentSessionRunner decoupled
    from those implementations. Every hook defaults to None.

    Attributes:
        on_gate_check: Async hook that runs the quality gate check.
            Args: (issue_id, log_path, retry_state) -> (GateOutcome, new_offset)
        on_review_check: Async hook that runs the external review (Cerberus).
            Args: (issue_id, issue_description, baseline_commit, session_id, retry_state) -> ReviewOutcome
        on_review_no_progress: Sync hook reporting whether a review retry
            made no progress.
            Args: (log_path, log_offset, prev_commit, curr_commit) -> bool
        get_log_path: Hook resolving a session ID to its log path.
            Args: (session_id) -> Path
        get_log_offset: Hook returning the end offset of a log.
            Args: (log_path, start_offset) -> int
        on_abort: Hook invoked when a fatal error requires aborting the run.
            Args: (reason) -> None
        on_tool_use: Hook for SDK tool use events.
            Args: (agent_id, tool_name, arguments) -> None
        on_agent_text: Hook for SDK text output events.
            Args: (agent_id, text) -> None
    """

    on_gate_check: GateCheckCallback | None = None
    on_review_check: ReviewCheckCallback | None = None
    on_review_no_progress: ReviewNoProgressCallback | None = None
    get_log_path: Callable[[str], Path] | None = None
    get_log_offset: LogOffsetCallback | None = None
    on_abort: Callable[[str], None] | None = None
    on_tool_use: ToolUseCallback | None = None
    on_agent_text: AgentTextCallback | None = None

319 

320 

@dataclass
class ReviewEffectResult:
    """Named-field return value of the _handle_review_effect method.

    Replaces a multi-value tuple return so callers can read each part by
    name.

    Attributes:
        pending_query: Query to send for review retry, or None if no retry
            needed.
        should_break: Whether the caller should break out of the message
            iteration loop.
        cerberus_log_path: Path to Cerberus review log file, if captured.
        transition_result: Lifecycle transition result.
    """

    pending_query: str | None
    should_break: bool
    cerberus_log_path: str | None
    transition_result: TransitionResult

340 

341 

342def _emit_review_result_events( 

343 event_sink: MalaEventSink | None, 

344 input: AgentSessionInput, 

345 result: TransitionResult, 

346 review_result: ReviewOutcome, 

347 lifecycle_ctx: LifecycleContext, 

348 max_review_retries: int, 

349 blocking_count: int, 

350) -> None: 

351 """Emit events based on review result transition. 

352 

353 Handles event emission for: 

354 - COMPLETE_SUCCESS: on_review_passed 

355 - RUN_REVIEW (parse error): on_warning 

356 - SEND_REVIEW_RETRY: on_review_retry with blocking_count 

357 """ 

358 if event_sink is None: 

359 return 

360 

361 if result.effect == Effect.COMPLETE_SUCCESS: 

362 event_sink.on_review_passed( 

363 input.issue_id, 

364 issue_id=input.issue_id, 

365 ) 

366 return 

367 

368 if result.effect == Effect.RUN_REVIEW: 

369 error_detail = review_result.parse_error or "unknown error" 

370 event_sink.on_warning( 

371 f"Review tool error: {error_detail}; retrying", 

372 agent_id=input.issue_id, 

373 ) 

374 return 

375 

376 if result.effect == Effect.SEND_REVIEW_RETRY: 

377 logger.debug( 

378 "Session %s: SEND_REVIEW_RETRY triggered " 

379 "(attempt %d/%d, %d blocking issues)", 

380 input.issue_id, 

381 lifecycle_ctx.retry_state.review_attempt, 

382 max_review_retries, 

383 blocking_count, 

384 ) 

385 event_sink.on_review_retry( 

386 input.issue_id, 

387 lifecycle_ctx.retry_state.review_attempt, 

388 max_review_retries, 

389 error_count=blocking_count or None, 

390 parse_error=review_result.parse_error, 

391 issue_id=input.issue_id, 

392 ) 

393 

394 

395def _emit_gate_passed_events( 

396 event_sink: MalaEventSink | None, 

397 issue_id: str, 

398 review_attempt: int, 

399) -> None: 

400 """Emit gate passed events when first entering review. 

401 

402 Only emits events on the first review attempt (review_attempt == 1), 

403 indicating the gate has just passed. 

404 

405 Args: 

406 event_sink: Event sink to emit to, or None to skip emission. 

407 issue_id: The issue identifier. 

408 review_attempt: Current review attempt number (1-based). 

409 """ 

410 if review_attempt != 1 or event_sink is None: 

411 return 

412 

413 event_sink.on_gate_passed( 

414 issue_id, 

415 issue_id=issue_id, 

416 ) 

417 event_sink.on_validation_result( 

418 issue_id, 

419 passed=True, 

420 issue_id=issue_id, 

421 ) 

422 

423 

424def _count_blocking_issues(issues: list[ReviewIssue] | None) -> int: 

425 """Count issues with priority <= 1 (P0 or P1). 

426 

427 Args: 

428 issues: List of review issues, or None. 

429 

430 Returns: 

431 Number of blocking (high-priority) issues. 

432 """ 

433 if not issues: 

434 return 0 

435 return sum(1 for i in issues if i.priority is not None and i.priority <= 1) 

436 

437 

def _make_review_effect_result(
    effect: Effect,
    cerberus_log_path: str | None,
    transition_result: TransitionResult,
    pending_query: str | None = None,
) -> ReviewEffectResult:
    """Wrap a review transition in a ReviewEffectResult.

    The ``should_break`` flag is set exactly when the effect is terminal
    (COMPLETE_SUCCESS or COMPLETE_FAILURE).

    Args:
        effect: The lifecycle effect being handled.
        cerberus_log_path: Path to the Cerberus review log, if any.
        transition_result: Lifecycle transition result to carry along.
        pending_query: Query string for the SEND_REVIEW_RETRY effect.

    Returns:
        ReviewEffectResult with the inputs and the derived should_break flag.
    """
    terminal_effects = (Effect.COMPLETE_SUCCESS, Effect.COMPLETE_FAILURE)
    return ReviewEffectResult(
        pending_query=pending_query,
        should_break=effect in terminal_effects,
        cerberus_log_path=cerberus_log_path,
        transition_result=transition_result,
    )

462 

463 

def _build_review_retry_prompt(
    review_result: ReviewOutcome,
    lifecycle_ctx: LifecycleContext,
    issue_id: str,
    repo_path: Path,
    max_review_retries: int,
    review_followup_template: str,
) -> str:
    """Render the prompt asking the agent to address review issues.

    The template is filled via ``str.format`` with the fields ``attempt``,
    ``max_attempts``, ``review_issues`` and ``issue_id``.

    Args:
        review_result: Review outcome whose issues are listed in the prompt.
        lifecycle_ctx: Lifecycle context supplying the current review attempt.
        issue_id: The issue identifier.
        repo_path: Repository path passed to the issue formatter as its
            base path.
        max_review_retries: Maximum number of review retries.
        review_followup_template: Template for review follow-up prompts.

    Returns:
        The formatted follow-up prompt.
    """
    formatted_issues = format_review_issues(
        review_result.issues,  # type: ignore[arg-type]
        base_path=repo_path,
    )
    template_fields = {
        "attempt": lifecycle_ctx.retry_state.review_attempt,
        "max_attempts": max_review_retries,
        "review_issues": formatted_issues,
        "issue_id": issue_id,
    }
    return review_followup_template.format(**template_fields)

495 

496 

497@dataclass 

498class AgentSessionRunner: 

499 """Runs agent sessions with lifecycle management. 

500 

501 This class encapsulates the SDK session execution logic that was previously 

502 inline in MalaOrchestrator.run_implementer. It manages: 

503 - SDK client creation and message streaming 

504 - Lifecycle state transitions 

505 - Hook setup (lock enforcement, lint cache, etc.) 

506 - Message logging and telemetry 

507 

508 The runner uses callbacks for external operations (gate checks, reviews) 

509 to avoid tight coupling with the orchestrator's dependencies. 

510 

511 Usage: 

512 runner = AgentSessionRunner( 

513 config=AgentSessionConfig(repo_path=repo_path, ...), 

514 callbacks=SessionCallbacks(on_gate_check=..., ...), 

515 sdk_client_factory=SDKClientFactory(), 

516 ) 

517 output = await runner.run_session(input) 

518 

519 Attributes: 

520 config: Session configuration. 

521 callbacks: Callbacks for external operations. 

522 sdk_client_factory: Factory for creating SDK clients (required). 

523 event_sink: Optional event sink for structured logging. 

524 """ 

525 

526 config: AgentSessionConfig 

527 sdk_client_factory: SDKClientFactoryProtocol 

528 callbacks: SessionCallbacks = field(default_factory=SessionCallbacks) 

529 event_sink: MalaEventSink | None = None 

530 _context_pressure_handler: ContextPressureHandler = field(init=False, repr=False) 

531 

def __post_init__(self) -> None:
    """Create the context-pressure handler from the configured prompts."""
    prompts = self.config.prompts
    pressure_config = ContextPressureConfig(
        checkpoint_request_prompt=prompts.checkpoint_request,
        continuation_template=prompts.continuation,
    )
    self._context_pressure_handler = ContextPressureHandler(
        config=pressure_config,
        sdk_client_factory=self.sdk_client_factory,
    )

541 

def _initialize_session(
    self,
    input: AgentSessionInput,
    agent_id: str | None = None,
) -> tuple[SessionConfig, SessionExecutionState]:
    """Prepare everything a new session needs before the lifecycle loop runs.

    Builds the lifecycle state machine and its context, assembles the agent
    runtime (hooks, env, MCP, disallowed tools, lint tools), resolves the
    idle timeout, and packages the results.

    Args:
        input: Session input with issue_id, prompt, etc.
        agent_id: Optional agent ID. If None, a new one is generated from
            the issue ID plus a random suffix. Pass an existing ID to
            preserve lock continuity across restarts.

    Returns:
        Tuple of (SessionConfig, SessionExecutionState).
    """
    cfg = self.config

    # Mint a fresh ID only when the caller did not supply one.
    if agent_id is None:
        agent_id = f"{input.issue_id}-{uuid.uuid4().hex[:8]}"

    # Lifecycle state machine plus its context, stamped with the start time.
    lifecycle = ImplementerLifecycle(
        LifecycleConfig(
            max_gate_retries=cfg.max_gate_retries,
            max_review_retries=cfg.max_review_retries,
            review_enabled=cfg.review_enabled,
        )
    )
    lifecycle_ctx = LifecycleContext()
    lifecycle_ctx.retry_state.baseline_timestamp = int(time.time())

    # Assemble the runtime step by step through AgentRuntimeBuilder.
    builder = AgentRuntimeBuilder(cfg.repo_path, agent_id, self.sdk_client_factory)
    builder = builder.with_hooks(deadlock_monitor=cfg.deadlock_monitor)
    builder = builder.with_env()
    builder = builder.with_mcp()
    builder = builder.with_disallowed_tools()
    builder = builder.with_lint_tools(cfg.lint_tools)
    runtime = builder.build()

    # Idle timeout: derive 20% of the session timeout clamped to 300-900s
    # when unset; any non-positive value means "disabled" (None).
    idle_timeout = cfg.idle_timeout_seconds
    if idle_timeout is None:
        idle_timeout = min(900.0, max(300.0, cfg.timeout_seconds * 0.2))
    if idle_timeout <= 0:
        idle_timeout = None

    return (
        SessionConfig(
            agent_id=agent_id,
            options=runtime.options,
            lint_cache=runtime.lint_cache,
            log_file_wait_timeout=cfg.log_file_wait_timeout,
            log_file_poll_interval=0.5,
            idle_timeout_seconds=idle_timeout,
        ),
        SessionExecutionState(
            lifecycle=lifecycle,
            lifecycle_ctx=lifecycle_ctx,
        ),
    )

609 

async def _run_lifecycle_loop(
    self,
    input: AgentSessionInput,
    session_cfg: SessionConfig,
    state: SessionExecutionState,
    tracer: TelemetrySpan | None = None,
) -> None:
    """Run the main lifecycle loop.

    Executes the message iteration, gate, and review loop until
    terminal state is reached. Each pass optionally sends a pending query
    to the agent, then handles the lifecycle effects WAIT_FOR_LOG,
    RUN_GATE and RUN_REVIEW in that order. Results are written into
    ``state`` (session_id, log_path, final_result); nothing is returned.

    Args:
        input: Session input with issue_id, prompt, etc.
        session_cfg: Derived session configuration.
        state: Mutable session execution state.
        tracer: Optional telemetry span context.
    """
    lifecycle = state.lifecycle
    lifecycle_ctx = state.lifecycle_ctx

    # Start lifecycle
    lifecycle.start()
    if self.event_sink is not None:
        self.event_sink.on_lifecycle_state(input.issue_id, lifecycle.state.name)

    # The first pass sends the initial prompt; later passes only send a
    # query when a gate or review handler queued a retry prompt.
    pending_query: str | None = input.prompt
    result: TransitionResult | None = None

    while not lifecycle.is_terminal:
        # === QUERY + MESSAGE ITERATION ===
        if pending_query is not None:
            iter_result = await self._run_message_iteration(
                query=pending_query,
                issue_id=input.issue_id,
                options=session_cfg.options,
                state=state.msg_state,
                lifecycle_ctx=lifecycle_ctx,
                lint_cache=session_cfg.lint_cache,
                idle_timeout_seconds=session_cfg.idle_timeout_seconds,
                tracer=tracer,
            )
            # Keep the previously known session ID if this iteration did
            # not report one.
            if iter_result.session_id is not None:
                state.session_id = iter_result.session_id
            pending_query = None

            result = lifecycle.on_messages_complete(
                lifecycle_ctx, has_session_id=bool(state.session_id)
            )
            if self.event_sink is not None:
                self.event_sink.on_lifecycle_state(
                    input.issue_id, lifecycle.state.name
                )
            if result.effect == Effect.COMPLETE_FAILURE:
                state.final_result = lifecycle_ctx.final_result
                break
        else:
            # No query to send: a previous pass must have left a transition
            # result to act on.
            assert result is not None, (
                "Bug: entered loop without pending_query but result not set"
            )

        # Handle WAIT_FOR_LOG
        if result.effect == Effect.WAIT_FOR_LOG:
            # The handler returns the follow-up transition, which the gate
            # and review checks below then inspect.
            state.log_path, result = await self._handle_log_waiting(
                state.session_id,
                input.issue_id,
                state.log_path,
                lifecycle,
                lifecycle_ctx,
                session_cfg.log_file_wait_timeout,
                session_cfg.log_file_poll_interval,
            )
            if lifecycle.state == LifecycleState.FAILED:
                state.final_result = lifecycle_ctx.final_result
                break

        # Handle RUN_GATE
        if result.effect == Effect.RUN_GATE:
            pending_query, gate_trans = await self._handle_gate_check(
                input, state, lifecycle, lifecycle_ctx
            )
            if pending_query is not None:
                # A gate retry prompt was queued: record the current session
                # ID for the next iteration and reset the idle retry counter.
                # NOTE(review): presumably pending_session_id makes the
                # follow-up resume the same SDK session - confirm against
                # _run_message_iteration.
                state.msg_state.pending_session_id = state.session_id
                state.msg_state.idle_retry_count = 0
                continue
            if lifecycle.is_terminal:
                state.final_result = lifecycle_ctx.final_result
                break
            # Update result with gate transition for next effect check
            result = gate_trans

        # Handle RUN_REVIEW
        if result.effect == Effect.RUN_REVIEW:
            pending_query, review_result = await self._handle_review_check(
                input, state, lifecycle, lifecycle_ctx
            )
            if review_result is not None:
                result = review_result
            if pending_query is not None:
                # Same bookkeeping as for a gate retry.
                state.msg_state.pending_session_id = state.session_id
                state.msg_state.idle_retry_count = 0
            if lifecycle.is_terminal:
                state.final_result = lifecycle_ctx.final_result
                break
            continue

    # Runs on normal loop exit and after every break above.
    state.final_result = lifecycle_ctx.final_result

717 

718 async def _handle_gate_check( 

719 self, 

720 input: AgentSessionInput, 

721 state: SessionExecutionState, 

722 lifecycle: ImplementerLifecycle, 

723 lifecycle_ctx: LifecycleContext, 

724 ) -> tuple[str | None, TransitionResult]: 

725 """Handle RUN_GATE effect - emit events and run gate check. 

726 

727 Args: 

728 input: Session input with issue_id. 

729 state: Session execution state. 

730 lifecycle: Lifecycle state machine. 

731 lifecycle_ctx: Lifecycle context. 

732 

733 Returns: 

734 Tuple of (pending query for retry or None, transition result). 

735 """ 

736 # Emit validation started BEFORE the gate check 

737 if self.event_sink is not None: 

738 self.event_sink.on_validation_started( 

739 input.issue_id, issue_id=input.issue_id 

740 ) 

741 self.event_sink.on_gate_started( 

742 input.issue_id, 

743 lifecycle_ctx.retry_state.gate_attempt, 

744 self.config.max_gate_retries, 

745 issue_id=input.issue_id, 

746 ) 

747 

748 if self.callbacks.on_gate_check is None: 

749 raise ValueError("on_gate_check callback must be set") 

750 

751 assert state.log_path is not None 

752 gate_result, new_offset = await self.callbacks.on_gate_check( 

753 input.issue_id, state.log_path, lifecycle_ctx.retry_state 

754 ) 

755 

756 retry_query, should_break, trans_result = self._handle_gate_effect( 

757 input, gate_result, lifecycle, lifecycle_ctx, new_offset 

758 ) 

759 if should_break: 

760 return None, trans_result 

761 if retry_query is not None: 

762 if state.session_id is None: 

763 raise IdleTimeoutError( 

764 "Cannot retry gate: session_id not received from SDK" 

765 ) 

766 logger.debug( 

767 "Session %s: queueing gate retry prompt (%d chars, session_id=%s)", 

768 input.issue_id, 

769 len(retry_query), 

770 state.session_id[:8], 

771 ) 

772 return retry_query, trans_result 

773 return None, trans_result 

774 

775 async def _handle_review_check( 

776 self, 

777 input: AgentSessionInput, 

778 state: SessionExecutionState, 

779 lifecycle: ImplementerLifecycle, 

780 lifecycle_ctx: LifecycleContext, 

781 ) -> tuple[str | None, TransitionResult | None]: 

782 """Handle RUN_REVIEW effect - run review and process result. 

783 

784 Args: 

785 input: Session input with issue_id, description, baseline. 

786 state: Session execution state. 

787 lifecycle: Lifecycle state machine. 

788 lifecycle_ctx: Lifecycle context. 

789 

790 Returns: 

791 Tuple of (pending_query for retry, transition_result). 

792 """ 

793 assert state.log_path is not None 

794 review_effect = await self._handle_review_effect( 

795 input, state.log_path, state.session_id, lifecycle, lifecycle_ctx 

796 ) 

797 if review_effect.cerberus_log_path is not None: 

798 state.cerberus_review_log_path = review_effect.cerberus_log_path 

799 if review_effect.should_break: 

800 return None, review_effect.transition_result 

801 if review_effect.pending_query is not None: 

802 if state.session_id is None: 

803 raise IdleTimeoutError( 

804 "Cannot retry review: session_id not received from SDK" 

805 ) 

806 logger.debug( 

807 "Session %s: queueing review retry prompt (%d chars, session_id=%s)", 

808 input.issue_id, 

809 len(review_effect.pending_query), 

810 state.session_id[:8], 

811 ) 

812 return review_effect.pending_query, review_effect.transition_result 

813 return None, review_effect.transition_result 

814 

def _build_session_output(
    self,
    session_cfg: SessionConfig,
    state: SessionExecutionState,
    duration: float,
) -> AgentSessionOutput:
    """Assemble the AgentSessionOutput for a finished session.

    Args:
        session_cfg: Session configuration; supplies the agent_id.
        state: Session execution state holding results and lifecycle context.
        duration: Total session duration in seconds.

    Returns:
        AgentSessionOutput populated from ``state``. An empty low-priority
        issue list is reported as None.
    """
    ctx = state.lifecycle_ctx
    retries = ctx.retry_state
    low_priority = cast(
        "list[ReviewIssueProtocol] | None",
        ctx.low_priority_review_issues or None,
    )
    return AgentSessionOutput(
        success=ctx.success,
        summary=state.final_result,
        session_id=state.session_id,
        log_path=state.log_path,
        gate_attempts=retries.gate_attempt,
        review_attempts=retries.review_attempt,
        resolution=ctx.resolution,
        duration_seconds=duration,
        agent_id=session_cfg.agent_id,
        review_log_path=state.cerberus_review_log_path,
        low_priority_review_issues=low_priority,
    )

847 

848 async def _handle_log_waiting( 

849 self, 

850 session_id: str | None, 

851 issue_id: str, 

852 log_path: Path | None, 

853 lifecycle: ImplementerLifecycle, 

854 lifecycle_ctx: LifecycleContext, 

855 log_file_wait_timeout: float, 

856 log_file_poll_interval: float = 0.5, 

857 ) -> tuple[Path | None, TransitionResult]: 

858 """Handle WAIT_FOR_LOG effect - wait for log file to become available. 

859 

860 Args: 

861 session_id: Current SDK session ID. 

862 issue_id: Issue ID for logging. 

863 log_path: Current log path (may be None). 

864 lifecycle: Lifecycle state machine. 

865 lifecycle_ctx: Lifecycle context. 

866 log_file_wait_timeout: Max seconds to wait for log file. 

867 log_file_poll_interval: Seconds between poll attempts. 

868 

869 Returns: 

870 Tuple of (updated log_path, TransitionResult from log ready/timeout). 

871 """ 

872 if self.callbacks.get_log_path is None: 

873 raise ValueError("get_log_path callback must be set") 

874 if session_id is None: 

875 raise ValueError("session_id must be set before waiting for log") 

876 new_log_path = self.callbacks.get_log_path(session_id) 

877 

878 # Reset log_offset if log file changed (new session started) 

879 # This prevents using a stale offset from a previous session's 

880 # larger log file when parsing a new, smaller log file 

881 if log_path is not None and new_log_path != log_path: 

882 logger.info( 

883 "Session %s: log path changed from %s to %s, " 

884 "resetting log_offset from %d to 0", 

885 issue_id, 

886 log_path.name, 

887 new_log_path.name, 

888 lifecycle_ctx.retry_state.log_offset, 

889 ) 

890 lifecycle_ctx.retry_state.log_offset = 0 

891 

892 log_path = new_log_path 

893 if self.event_sink is not None: 

894 self.event_sink.on_log_waiting(issue_id) 

895 

896 # Wait for log file 

897 wait_elapsed = 0.0 

898 while not log_path.exists(): 

899 if wait_elapsed >= log_file_wait_timeout: 

900 result = lifecycle.on_log_timeout(lifecycle_ctx, str(log_path)) 

901 if self.event_sink is not None: 

902 self.event_sink.on_log_timeout(issue_id, str(log_path)) 

903 return log_path, result 

904 await asyncio.sleep(log_file_poll_interval) 

905 wait_elapsed += log_file_poll_interval 

906 

907 if log_path.exists(): 

908 if self.event_sink is not None: 

909 self.event_sink.on_log_ready(issue_id) 

910 result = lifecycle.on_log_ready(lifecycle_ctx) 

911 return log_path, result 

912 

def _handle_gate_effect(
    self,
    input: AgentSessionInput,
    gate_result: GateOutcome,
    lifecycle: ImplementerLifecycle,
    lifecycle_ctx: LifecycleContext,
    new_offset: int,
) -> tuple[str | None, bool, TransitionResult]:
    """Feed a gate outcome to the lifecycle and react to the resulting effect.

    The lifecycle decides what happens next; this method only emits the
    matching events and, for a retry, renders the follow-up prompt.

    Args:
        input: Session input; only ``issue_id`` is read here.
        gate_result: Outcome returned by the gate check callback.
        lifecycle: Lifecycle state machine.
        lifecycle_ctx: Lifecycle context (provides the current gate attempt).
        new_offset: Log offset to record after the gate check.

    Returns:
        ``(pending_query, should_break, transition_result)`` where
        ``pending_query`` is the retry prompt (``None`` unless the effect is
        SEND_GATE_RETRY) and ``should_break`` is True for the two terminal
        effects (COMPLETE_SUCCESS / COMPLETE_FAILURE).
    """
    transition = lifecycle.on_gate_result(lifecycle_ctx, gate_result, new_offset)
    effect = transition.effect
    issue_id = input.issue_id
    sink = self.event_sink

    if effect == Effect.COMPLETE_SUCCESS:
        _emit_gate_passed_events(sink, issue_id, review_attempt=1)
        return None, True, transition

    gave_up = effect == Effect.COMPLETE_FAILURE
    will_retry = (not gave_up) and effect == Effect.SEND_GATE_RETRY
    if not (gave_up or will_retry):
        # RUN_REVIEW or any other effect: nothing to emit here, pass through.
        return None, False, transition

    if sink is not None:
        # Failure and retry share the same event trio; only the first
        # notification differs.
        announce = sink.on_gate_failed if gave_up else sink.on_gate_retry
        announce(
            issue_id,
            lifecycle_ctx.retry_state.gate_attempt,
            self.config.max_gate_retries,
            issue_id=issue_id,
        )
        sink.on_gate_result(
            issue_id,
            passed=False,
            failure_reasons=list(gate_result.failure_reasons),
            issue_id=issue_id,
        )
        # Always close the validation with a result so that every
        # on_validation_started has a corresponding on_validation_result,
        # including before a retry.
        sink.on_validation_result(
            issue_id,
            passed=False,
            issue_id=issue_id,
        )

    if gave_up:
        return None, True, transition

    # Retry: render the follow-up prompt listing each failure reason.
    bullet_list = "\n".join(f"- {r}" for r in gate_result.failure_reasons)
    # Configured validation commands win; otherwise fall back to the defaults.
    commands = (
        self.config.prompt_validation_commands or _get_default_validation_commands()
    )
    followup = self.config.prompts.gate_followup.format(
        attempt=lifecycle_ctx.retry_state.gate_attempt,
        max_attempts=self.config.max_gate_retries,
        failure_reasons=bullet_list,
        issue_id=issue_id,
        lint_command=commands.lint,
        format_command=commands.format,
        typecheck_command=commands.typecheck,
        test_command=commands.test,
    )
    return followup, False, transition

1002 

def _check_review_no_progress(
    self,
    input: AgentSessionInput,
    log_path: Path,
    lifecycle: ImplementerLifecycle,
    lifecycle_ctx: LifecycleContext,
    cerberus_review_log_path: str | None,
) -> ReviewEffectResult | None:
    """Decide whether a review retry should be skipped for lack of progress.

    Args:
        input: Session input; only ``issue_id`` is read here.
        log_path: Path to the session log file.
        lifecycle: Lifecycle state machine.
        lifecycle_ctx: Lifecycle context.
        cerberus_review_log_path: Cerberus review log path to carry into the
            returned result, if any.

    Returns:
        A ReviewEffectResult with ``should_break=True`` when the
        ``on_review_no_progress`` callback reports no progress (the caller
        should return it immediately), otherwise None so the review runs.
    """
    # First attempts are never skipped.
    if lifecycle_ctx.retry_state.review_attempt <= 1:
        return None

    # Without a detector callback there is nothing to check.
    detector = self.callbacks.on_review_no_progress
    if detector is None:
        return None

    last_gate = lifecycle_ctx.last_gate_result
    current_commit = last_gate.commit_hash if last_gate else None

    stalled = detector(
        log_path,
        lifecycle_ctx.retry_state.log_offset,
        lifecycle_ctx.retry_state.previous_commit_hash,
        current_commit,
    )
    if not stalled:
        return None

    # Tell observers the review was skipped.
    if self.event_sink is not None:
        self.event_sink.on_review_skipped_no_progress(input.issue_id)

    # Hand the lifecycle a synthetic failed review flagged as no-progress.
    synthetic = ReviewResult(passed=False, issues=[], parse_error=None)
    offset_reader = self.callbacks.get_log_offset
    if offset_reader:
        new_offset = offset_reader(log_path, lifecycle_ctx.retry_state.log_offset)
    else:
        new_offset = 0

    transition = lifecycle.on_review_result(
        lifecycle_ctx,
        synthetic,
        new_offset,
        no_progress=True,
    )
    return ReviewEffectResult(
        pending_query=None,
        should_break=True,
        cerberus_log_path=cerberus_review_log_path,
        transition_result=transition,
    )

1072 

async def _handle_review_effect(
    self,
    input: AgentSessionInput,
    log_path: Path,
    session_id: str | None,
    lifecycle: ImplementerLifecycle,
    lifecycle_ctx: LifecycleContext,
) -> ReviewEffectResult:
    """Handle RUN_REVIEW effect - run review and process result.

    Args:
        input: Session input with issue_id, description, baseline.
        log_path: Path to log file.
        session_id: Current SDK session ID. Not read by this method; the
            session ID forwarded to the review callback comes from
            ``lifecycle_ctx.session_id``.
        lifecycle: Lifecycle state machine.
        lifecycle_ctx: Lifecycle context.

    Returns:
        ReviewEffectResult with pending_query, should_break, cerberus_log_path,
        and transition_result.

    Raises:
        ValueError: If the ``on_review_check`` callback is not configured.
    """
    cerberus_review_log_path: str | None = None

    # Emit gate passed events when first entering review
    # (review_attempt == 1 means gate just passed)
    _emit_gate_passed_events(
        self.event_sink, input.issue_id, lifecycle_ctx.retry_state.review_attempt
    )

    # Check no-progress before running review
    if no_progress_result := self._check_review_no_progress(
        input, log_path, lifecycle, lifecycle_ctx, cerberus_review_log_path
    ):
        return no_progress_result

    if self.event_sink is not None:
        self.event_sink.on_review_started(
            input.issue_id,
            lifecycle_ctx.retry_state.review_attempt,
            self.config.max_review_retries,
            issue_id=input.issue_id,
        )

    if self.callbacks.on_review_check is None:
        raise ValueError("on_review_check callback must be set")

    logger.debug(
        "Session %s: starting review (attempt %d/%d, session_id=%s)",
        input.issue_id,
        lifecycle_ctx.retry_state.review_attempt,
        self.config.max_review_retries,
        lifecycle_ctx.session_id[:8] if lifecycle_ctx.session_id else None,
    )
    # Use a monotonic clock for the elapsed measurement: wall-clock time can
    # jump (NTP, manual adjustment) and produce bogus or negative durations.
    review_start = time.monotonic()
    review_result = await self.callbacks.on_review_check(
        input.issue_id,
        input.issue_description,
        input.baseline_commit,
        lifecycle_ctx.session_id,
        lifecycle_ctx.retry_state,
    )
    review_duration = time.monotonic() - review_start
    issue_count = len(review_result.issues) if review_result.issues else 0
    blocking = _count_blocking_issues(review_result.issues)
    logger.debug(
        "Session %s: review completed in %.1fs "
        "(passed=%s, issues=%d, blocking=%d, parse_error=%s)",
        input.issue_id,
        review_duration,
        review_result.passed,
        issue_count,
        blocking,
        review_result.parse_error,
    )

    # Check for fatal error. The abort callback is only notified here;
    # processing continues so the lifecycle still receives the review result.
    if review_result.fatal_error:
        if self.callbacks.on_abort is not None:
            self.callbacks.on_abort(
                review_result.parse_error or "Unrecoverable review error"
            )

    # Capture Cerberus review log if available (the attribute is optional on
    # the result object, hence getattr with a default).
    log_attr = getattr(review_result, "review_log_path", None)
    if log_attr is not None:
        cerberus_review_log_path = str(log_attr)

    # Get new log offset (0 when no offset callback is configured)
    new_offset = (
        self.callbacks.get_log_offset(
            log_path,
            lifecycle_ctx.retry_state.log_offset,
        )
        if self.callbacks.get_log_offset
        else 0
    )

    result = lifecycle.on_review_result(lifecycle_ctx, review_result, new_offset)

    # Emit appropriate events based on transition result
    _emit_review_result_events(
        self.event_sink,
        input,
        result,
        review_result,
        lifecycle_ctx,
        self.config.max_review_retries,
        blocking,
    )

    # Build pending_query only for SEND_REVIEW_RETRY
    pending_query = None
    if result.effect == Effect.SEND_REVIEW_RETRY:
        pending_query = _build_review_retry_prompt(
            review_result,
            lifecycle_ctx,
            input.issue_id,
            self.config.repo_path,
            self.config.max_review_retries,
            self.config.prompts.review_followup,
        )

    return _make_review_effect_result(
        result.effect,
        cerberus_review_log_path,
        result,
        pending_query,
    )

1201 

async def _apply_retry_backoff(self, retry_count: int) -> None:
    """Apply backoff delay before an idle retry attempt.

    The delay is taken from ``self.config.idle_retry_backoff``; retry counts
    beyond the end of the schedule reuse its last entry. No delay is applied
    when the schedule is empty or the selected entry is not positive.

    Args:
        retry_count: Current retry count (1-based). Values below 1 are
            treated as the first retry.
    """
    schedule = self.config.idle_retry_backoff
    if not schedule:
        return

    # Clamp at both ends. Without the lower bound a retry_count of 0 (or
    # less) yields a negative index, which would silently wrap around and
    # pick the *last* (typically longest) delay.
    slot = max(0, min(retry_count - 1, len(schedule) - 1))
    backoff = schedule[slot]
    if backoff > 0:
        logger.info("Idle retry %s: waiting %ss", retry_count, backoff)
        await asyncio.sleep(backoff)

1219 

async def _disconnect_client_safely(
    self, client: SDKClientProtocol, issue_id: str
) -> None:
    """Best-effort disconnect of the SDK client, bounded by DISCONNECT_TIMEOUT.

    Never raises for ordinary failures: a timeout is logged as a warning and
    any other ``Exception`` is logged at debug level.

    Args:
        client: SDK client to disconnect.
        issue_id: Issue ID of the session (not used in the log messages).
    """
    try:
        shutdown = client.disconnect()
        await asyncio.wait_for(shutdown, timeout=DISCONNECT_TIMEOUT)
    except TimeoutError:
        # Disconnect did not finish in time; give up on it.
        logger.warning("disconnect() timed out, subprocess abandoned")
    except Exception as exc:
        logger.debug(f"Error during disconnect: {exc}")

1233 

def _prepare_idle_retry(
    self,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    issue_id: str,
) -> str:
    """Mutate ``state`` for another attempt after an idle timeout.

    Increments ``state.idle_retry_count``, clears ``state.pending_tool_ids``,
    resets ``state.first_message_received`` and sets
    ``state.pending_session_id`` according to whether a session can be
    resumed.

    Args:
        state: Mutable per-iteration state.
        lifecycle_ctx: Lifecycle context; fallback source for the session ID.
        issue_id: Issue ID used in prompts and log messages.

    Returns:
        The resume prompt when a session ID is available, or ``""`` to tell
        the caller to resend its original query on a fresh session.

    Raises:
        IdleTimeoutError: If the retry budget is exhausted, or if tool calls
            already happened this turn and there is no session to resume.
    """
    retry_limit = self.config.max_idle_retries
    if state.idle_retry_count >= retry_limit:
        logger.error(
            f"Session {issue_id}: max idle retries " f"({retry_limit}) exceeded"
        )
        raise IdleTimeoutError(f"Max idle retries ({retry_limit}) exceeded")

    state.idle_retry_count += 1
    # Tool IDs from the abandoned stream will never resolve on a new one, so
    # drop them rather than wait on them.
    state.pending_tool_ids.clear()
    state.first_message_received = False

    resume_id = state.session_id or lifecycle_ctx.session_id

    if resume_id is None:
        if state.tool_calls_this_turn != 0:
            # Side effects may already exist and there is no session context
            # to resume from, so retrying is refused.
            logger.error(
                f"Session {issue_id}: cannot retry - "
                f"{state.tool_calls_this_turn} tool calls "
                "occurred without session_id"
            )
            raise IdleTimeoutError(
                f"Cannot retry: {state.tool_calls_this_turn} tool calls "
                "occurred without session context"
            )

        state.pending_session_id = None
        logger.info(
            f"Session {issue_id}: retrying with fresh session "
            f"(no session_id, no side effects, "
            f"attempt {state.idle_retry_count})"
        )
        # Empty string = "keep the original query"; the caller owns it.
        return ""

    state.pending_session_id = resume_id
    resume_prompt = self.config.prompts.idle_resume.format(issue_id=issue_id)
    logger.info(
        f"Session {issue_id}: retrying with resume "
        f"(session_id={resume_id[:8]}..., "
        f"attempt {state.idle_retry_count})"
    )
    # Only reset the counter now, after the branch decision above, so the
    # no-session safety check sees the real value.
    state.tool_calls_this_turn = 0
    return resume_prompt

1301 

def _get_stream_processor(self) -> MessageStreamProcessor:
    """Build a MessageStreamProcessor from the runner's config and callbacks.

    Context limit and restart threshold come from ``self.config``; the
    tool-use and agent-text hooks come from ``self.callbacks``.
    """
    return MessageStreamProcessor(
        StreamProcessorConfig(
            context_limit=self.config.context_limit,
            context_restart_threshold=self.config.context_restart_threshold,
        ),
        StreamProcessorCallbacks(
            on_tool_use=self.callbacks.on_tool_use,
            on_agent_text=self.callbacks.on_agent_text,
        ),
    )

1313 

async def _process_message_stream(
    self,
    stream: AsyncIterator[Any],
    issue_id: str,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    lint_cache: LintCache,
    query_start: float,
    tracer: TelemetrySpan | None,
) -> MessageIterationResult:
    """Consume one SDK message stream via a MessageStreamProcessor.

    A processor is obtained from ``_get_stream_processor`` on every call and
    all arguments are forwarded to its ``process_stream`` unchanged.

    Args:
        stream: The message stream to process.
        issue_id: Issue ID for logging.
        state: Mutable state for the iteration.
        lifecycle_ctx: Lifecycle context for session state.
        lint_cache: Cache for lint command results.
        query_start: Timestamp when query was sent.
        tracer: Optional telemetry span context.

    Returns:
        The MessageIterationResult produced by the processor.
    """
    return await self._get_stream_processor().process_stream(
        stream,
        issue_id,
        state,
        lifecycle_ctx,
        lint_cache,
        query_start,
        tracer,
    )

1344 

async def _run_message_iteration(
    self,
    query: str,
    issue_id: str,
    options: object,
    state: MessageIterationState,
    lifecycle_ctx: LifecycleContext,
    lint_cache: LintCache,
    idle_timeout_seconds: float | None,
    tracer: TelemetrySpan | None = None,
) -> MessageIterationResult:
    """Send one query and process its response, retrying on idle timeouts.

    Each attempt creates a new SDK client, sends the pending query (resuming
    ``state.pending_session_id`` when set) and processes the response stream
    wrapped in an IdleTimeoutStream. On an idle timeout the client is
    disconnected and ``_prepare_idle_retry`` decides whether and how to retry.

    Args:
        query: The query to send to the agent.
        issue_id: Issue ID for logging.
        options: SDK client options.
        state: Mutable state for the iteration.
        lifecycle_ctx: Lifecycle context for session state.
        lint_cache: Cache for lint command results.
        idle_timeout_seconds: Idle timeout (None to disable).
        tracer: Optional telemetry span context.

    Returns:
        MessageIterationResult from the stream processor.

    Raises:
        IdleTimeoutError: Propagated from ``_prepare_idle_retry`` when no
            further retry is possible.
    """
    # Start the turn with clean per-turn bookkeeping.
    state.tool_calls_this_turn = 0
    state.first_message_received = False
    state.pending_tool_ids.clear()

    next_query: str | None = query
    while next_query is not None:
        # Only retries are delayed; the first attempt goes out immediately.
        if state.idle_retry_count > 0:
            await self._apply_retry_backoff(state.idle_retry_count)

        # Every attempt gets its own client.
        client = self.sdk_client_factory.create(options)

        async with client:
            query_start = time.time()
            resume_id = state.pending_session_id
            if resume_id is None:
                logger.debug(
                    "Session %s: sending query (new session)",
                    issue_id,
                )
                await client.query(next_query)
            else:
                logger.debug(
                    "Session %s: sending query with session_id=%s...",
                    issue_id,
                    resume_id[:8],
                )
                await client.query(next_query, session_id=resume_id)

            # The wrapper raises IdleTimeoutError when the stream goes idle.
            stream = IdleTimeoutStream(
                client.receive_response(),
                idle_timeout_seconds,
                state.pending_tool_ids,
            )

            try:
                return await self._process_message_stream(
                    stream,
                    issue_id,
                    state,
                    lifecycle_ctx,
                    lint_cache,
                    query_start,
                    tracer,
                )
            except IdleTimeoutError:
                idle_duration = time.time() - query_start
                logger.warning(
                    f"Session {issue_id}: idle timeout after "
                    f"{idle_duration:.1f}s, first_msg={state.first_message_received}, "
                    f"{state.tool_calls_this_turn} tool calls, disconnecting subprocess"
                )
                await self._disconnect_client_safely(client, issue_id)

                # May itself raise IdleTimeoutError, which then propagates
                # to the caller.
                retry_query = self._prepare_idle_retry(state, lifecycle_ctx, issue_id)
                # An empty string means "resend the query we already have".
                if retry_query:
                    next_query = retry_query

    # Should not reach here
    return MessageIterationResult(success=False)

1451 

async def run_session(
    self,
    input: AgentSessionInput,
    tracer: TelemetrySpan | None = None,
) -> AgentSessionOutput:
    """Run an agent session for ``input``, restarting on context pressure.

    Each pass through the loop initializes a fresh session and runs the
    lifecycle loop under the time remaining in the overall budget
    (``self.config.timeout_seconds``, measured from the first pass). A
    ContextPressureError triggers the context-pressure handler, whose
    continuation prompt seeds the next pass; any other outcome ends the loop.

    Args:
        input: AgentSessionInput with issue_id, prompt, etc.
        tracer: Optional telemetry span context.

    Returns:
        AgentSessionOutput built from the last session's config and state
        plus the total elapsed duration.
    """
    started_at = asyncio.get_event_loop().time()
    continuation_count = 0
    current_prompt = input.prompt
    # Reuse the caller's agent_id when given so the same ID is used across
    # restarts (lock continuity); otherwise mint one once, up front.
    agent_id = input.agent_id or f"{input.issue_id}-{uuid.uuid4().hex[:8]}"

    while True:
        # Budget left for this pass out of the overall session timeout.
        loop = asyncio.get_event_loop()
        elapsed = loop.time() - started_at
        remaining = self.config.timeout_seconds - elapsed

        # Fresh input and lifecycle for every pass; only the prompt changes.
        session_input = AgentSessionInput(
            issue_id=input.issue_id,
            prompt=current_prompt,
            baseline_commit=input.baseline_commit,
            issue_description=input.issue_description,
        )
        session_cfg, state = self._initialize_session(session_input, agent_id)

        try:
            # Raised inside the try so the TimeoutError handler below runs
            # the lifecycle's timeout handling for an exhausted budget too.
            if remaining <= 0:
                raise TimeoutError("Session timeout exceeded across restarts")
            async with asyncio.timeout(remaining):
                await self._run_lifecycle_loop(
                    session_input, session_cfg, state, tracer
                )
        except ContextPressureError as pressure:
            # The handler fetches the checkpoint and builds the continuation
            # prompt; it is told how much of the budget is left.
            budget_left = self.config.timeout_seconds - (loop.time() - started_at)
            (
                current_prompt,
                continuation_count,
            ) = await self._context_pressure_handler.handle_pressure_error(
                error=pressure,
                issue_id=input.issue_id,
                options=session_cfg.options,
                continuation_count=continuation_count,
                remaining_time=budget_left,
            )
            # Fall through to the next pass with the continuation prompt.
        except IdleTimeoutError as idle_err:
            state.lifecycle.on_error(state.lifecycle_ctx, idle_err)
            state.final_result = state.lifecycle_ctx.final_result
            break
        except TimeoutError:
            state.lifecycle.on_timeout(
                state.lifecycle_ctx, self.config.timeout_seconds // 60
            )
            state.final_result = state.lifecycle_ctx.final_result
            break
        except Exception as err:
            state.lifecycle.on_error(state.lifecycle_ctx, err)
            state.final_result = state.lifecycle_ctx.final_result
            break
        else:
            # Lifecycle loop finished normally.
            break

    duration = asyncio.get_event_loop().time() - started_at
    return self._build_session_output(session_cfg, state, duration)