Coverage for src / orchestration / orchestrator.py: 22%

332 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""MalaOrchestrator: Orchestrates parallel issue processing using Claude Agent SDK.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import logging 

7import time 

8import uuid 

9from typing import TYPE_CHECKING 

10 

11from src.domain.validation.spec import ( 

12 ValidationScope, 

13 build_validation_spec, 

14 extract_lint_tools_from_spec, 

15) 

16from src.domain.prompts import ( 

17 build_prompt_validation_commands, 

18 format_implementer_prompt, 

19 load_prompts, 

20) 

21from src.infra.git_utils import ( 

22 get_baseline_for_issue, 

23 get_git_branch_async, 

24 get_git_commit_async, 

25) 

26from src.infra.clients.cerberus_review import DefaultReviewer 

27from src.infra.io.log_output.run_metadata import ( 

28 remove_run_marker, 

29 write_run_marker, 

30) 

31from src.infra.tools.env import ( 

32 EnvConfig, 

33 PROMPTS_DIR, 

34 SCRIPTS_DIR, 

35 get_lock_dir, 

36 get_runs_dir, 

37) 

38from src.domain.deadlock import DeadlockMonitor 

39from src.infra.tools.command_runner import CommandRunner 

40from src.infra.tools.locking import ( 

41 LockManager, 

42 cleanup_agent_locks, 

43 release_run_locks, 

44) 

45from src.infra.sdk_adapter import SDKClientFactory 

46from src.pipeline.agent_session_runner import ( 

47 AgentSessionInput, 

48 AgentSessionRunner, 

49) 

50from src.pipeline.issue_finalizer import ( 

51 IssueFinalizeInput, 

52) 

53from src.pipeline.run_coordinator import ( 

54 RunLevelValidationInput, 

55) 

56from src.orchestration.orchestration_wiring import ( 

57 WiringDependencies, 

58 FinalizerCallbackRefs, 

59 EpicCallbackRefs, 

60 build_gate_runner, 

61 build_review_runner, 

62 build_run_coordinator, 

63 build_issue_coordinator, 

64 build_finalizer_callbacks, 

65 build_epic_callbacks, 

66 build_session_callback_factory, 

67 build_session_config, 

68) 

69from src.pipeline.issue_result import IssueResult 

70from src.orchestration.review_tracking import create_review_tracking_issues 

71from src.orchestration.run_config import build_event_run_config, build_run_metadata 

72 

73if TYPE_CHECKING: 

74 from pathlib import Path 

75 

76 from src.core.protocols import ( 

77 CodeReviewer, 

78 GateChecker, 

79 IssueProvider, 

80 LogProvider, 

81 ) 

82 from src.domain.deadlock import DeadlockInfo 

83 from src.domain.prompts import PromptProvider 

84 from src.infra.epic_verifier import EpicVerifier 

85 from src.infra.io.config import MalaConfig 

86 from src.core.protocols import MalaEventSink 

87 from src.infra.io.log_output.run_metadata import RunMetadata 

88 from src.infra.telemetry import TelemetryProvider 

89 from src.pipeline.agent_session_runner import SessionCallbacks 

90 from src.pipeline.epic_verification_coordinator import EpicVerificationCoordinator 

91 from src.pipeline.issue_finalizer import IssueFinalizer 

92 

93 from .types import OrchestratorConfig, _DerivedConfig 

94 

95 

# Version (from package metadata)
from importlib.metadata import version as pkg_version

# Looked up once, at import time, from the metadata of the "mala" distribution.
__version__ = pkg_version("mala")

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Bounded wait for log file (seconds) - used by AgentSessionRunner
# Re-exported here for backwards compatibility with tests.
# 60s allows time for Claude SDK to flush logs, especially under load.
LOG_FILE_WAIT_TIMEOUT = 60
# Companion polling interval for the wait above.
# NOTE(review): presumably seconds between checks - confirm in AgentSessionRunner.
LOG_FILE_POLL_INTERVAL = 0.5

108 

109 

110class MalaOrchestrator: 

111 """Orchestrates parallel issue processing using Claude Agent SDK. 

112 

113 Use create_orchestrator() factory function to instantiate this class. 

114 The factory provides clean separation of concerns and easier testing. 

115 """ 

116 

117 # Type annotations for attributes set during initialization 

118 _max_issues: int | None 

119 

120 def __init__( 

121 self, 

122 *, 

123 _config: OrchestratorConfig, 

124 _mala_config: MalaConfig, 

125 _derived: _DerivedConfig, 

126 _issue_provider: IssueProvider, 

127 _code_reviewer: CodeReviewer, 

128 _gate_checker: GateChecker, 

129 _log_provider: LogProvider, 

130 _telemetry_provider: TelemetryProvider, 

131 _event_sink: MalaEventSink, 

132 _epic_verifier: EpicVerifier | None = None, 

133 ): 

134 self._init_from_factory( 

135 _config, 

136 _mala_config, 

137 _derived, 

138 _issue_provider, 

139 _code_reviewer, 

140 _gate_checker, 

141 _log_provider, 

142 _telemetry_provider, 

143 _event_sink, 

144 _epic_verifier, 

145 ) 

146 

def _init_from_factory(
    self,
    orch_config: OrchestratorConfig,
    mala_config: MalaConfig,
    derived: _DerivedConfig,
    issue_provider: IssueProvider,
    code_reviewer: CodeReviewer,
    gate_checker: GateChecker,
    log_provider: LogProvider,
    telemetry_provider: TelemetryProvider,
    event_sink: MalaEventSink,
    epic_verifier: EpicVerifier | None,
) -> None:
    """Populate the orchestrator from factory-provided config and collaborators.

    Copies settings from ``orch_config`` and ``derived`` onto the instance,
    initializes runtime state, stores the injected collaborators, and finally
    builds the pipeline runners (which read the attributes set here).
    """
    self._mala_config = mala_config

    # Settings taken from the orchestrator config.
    self.repo_path = orch_config.repo_path.resolve()
    self.max_agents = orch_config.max_agents
    self.max_issues = orch_config.max_issues  # goes through the property setter
    self.epic_id = orch_config.epic_id
    self.only_ids = orch_config.only_ids
    self.max_gate_retries = orch_config.max_gate_retries
    self.max_review_retries = orch_config.max_review_retries
    self.disable_validations = orch_config.disable_validations
    self.coverage_threshold = orch_config.coverage_threshold
    self.prioritize_wip = orch_config.prioritize_wip
    self.focus = orch_config.focus
    self.orphans_only = orch_config.orphans_only
    self.cli_args = orch_config.cli_args
    self.epic_override_ids = orch_config.epic_override_ids or set()
    self.context_restart_threshold = orch_config.context_restart_threshold
    self.context_limit = orch_config.context_limit

    # Settings taken from the derived config.
    self.timeout_seconds = derived.timeout_seconds
    self.braintrust_enabled = derived.braintrust_enabled
    self._disabled_validations = derived.disabled_validations
    self.review_disabled_reason = derived.review_disabled_reason
    self.braintrust_disabled_reason = derived.braintrust_disabled_reason

    # Runtime state reads _mala_config and repo_path, so it comes after them.
    self._init_runtime_state()

    # Injected collaborators.
    self.log_provider = log_provider
    self.quality_gate = gate_checker
    self.event_sink = event_sink
    self.beads = issue_provider
    self.epic_verifier = epic_verifier
    self.code_reviewer = code_reviewer
    self.telemetry_provider = telemetry_provider
    self._sdk_client_factory = SDKClientFactory()

    # Must be last: wiring consumes everything assigned above.
    self._init_pipeline_runners()

193 

def _init_runtime_state(self) -> None:
    """Set up the mutable per-run state owned directly by the orchestrator.

    State that lives elsewhere and is only exposed here via properties or
    delegation:

    * active_tasks, failed_issues, abort_run, abort_reason -> issue_coordinator
    * verified_epics, epics_being_verified -> epic_verification_coordinator
    * per_issue_spec, last_gate_results -> async_gate_runner
    """
    # Issue ID -> agent ID, and the results finalized so far.
    self.agent_ids: dict[str, str] = {}
    self.completed: list[IssueResult] = []

    # Session log paths of in-flight sessions, consulted during deadlock
    # handling and dropped once the session is finalized.
    self._active_session_log_paths: dict[str, Path] = {}
    # Agents whose locks were already cleaned by the deadlock handler, so the
    # normal cleanup path can skip them.
    self._deadlock_cleaned_agents: set[str] = set()

    self._lint_tools: frozenset[str] | None = None
    self._prompt_validation_commands = build_prompt_validation_commands(
        self.repo_path
    )
    self._prompts: PromptProvider = load_prompts(PROMPTS_DIR)

    # Deadlock detection (T007: gate on config flag)
    detection_enabled = self._mala_config.deadlock_detection_enabled
    self.deadlock_monitor: DeadlockMonitor | None = (
        DeadlockMonitor() if detection_enabled else None
    )
    logger.info(
        "Deadlock detection enabled"
        if detection_enabled
        else "Deadlock detection disabled by config"
    )

    # Serializes deadlock resolutions (see _handle_deadlock).
    self._deadlock_resolution_lock = asyncio.Lock()

226 

def _init_pipeline_runners(self) -> None:
    """Construct runners, coordinators and session plumbing via the wiring helpers."""
    deps = self._build_wiring_dependencies()

    # Core runners built purely from the shared dependency bundle.
    gate_runner, async_gate_runner = build_gate_runner(deps)
    self.gate_runner = gate_runner
    self.async_gate_runner = async_gate_runner
    self.review_runner = build_review_runner(deps)
    self.run_coordinator = build_run_coordinator(deps, self._sdk_client_factory)
    self.issue_coordinator = build_issue_coordinator(deps)

    # These two take callbacks that close over ``self``.
    self.issue_finalizer = self._build_issue_finalizer()
    self.epic_verification_coordinator = self._build_epic_verification_coordinator()

    # Session infrastructure. The providers are passed as zero-argument
    # callables so the current attribute value is read at call time.
    self.session_callback_factory = build_session_callback_factory(
        deps,
        self.async_gate_runner,
        self.review_runner,
        lambda: self.log_provider,
        lambda: self.quality_gate,
        on_session_log_path=self._on_session_log_path,
        on_review_log_path=self._on_review_log_path,
    )
    review_enabled = self._is_review_enabled()
    self._session_config = build_session_config(deps, review_enabled=review_enabled)

    # Hook up the deadlock callback last, once everything it uses exists.
    monitor = self.deadlock_monitor
    if monitor is not None:
        monitor.on_deadlock = self._handle_deadlock

258 

def _build_wiring_dependencies(self) -> WiringDependencies:
    """Bundle the orchestrator's current settings and collaborators for wiring.

    Returns:
        A WiringDependencies populated from instance attributes, plus freshly
        created CommandRunner (rooted at the repo path), EnvConfig and
        LockManager instances.
    """
    # Empty/None collections are normalized to None; otherwise a copy is passed.
    disabled = self._disabled_validations
    disabled_validations = set(disabled) if disabled else None
    only_ids = set(self.only_ids) if self.only_ids else None

    return WiringDependencies(
        repo_path=self.repo_path,
        quality_gate=self.quality_gate,
        code_reviewer=self.code_reviewer,
        beads=self.beads,
        event_sink=self.event_sink,
        mala_config=self._mala_config,
        command_runner=CommandRunner(cwd=self.repo_path),
        env_config=EnvConfig(),
        lock_manager=LockManager(),
        max_agents=self.max_agents,
        max_issues=self._max_issues,
        timeout_seconds=self.timeout_seconds,
        max_gate_retries=self.max_gate_retries,
        max_review_retries=self.max_review_retries,
        coverage_threshold=self.coverage_threshold,
        disabled_validations=disabled_validations,
        epic_id=self.epic_id,
        only_ids=only_ids,
        prioritize_wip=self.prioritize_wip,
        focus=self.focus,
        orphans_only=self.orphans_only,
        epic_override_ids=self.epic_override_ids,
        prompt_validation_commands=self._prompt_validation_commands,
        prompts=self._prompts,
        context_restart_threshold=self.context_restart_threshold,
        context_limit=self.context_limit,
        deadlock_monitor=self.deadlock_monitor,
    )

292 

def _build_issue_finalizer(self) -> IssueFinalizer:
    """Create the IssueFinalizer together with its orchestrator-bound callbacks.

    The callbacks look up ``self.beads``, ``self.event_sink`` and
    ``self.epic_verification_coordinator`` when invoked, not when built.
    """
    from src.pipeline.issue_finalizer import IssueFinalizer, IssueFinalizeConfig

    def close_issue(issue_id):
        return self.beads.close_async(issue_id)

    def mark_needs_followup(issue_id, summary, log_path):
        return self.beads.mark_needs_followup_async(
            issue_id, summary, log_path=log_path
        )

    def on_issue_closed(agent_id, issue_id):
        return self.event_sink.on_issue_closed(agent_id, issue_id)

    def on_issue_completed(agent_id, issue_id, success, duration, summary):
        return self.event_sink.on_issue_completed(
            agent_id, issue_id, success, duration, summary
        )

    def trigger_epic_closure(issue_id, run_metadata):
        return self.epic_verification_coordinator.check_epic_closure(
            issue_id, run_metadata
        )

    refs = FinalizerCallbackRefs(
        close_issue=close_issue,
        mark_needs_followup=mark_needs_followup,
        on_issue_closed=on_issue_closed,
        on_issue_completed=on_issue_completed,
        trigger_epic_closure=trigger_epic_closure,
        create_tracking_issues=self._create_review_tracking_issues,
    )
    finalize_config = IssueFinalizeConfig(
        track_review_issues=self._mala_config.track_review_issues,
    )
    return IssueFinalizer(
        config=finalize_config,
        callbacks=build_finalizer_callbacks(refs),
        quality_gate=self.quality_gate,
        # Filled in per call by _finalize_issue_result.
        per_issue_spec=None,
    )

334 

def _build_epic_verification_coordinator(self) -> EpicVerificationCoordinator:
    """Create the EpicVerificationCoordinator with orchestrator-bound callbacks.

    Each callback resolves the collaborator it needs from ``self`` at call
    time, so later replacement of those attributes is honored.
    """
    from src.pipeline.epic_verification_coordinator import (
        EpicVerificationCoordinator,
        EpicVerificationConfig,
    )

    def get_parent_epic(issue_id):
        return self.beads.get_parent_epic_async(issue_id)

    def verify_epic(epic_id, human_override):
        return self.epic_verifier.verify_and_close_epic(  # type: ignore[union-attr]
            epic_id, human_override=human_override
        )

    def spawn_remediation(issue_id):
        return self.spawn_agent(issue_id)

    def finalize_remediation(issue_id, result, run_metadata):
        return self._finalize_issue_result(issue_id, result, run_metadata)

    def mark_completed(issue_id):
        return self.issue_coordinator.mark_completed(issue_id)

    def is_issue_failed(issue_id):
        return issue_id in self.failed_issues

    def close_eligible_epics():
        return self.beads.close_eligible_epics_async()

    def on_epic_closed(issue_id):
        return self.event_sink.on_epic_closed(issue_id)

    def on_warning(msg):
        return self.event_sink.on_warning(msg)

    def has_epic_verifier():
        return self.epic_verifier is not None

    def get_agent_id(issue_id):
        return self.agent_ids.get(issue_id, "unknown")

    refs = EpicCallbackRefs(
        get_parent_epic=get_parent_epic,
        verify_epic=verify_epic,
        spawn_remediation=spawn_remediation,
        finalize_remediation=finalize_remediation,
        mark_completed=mark_completed,
        is_issue_failed=is_issue_failed,
        close_eligible_epics=close_eligible_epics,
        on_epic_closed=on_epic_closed,
        on_warning=on_warning,
        has_epic_verifier=has_epic_verifier,
        get_agent_id=get_agent_id,
    )
    verification_config = EpicVerificationConfig(
        max_retries=self._mala_config.max_epic_verification_retries,
    )
    return EpicVerificationCoordinator(
        config=verification_config,
        callbacks=build_epic_callbacks(refs),
        epic_override_ids=self.epic_override_ids,
    )

377 

async def _create_review_tracking_issues(
    self,
    issue_id: str,
    review_issues: list,
) -> None:
    """Create tracking issues for P2/P3 review findings.

    Args:
        issue_id: The issue whose review produced the findings.
        review_issues: The review findings to track.
    """
    # Look up the parent epic first so the new tracking issues are filed
    # under the same epic as the originating issue.
    epic = await self.beads.get_parent_epic_async(issue_id)
    await create_review_tracking_issues(
        self.beads,
        self.event_sink,
        issue_id,
        review_issues,
        parent_epic_id=epic,
    )

393 

394 # Delegate state to issue_coordinator (single source of truth) 

395 # These properties return empty containers if accessed before coordinator init 

@property
def active_tasks(self) -> dict[str, asyncio.Task[IssueResult]]:
    """Running agent tasks, owned by issue_coordinator.

    Returns a fresh empty dict while the coordinator has not been built yet.
    """
    if hasattr(self, "issue_coordinator"):
        return self.issue_coordinator.active_tasks  # type: ignore[return-value]
    return {}

402 

@property
def failed_issues(self) -> set[str]:
    """IDs of failed issues, owned by issue_coordinator.

    Returns a fresh empty set while the coordinator has not been built yet.
    """
    if hasattr(self, "issue_coordinator"):
        return self.issue_coordinator.failed_issues
    return set()

409 

@property
def max_issues(self) -> int | None:
    """Maximum number of issues to process.

    Reads the coordinator's config once the coordinator exists; before that,
    falls back to the locally stored value (or None if never set).
    """
    if hasattr(self, "issue_coordinator"):
        return self.issue_coordinator.config.max_issues
    return getattr(self, "_max_issues", None)

@max_issues.setter
def max_issues(self, value: int | None) -> None:
    """Store the limit locally and mirror it onto the coordinator if present."""
    self._max_issues = value
    if not hasattr(self, "issue_coordinator"):
        return
    self.issue_coordinator.config.max_issues = value

423 

@property
def abort_run(self) -> bool:
    """Whether the run should abort, as tracked by issue_coordinator.

    False while the coordinator has not been built yet.
    """
    if hasattr(self, "issue_coordinator"):
        return self.issue_coordinator.abort_run
    return False

430 

@property
def abort_reason(self) -> str | None:
    """Reason for aborting, as tracked by issue_coordinator.

    None while the coordinator has not been built yet.
    """
    if hasattr(self, "issue_coordinator"):
        return self.issue_coordinator.abort_reason
    return None

437 

def _cleanup_agent_locks(self, agent_id: str) -> None:
    """Release any locks held by ``agent_id`` and drop it from deadlock tracking.

    Used for crash/timeout cleanup. A log line and an ``on_locks_cleaned``
    event are emitted only when at least one lock was actually removed; the
    deadlock-monitor unregistration happens regardless.
    """
    removed_count = cleanup_agent_locks(agent_id)
    if removed_count:
        logger.info(
            "Agent locks cleaned: agent_id=%s count=%d", agent_id, removed_count
        )
        self.event_sink.on_locks_cleaned(agent_id, removed_count)

    # Unregister agent from deadlock monitor
    monitor = self.deadlock_monitor
    if monitor is not None:
        monitor.unregister_agent(agent_id)

447 

async def _handle_deadlock(self, info: DeadlockInfo) -> None:
    """Resolve a detected deadlock: clean up the victim, record it, cancel it.

    Wired as the DeadlockMonitor's ``on_deadlock`` callback. The whole
    resolution runs under ``_deadlock_resolution_lock`` so that multiple
    deadlocks are resolved one at a time.

    Args:
        info: DeadlockInfo with cycle, victim, and blocker details.

    Raises:
        asyncio.CancelledError: If this handler is cancelled while the
            shielded resolution is awaited.
    """
    logger.debug("Acquiring deadlock resolution lock for victim %s", info.victim_id)
    async with self._deadlock_resolution_lock:
        logger.info(
            "Deadlock resolution started: victim_id=%s issue_id=%s blocked_on=%s",
            info.victim_id,
            info.victim_issue_id,
            info.blocked_on,
        )
        self.event_sink.on_deadlock_detected(info)

        # Find the victim's task, if it is tracked and still running.
        victim_task: asyncio.Task[object] | None = None
        issue_key = info.victim_issue_id
        if issue_key and issue_key in self.active_tasks:
            candidate = self.active_tasks[issue_key]
            if not candidate.done():
                victim_task = candidate
        # True when the handler itself is executing inside the victim's task.
        cancelling_self = (
            victim_task is not None and victim_task is asyncio.current_task()
        )

        # Release the victim's locks before any await that could raise, and
        # remember that we did so run_implementer does not clean up twice.
        self._cleanup_agent_locks(info.victim_id)
        self._deadlock_cleaned_agents.add(info.victim_id)

        # Shield the bookkeeping from cancellation of this handler.
        interrupted = False
        try:
            await asyncio.shield(self._resolve_deadlock(info))
        except asyncio.CancelledError:
            # External cancellation propagates immediately. In the self-cancel
            # case we still need to schedule our own cancellation below, so
            # the re-raise is postponed to the end of the handler.
            if not cancelling_self:
                raise
            interrupted = True

        # Cancel the victim only after the resolution step.
        if victim_task is not None:
            if cancelling_self:
                # Deferred so the cancellation does not interrupt this handler.
                asyncio.get_running_loop().call_soon(victim_task.cancel)
            else:
                victim_task.cancel()
            logger.info("Victim killed: agent_id=%s", info.victim_id)

        # A CancelledError caught above arrived before our own deferred
        # cancellation was scheduled, so it came from outside; do not mask it.
        if interrupted:
            raise asyncio.CancelledError()

511 

async def _resolve_deadlock(self, info: DeadlockInfo) -> None:
    """Record the outcome of a deadlock on the victim issue.

    Kept separate from ``_handle_deadlock`` so it can be wrapped in
    ``asyncio.shield``. Does nothing when the victim has no issue ID.

    Args:
        info: DeadlockInfo with cycle, victim, and blocker details.
    """
    victim = info.victim_issue_id
    if not victim:
        return

    # Make the victim issue depend on the blocker issue, when one is known.
    blocker = info.blocker_issue_id
    if blocker:
        added = await self.beads.add_dependency_async(victim, blocker)
        if added:
            logger.info("Added dependency: %s depends on %s", victim, blocker)
        else:
            logger.warning(
                "Failed to add dependency: %s depends on %s", victim, blocker
            )

    # Flag the victim issue as needs-followup, attaching its session log
    # path if one has been recorded.
    reason = f"Deadlock victim: blocked on {info.blocked_on} held by {info.blocker_id}"
    session_log = self._active_session_log_paths.get(victim)
    await self.beads.mark_needs_followup_async(victim, reason, log_path=session_log)
    logger.info("Marked issue %s as needs-followup", victim)

551 

552 def _request_abort(self, reason: str) -> None: 

553 """Signal that the current run should stop due to a fatal error.""" 

554 self.issue_coordinator.request_abort(reason) 

555 

556 def _is_review_enabled(self) -> bool: 

557 """Return whether review should run for this orchestrator instance.""" 

558 if "review" not in self._disabled_validations: 

559 return True 

560 if self.review_disabled_reason and not isinstance( 

561 self.review_runner.code_reviewer, DefaultReviewer 

562 ): 

563 return True 

564 return False 

565 

566 def _on_session_log_path(self, issue_id: str, log_path: Path) -> None: 

567 """Store session log path for an active session. 

568 

569 Called by session callback factory when log path becomes available. 

570 Used for deadlock handling during active sessions. 

571 

572 Args: 

573 issue_id: The issue ID. 

574 log_path: Path to the session log file. 

575 """ 

576 self._active_session_log_paths[issue_id] = log_path 

577 

578 def _on_review_log_path(self, issue_id: str, log_path: str) -> None: 

579 """Handle review log path notification (currently unused). 

580 

581 Review log paths are now returned via IssueResult.review_log_path. 

582 This callback exists for symmetry but can be a no-op. 

583 

584 Args: 

585 issue_id: The issue ID. 

586 log_path: Path to the review session log file. 

587 """ 

588 pass # Review log path flows through IssueResult 

589 

590 def _cleanup_active_session_path(self, issue_id: str) -> None: 

591 """Remove stored session log path for a completed issue. 

592 

593 Args: 

594 issue_id: The issue ID to clean up. 

595 """ 

596 self._active_session_log_paths.pop(issue_id, None) 

597 

async def _finalize_issue_result(
    self,
    issue_id: str,
    result: IssueResult,
    run_metadata: RunMetadata,
) -> None:
    """Hand an issue result to the IssueFinalizer and clear per-issue tracking.

    The gate result already stored by the async gate runner is passed along
    so the finalizer does not need to parse validation output again. The
    per-issue tracking state is cleared even when finalization raises.
    """
    gate_result = self.async_gate_runner.get_last_gate_result(issue_id)

    # Keep the finalizer's view of per-issue specs in sync with the gate runner.
    self.issue_finalizer.per_issue_spec = self.async_gate_runner.per_issue_spec

    # Log paths are taken from the IssueResult itself.
    payload = IssueFinalizeInput(
        issue_id=issue_id,
        result=result,
        run_metadata=run_metadata,
        log_path=result.session_log_path,
        stored_gate_result=gate_result,
        review_log_path=result.review_log_path,
    )

    # Record the failure up front so it is tracked even if finalize raises
    # (e.g. the mark_needs_followup callback fails).
    if not result.success:
        self.failed_issues.add(issue_id)

    try:
        await self.issue_finalizer.finalize(payload)
    finally:
        # active_tasks is updated separately, by mark_completed in the
        # caller's finalize callback.
        self.completed.append(result)

        # Drop the active session path and the stored gate result.
        self._cleanup_active_session_path(issue_id)
        self.async_gate_runner.clear_gate_result(issue_id)

        # The agent ID is kept until here (not removed in run_implementer's
        # finally) so the get_agent_id callback can still resolve it.
        self.agent_ids.pop(issue_id, None)

645 

646 async def _abort_active_tasks(self, run_metadata: RunMetadata) -> None: 

647 """Cancel active tasks and mark them as failed. 

648 

649 Tasks that have already completed are finalized with their real results 

650 rather than being marked as aborted. 

651 """ 

652 if not self.active_tasks: 

653 return 

654 reason = self.abort_reason or "Unrecoverable error" 

655 self.event_sink.on_tasks_aborting(len(self.active_tasks), reason) 

656 # Cancel tasks that are still running 

657 for task in self.active_tasks.values(): 

658 if not task.done(): 

659 task.cancel() 

660 

661 # Finalize each remaining issue - use real result if already done 

662 for issue_id, task in list(self.active_tasks.items()): 

663 if task.done(): 

664 # Task completed before we could cancel - use real result 

665 try: 

666 result = task.result() 

667 except asyncio.CancelledError: 

668 result = IssueResult( 

669 issue_id=issue_id, 

670 agent_id=self.agent_ids.get(issue_id, "unknown"), 

671 success=False, 

672 summary=f"Aborted due to unrecoverable error: {reason}", 

673 session_log_path=self._active_session_log_paths.get(issue_id), 

674 ) 

675 except Exception as e: 

676 result = IssueResult( 

677 issue_id=issue_id, 

678 agent_id=self.agent_ids.get(issue_id, "unknown"), 

679 success=False, 

680 summary=str(e), 

681 session_log_path=self._active_session_log_paths.get(issue_id), 

682 ) 

683 else: 

684 # Task was still running - mark as aborted 

685 result = IssueResult( 

686 issue_id=issue_id, 

687 agent_id=self.agent_ids.get(issue_id, "unknown"), 

688 success=False, 

689 summary=f"Aborted due to unrecoverable error: {reason}", 

690 session_log_path=self._active_session_log_paths.get(issue_id), 

691 ) 

692 await self._finalize_issue_result(issue_id, result, run_metadata) 

693 # Mark completed in coordinator to keep state consistent 

694 self.issue_coordinator.mark_completed(issue_id) 

695 

696 def _build_session_callbacks(self, issue_id: str) -> SessionCallbacks: 

697 """Build callbacks for session operations. 

698 

699 Delegates to SessionCallbackFactory for callback construction. 

700 

701 Args: 

702 issue_id: The issue ID for tracking state. 

703 

704 Returns: 

705 SessionCallbacks with gate, review, and logging callbacks. 

706 """ 

707 return self.session_callback_factory.build( 

708 issue_id=issue_id, 

709 on_abort=self._request_abort, 

710 ) 

711 

async def run_implementer(self, issue_id: str) -> IssueResult:
    """Run the implementer agent for a single issue and return its result.

    Session handling (including gate retries) is delegated to
    AgentSessionRunner; this method prepares the inputs, wraps the session in
    a telemetry span, and guarantees per-agent cleanup.

    All awaited preparation happens inside the ``try`` block. Those awaits
    can raise or be cancelled (tasks are cancelled on abort and on deadlock),
    and the agent is already registered with the deadlock monitor by then, so
    the ``finally`` cleanup must cover them too.

    Args:
        issue_id: The issue to implement.

    Returns:
        An IssueResult built from the session output.
    """
    temp_agent_id = f"{issue_id}-{uuid.uuid4().hex[:8]}"
    self.agent_ids[issue_id] = temp_agent_id

    # Register with deadlock monitor
    if self.deadlock_monitor is not None:
        self.deadlock_monitor.register_agent(
            agent_id=temp_agent_id,
            issue_id=issue_id,
            start_time=time.time(),
        )

    try:
        # Prepare session inputs
        issue_description = await self.beads.get_issue_description_async(issue_id)
        baseline_commit = await get_baseline_for_issue(self.repo_path, issue_id)
        if baseline_commit is None:
            # No recorded baseline: fall back to the current commit.
            baseline_commit = await get_git_commit_async(self.repo_path)

        prompt = format_implementer_prompt(
            self._prompts.implementer_prompt,
            issue_id=issue_id,
            repo_path=self.repo_path,
            agent_id=temp_agent_id,
            validation_commands=self._prompt_validation_commands,
            lock_dir=get_lock_dir(),
            scripts_dir=SCRIPTS_DIR,
        )

        session_input = AgentSessionInput(
            issue_id=issue_id,
            prompt=prompt,
            baseline_commit=baseline_commit,
            issue_description=issue_description,
            agent_id=temp_agent_id,
        )

        runner = AgentSessionRunner(
            config=self._session_config,
            sdk_client_factory=self._sdk_client_factory,
            callbacks=self._build_session_callbacks(issue_id),
            event_sink=self.event_sink,
        )
        tracer = self.telemetry_provider.create_span(
            issue_id,
            metadata={
                "agent_id": temp_agent_id,
                "mala_version": __version__,
                "project_dir": self.repo_path.name,
                "git_branch": await get_git_branch_async(self.repo_path),
                "git_commit": await get_git_commit_async(self.repo_path),
                "timeout_seconds": self.timeout_seconds,
            },
        )

        with tracer:
            tracer.log_input(prompt)
            output = await runner.run_session(session_input, tracer=tracer)
            # The session may report a different agent ID than the temporary one.
            self.agent_ids[issue_id] = output.agent_id
            if output.success:
                tracer.set_success(True)
            else:
                tracer.set_error(output.summary)
    finally:
        # Read the agent ID without popping it: the entry must stay available
        # for the get_agent_id callback until finalization completes
        # (see _finalize_issue_result).
        agent_id = self.agent_ids.get(issue_id, temp_agent_id)
        # Skip cleanup if the deadlock handler already did it (avoids a
        # double unregister).
        if agent_id not in self._deadlock_cleaned_agents:
            self._cleanup_agent_locks(agent_id)
        else:
            logger.debug(
                "Skipped cleanup for %s (handled during deadlock)", agent_id
            )
        self._deadlock_cleaned_agents.discard(agent_id)

    return IssueResult(
        issue_id=issue_id,
        agent_id=output.agent_id,
        success=output.success,
        summary=output.summary,
        duration_seconds=output.duration_seconds,
        session_id=output.session_id,
        gate_attempts=output.gate_attempts,
        review_attempts=output.review_attempts,
        resolution=output.resolution,
        low_priority_review_issues=output.low_priority_review_issues,
        session_log_path=output.log_path,
        review_log_path=output.review_log_path,
    )

806 

async def spawn_agent(self, issue_id: str) -> asyncio.Task | None:  # type: ignore[type-arg]
    """Claim an issue and start an implementer task for it.

    Returns:
        The created Task when the claim succeeded; None when the claim
        failed (the issue is then marked failed and a claim-failed event is
        emitted).
    """
    claimed = await self.beads.claim_async(issue_id)
    if claimed:
        agent_task = asyncio.create_task(self.run_implementer(issue_id))
        self.event_sink.on_agent_started(issue_id, issue_id)
        return agent_task

    self.issue_coordinator.mark_failed(issue_id)
    self.event_sink.on_claim_failed(issue_id, issue_id)
    return None

817 

async def _run_main_loop(self, run_metadata: RunMetadata) -> int:
    """Drive agent spawning and completion through the issue coordinator.

    The loop itself lives in ``self.issue_coordinator.run_loop``; this
    method only supplies the spawn, finalize and abort callbacks.

    Args:
        run_metadata: Metadata object for the current run, forwarded to
            issue finalization and to task abortion.

    Returns:
        Whatever ``run_loop`` returns (the number of issues spawned).
    """

    def _failed_result(issue_id: str, summary: str):
        # Shared shape for results synthesized when a task did not finish
        # normally (cancelled or raised).
        return IssueResult(
            issue_id=issue_id,
            agent_id=self.agent_ids.get(issue_id, "unknown"),
            success=False,
            summary=summary,
            session_log_path=self._active_session_log_paths.get(issue_id),
        )

    async def _on_task_done(
        issue_id: str,
        task: asyncio.Task,  # type: ignore[type-arg]
    ) -> None:
        """Turn a finished task into a result, finalize it, mark it completed."""
        try:
            outcome = task.result()
        except asyncio.CancelledError:
            # A cancelled task is reported as an abort, with the
            # coordinator's abort reason appended when one is set.
            reason = self.issue_coordinator.abort_reason
            if reason:
                message = f"Aborted due to unrecoverable error: {reason}"
            else:
                message = "Aborted due to unrecoverable error"
            outcome = _failed_result(issue_id, message)
        except Exception as exc:
            outcome = _failed_result(issue_id, str(exc))
        await self._finalize_issue_result(issue_id, outcome, run_metadata)
        self.issue_coordinator.mark_completed(issue_id)

    async def _on_abort() -> None:
        """Abort every task that is still active."""
        await self._abort_active_tasks(run_metadata)

    spawned = await self.issue_coordinator.run_loop(
        spawn_callback=self.spawn_agent,
        finalize_callback=_on_task_done,
        abort_callback=_on_abort,
    )
    return spawned

864 

async def _finalize_run(
    self,
    run_metadata: RunMetadata,
    run_validation_passed: bool,
) -> tuple[int, int]:
    """Emit the run summary, persist results and compute the return tuple.

    Args:
        run_metadata: Metadata object for the run; saved when at least one
            issue completed.
        run_validation_passed: Outcome of run-level validation.

    Returns:
        ``(success_count, total)``. The success count is reported as ``0``
        when the run was aborted or run-level validation failed.
    """
    total = len(self.completed)
    success_count = len([res for res in self.completed if res.success])

    # Emit run completed event. An abort reason is only reported for
    # aborted runs, with a generic fallback when none was recorded.
    if self.abort_run:
        abort_reason = self.abort_reason or "Unrecoverable error"
    else:
        abort_reason = None
    self.event_sink.on_run_completed(
        success_count, total, run_validation_passed, abort_reason
    )

    # Commit issue state only when something succeeded.
    if success_count > 0 and await self.beads.commit_issues_async():
        self.event_sink.on_issues_committed()

    if total > 0:
        saved_to = run_metadata.save()
        self.event_sink.on_run_metadata_saved(str(saved_to))

    print()
    if self.abort_run or not run_validation_passed:
        return (0, total)
    return (success_count, total)

896 

async def run(self) -> tuple[int, int]:
    """Main orchestration loop.

    Emits the run-started event, ensures the lock and runs directories
    exist, builds the run metadata and the per-issue validation spec,
    writes the run marker, runs the main loop, releases locks, optionally
    runs run-level validation, and finalizes the run.

    Returns:
        ``(success_count, total_count)`` as produced by ``_finalize_run``.
    """
    run_config = build_event_run_config(
        repo_path=self.repo_path,
        max_agents=self.max_agents,
        timeout_seconds=self.timeout_seconds,
        max_issues=self.max_issues,
        max_gate_retries=self.max_gate_retries,
        max_review_retries=self.max_review_retries,
        epic_id=self.epic_id,
        only_ids=self.only_ids,
        braintrust_enabled=self.braintrust_enabled,
        review_enabled=self._is_review_enabled(),
        review_disabled_reason=self.review_disabled_reason,
        prioritize_wip=self.prioritize_wip,
        orphans_only=self.orphans_only,
        cli_args=self.cli_args,
        braintrust_disabled_reason=self.braintrust_disabled_reason,
    )
    self.event_sink.on_run_started(run_config)

    get_lock_dir().mkdir(parents=True, exist_ok=True)
    get_runs_dir().mkdir(parents=True, exist_ok=True)

    run_metadata = build_run_metadata(
        repo_path=self.repo_path,
        max_agents=self.max_agents,
        timeout_seconds=self.timeout_seconds,
        max_issues=self.max_issues,
        epic_id=self.epic_id,
        only_ids=self.only_ids,
        braintrust_enabled=self.braintrust_enabled,
        max_gate_retries=self.max_gate_retries,
        max_review_retries=self.max_review_retries,
        review_enabled=self._is_review_enabled(),
        orphans_only=self.orphans_only,
        cli_args=self.cli_args,
        version=__version__,
    )

    # The outer try starts as soon as run_metadata exists, so that a failure
    # while building the validation spec or writing the run marker still
    # reaches run_metadata.cleanup().
    # NOTE(review): assumes the debug logging handler is already attached
    # once build_run_metadata returns - confirm against RunMetadata.
    try:
        per_issue_spec = build_validation_spec(
            self.repo_path,
            scope=ValidationScope.PER_ISSUE,
            disable_validations=self._disabled_validations,
        )
        self.async_gate_runner.per_issue_spec = per_issue_spec
        self._lint_tools = extract_lint_tools_from_spec(per_issue_spec)
        self._session_config.lint_tools = self._lint_tools
        write_run_marker(
            run_id=run_metadata.run_id,
            repo_path=self.repo_path,
            max_agents=self.max_agents,
        )

        try:
            await self._run_main_loop(run_metadata)
        finally:
            released = release_run_locks(list(self.agent_ids.values()))
            # Only emit event if released is a positive integer
            # (release_run_locks returns int, but tests may mock it)
            if isinstance(released, int) and released > 0:
                self.event_sink.on_locks_released(released)
            remove_run_marker(run_metadata.run_id)

        # Run-level validation and finalization happen after lock cleanup
        # but before debug log cleanup (so they're captured in the debug log)
        success_count = sum(1 for r in self.completed if r.success)
        run_validation_passed = True
        if success_count > 0 and not self.abort_run:
            validation_input = RunLevelValidationInput(run_metadata=run_metadata)
            validation_output = await self.run_coordinator.run_validation(
                validation_input
            )
            run_validation_passed = validation_output.passed

        return await self._finalize_run(run_metadata, run_validation_passed)
    finally:
        # Clean up debug logging handler after all finalization is complete
        # (idempotent - safe to call even if save() is called later)
        run_metadata.cleanup()

977 

def run_sync(self) -> tuple[int, int]:
    """Synchronous wrapper for run(). Returns (success_count, total_count).

    Use this method when calling from synchronous code. It handles event loop
    creation and cleanup automatically.

    Raises:
        RuntimeError: If called from within an async context (e.g., inside an
            async function or when an event loop is already running). In that
            case, use `await orchestrator.run()` instead.

    Example usage::

        from src.orchestration.factory import create_orchestrator, OrchestratorConfig

        # From sync code (scripts, CLI, tests)
        config = OrchestratorConfig(repo_path=Path("."))
        orchestrator = create_orchestrator(config)
        success_count, total = orchestrator.run_sync()

        # From async code
        async def main():
            config = OrchestratorConfig(repo_path=Path("."))
            orchestrator = create_orchestrator(config)
            success_count, total = await orchestrator.run()

    Returns:
        Tuple of (success_count, total_count).
    """
    # Check if we're already in an async context. get_running_loop() raises
    # RuntimeError when no loop is running, which is the expected case for
    # sync usage. try/else keeps our own error out of the except clause, so
    # the guard does not depend on matching exception message text.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        raise RuntimeError(
            "run_sync() cannot be called from within an async context. "
            "Use 'await orchestrator.run()' instead."
        )

    # Create a new event loop and run
    return asyncio.run(self.run())