Coverage for src / orchestration / orchestrator.py: 22%
332 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""MalaOrchestrator: Orchestrates parallel issue processing using Claude Agent SDK."""
3from __future__ import annotations
5import asyncio
6import logging
7import time
8import uuid
9from typing import TYPE_CHECKING
11from src.domain.validation.spec import (
12 ValidationScope,
13 build_validation_spec,
14 extract_lint_tools_from_spec,
15)
16from src.domain.prompts import (
17 build_prompt_validation_commands,
18 format_implementer_prompt,
19 load_prompts,
20)
21from src.infra.git_utils import (
22 get_baseline_for_issue,
23 get_git_branch_async,
24 get_git_commit_async,
25)
26from src.infra.clients.cerberus_review import DefaultReviewer
27from src.infra.io.log_output.run_metadata import (
28 remove_run_marker,
29 write_run_marker,
30)
31from src.infra.tools.env import (
32 EnvConfig,
33 PROMPTS_DIR,
34 SCRIPTS_DIR,
35 get_lock_dir,
36 get_runs_dir,
37)
38from src.domain.deadlock import DeadlockMonitor
39from src.infra.tools.command_runner import CommandRunner
40from src.infra.tools.locking import (
41 LockManager,
42 cleanup_agent_locks,
43 release_run_locks,
44)
45from src.infra.sdk_adapter import SDKClientFactory
46from src.pipeline.agent_session_runner import (
47 AgentSessionInput,
48 AgentSessionRunner,
49)
50from src.pipeline.issue_finalizer import (
51 IssueFinalizeInput,
52)
53from src.pipeline.run_coordinator import (
54 RunLevelValidationInput,
55)
56from src.orchestration.orchestration_wiring import (
57 WiringDependencies,
58 FinalizerCallbackRefs,
59 EpicCallbackRefs,
60 build_gate_runner,
61 build_review_runner,
62 build_run_coordinator,
63 build_issue_coordinator,
64 build_finalizer_callbacks,
65 build_epic_callbacks,
66 build_session_callback_factory,
67 build_session_config,
68)
69from src.pipeline.issue_result import IssueResult
70from src.orchestration.review_tracking import create_review_tracking_issues
71from src.orchestration.run_config import build_event_run_config, build_run_metadata
73if TYPE_CHECKING:
74 from pathlib import Path
76 from src.core.protocols import (
77 CodeReviewer,
78 GateChecker,
79 IssueProvider,
80 LogProvider,
81 )
82 from src.domain.deadlock import DeadlockInfo
83 from src.domain.prompts import PromptProvider
84 from src.infra.epic_verifier import EpicVerifier
85 from src.infra.io.config import MalaConfig
86 from src.core.protocols import MalaEventSink
87 from src.infra.io.log_output.run_metadata import RunMetadata
88 from src.infra.telemetry import TelemetryProvider
89 from src.pipeline.agent_session_runner import SessionCallbacks
90 from src.pipeline.epic_verification_coordinator import EpicVerificationCoordinator
91 from src.pipeline.issue_finalizer import IssueFinalizer
93 from .types import OrchestratorConfig, _DerivedConfig
# Version (from package metadata). Fall back to a placeholder when the
# distribution metadata is unavailable (e.g. running from an uninstalled
# source checkout) so importing the orchestrator never fails just because
# the version string - used only for telemetry/run metadata - is missing.
from importlib.metadata import PackageNotFoundError, version as pkg_version

try:
    __version__ = pkg_version("mala")
except PackageNotFoundError:
    __version__ = "unknown"

logger = logging.getLogger(__name__)

# Bounded wait for log file (seconds) - used by AgentSessionRunner
# Re-exported here for backwards compatibility with tests.
# 60s allows time for Claude SDK to flush logs, especially under load.
LOG_FILE_WAIT_TIMEOUT = 60
LOG_FILE_POLL_INTERVAL = 0.5
class MalaOrchestrator:
    """Orchestrates parallel issue processing using Claude Agent SDK.

    Use create_orchestrator() factory function to instantiate this class.
    The factory provides clean separation of concerns and easier testing.
    """

    # Type annotations for attributes set during initialization
    _max_issues: int | None

    def __init__(
        self,
        *,
        _config: OrchestratorConfig,
        _mala_config: MalaConfig,
        _derived: _DerivedConfig,
        _issue_provider: IssueProvider,
        _code_reviewer: CodeReviewer,
        _gate_checker: GateChecker,
        _log_provider: LogProvider,
        _telemetry_provider: TelemetryProvider,
        _event_sink: MalaEventSink,
        _epic_verifier: EpicVerifier | None = None,
    ):
        """Store factory-provided config and dependencies.

        All work is delegated to _init_from_factory().
        """
        self._init_from_factory(
            _config,
            _mala_config,
            _derived,
            _issue_provider,
            _code_reviewer,
            _gate_checker,
            _log_provider,
            _telemetry_provider,
            _event_sink,
            _epic_verifier,
        )

    def _init_from_factory(
        self,
        orch_config: OrchestratorConfig,
        mala_config: MalaConfig,
        derived: _DerivedConfig,
        issue_provider: IssueProvider,
        code_reviewer: CodeReviewer,
        gate_checker: GateChecker,
        log_provider: LogProvider,
        telemetry_provider: TelemetryProvider,
        event_sink: MalaEventSink,
        epic_verifier: EpicVerifier | None,
    ) -> None:
        """Initialize from factory-provided config and dependencies.

        Order matters: plain config attributes first, then runtime state
        (which reads repo_path and _mala_config), then injected dependencies,
        and finally the pipeline runners (which read all of the above).
        """
        self._mala_config = mala_config
        self.repo_path = orch_config.repo_path.resolve()
        self.max_agents = orch_config.max_agents
        self.timeout_seconds = derived.timeout_seconds
        # Goes through the max_issues setter, which stores _max_issues.
        self.max_issues = orch_config.max_issues
        self.epic_id = orch_config.epic_id
        self.only_ids = orch_config.only_ids
        self.braintrust_enabled = derived.braintrust_enabled
        self.max_gate_retries = orch_config.max_gate_retries
        self.max_review_retries = orch_config.max_review_retries
        self.disable_validations = orch_config.disable_validations
        self._disabled_validations = derived.disabled_validations
        self.coverage_threshold = orch_config.coverage_threshold
        self.prioritize_wip = orch_config.prioritize_wip
        self.focus = orch_config.focus
        self.orphans_only = orch_config.orphans_only
        self.cli_args = orch_config.cli_args
        self.epic_override_ids = orch_config.epic_override_ids or set()
        self.context_restart_threshold = orch_config.context_restart_threshold
        self.context_limit = orch_config.context_limit
        self.review_disabled_reason = derived.review_disabled_reason
        self.braintrust_disabled_reason = derived.braintrust_disabled_reason
        self._init_runtime_state()
        self.log_provider = log_provider
        self.quality_gate = gate_checker
        self.event_sink = event_sink
        self.beads = issue_provider
        self.epic_verifier = epic_verifier
        self.code_reviewer = code_reviewer
        self.telemetry_provider = telemetry_provider
        self._sdk_client_factory = SDKClientFactory()
        self._init_pipeline_runners()

    def _init_runtime_state(self) -> None:
        """Initialize runtime state.

        Note: active_tasks, failed_issues, abort_run, and abort_reason are
        delegated to issue_coordinator to maintain a single source of truth.
        See the corresponding properties.

        Note: verified_epics and epics_being_verified are delegated to
        epic_verification_coordinator.

        Note: per_issue_spec and last_gate_results are delegated to
        async_gate_runner.
        """
        self.agent_ids: dict[str, str] = {}
        self.completed: list[IssueResult] = []
        # Active session log paths for deadlock handling (cleared after session completes)
        self._active_session_log_paths: dict[str, Path] = {}
        # Track agents whose locks were cleaned during deadlock handling to avoid double cleanup
        self._deadlock_cleaned_agents: set[str] = set()
        self._lint_tools: frozenset[str] | None = None
        self._prompt_validation_commands = build_prompt_validation_commands(
            self.repo_path
        )
        self._prompts: PromptProvider = load_prompts(PROMPTS_DIR)
        # Deadlock detection (T007: gate on config flag)
        if self._mala_config.deadlock_detection_enabled:
            self.deadlock_monitor: DeadlockMonitor | None = DeadlockMonitor()
            logger.info("Deadlock detection enabled")
        else:
            self.deadlock_monitor = None
            logger.info("Deadlock detection disabled by config")
        # Serializes deadlock resolution (see _handle_deadlock).
        self._deadlock_resolution_lock = asyncio.Lock()

    def _init_pipeline_runners(self) -> None:
        """Initialize pipeline runner components using wiring functions."""
        deps = self._build_wiring_dependencies()

        # Build core runners
        self.gate_runner, self.async_gate_runner = build_gate_runner(deps)
        self.review_runner = build_review_runner(deps)
        self.run_coordinator = build_run_coordinator(deps, self._sdk_client_factory)
        self.issue_coordinator = build_issue_coordinator(deps)

        # Build coordinators with callbacks (callbacks need self references)
        self.issue_finalizer = self._build_issue_finalizer()
        self.epic_verification_coordinator = self._build_epic_verification_coordinator()

        # Build session infrastructure. Provider getters are lambdas so later
        # reassignment of log_provider / quality_gate is picked up.
        self.session_callback_factory = build_session_callback_factory(
            deps,
            self.async_gate_runner,
            self.review_runner,
            lambda: self.log_provider,
            lambda: self.quality_gate,
            on_session_log_path=self._on_session_log_path,
            on_review_log_path=self._on_review_log_path,
        )
        self._session_config = build_session_config(
            deps, review_enabled=self._is_review_enabled()
        )

        # Wire deadlock callback now that all dependencies are available
        if self.deadlock_monitor is not None:
            self.deadlock_monitor.on_deadlock = self._handle_deadlock

    def _build_wiring_dependencies(self) -> WiringDependencies:
        """Build WiringDependencies from orchestrator state.

        Empty disabled_validations / only_ids collections are passed as None.
        """
        return WiringDependencies(
            repo_path=self.repo_path,
            quality_gate=self.quality_gate,
            code_reviewer=self.code_reviewer,
            beads=self.beads,
            event_sink=self.event_sink,
            mala_config=self._mala_config,
            command_runner=CommandRunner(cwd=self.repo_path),
            env_config=EnvConfig(),
            lock_manager=LockManager(),
            max_agents=self.max_agents,
            max_issues=self._max_issues,
            timeout_seconds=self.timeout_seconds,
            max_gate_retries=self.max_gate_retries,
            max_review_retries=self.max_review_retries,
            coverage_threshold=self.coverage_threshold,
            disabled_validations=(
                set(self._disabled_validations)
                if self._disabled_validations
                else None
            ),
            epic_id=self.epic_id,
            only_ids=set(self.only_ids) if self.only_ids else None,
            prioritize_wip=self.prioritize_wip,
            focus=self.focus,
            orphans_only=self.orphans_only,
            epic_override_ids=self.epic_override_ids,
            prompt_validation_commands=self._prompt_validation_commands,
            prompts=self._prompts,
            context_restart_threshold=self.context_restart_threshold,
            context_limit=self.context_limit,
            deadlock_monitor=self.deadlock_monitor,
        )

    def _build_issue_finalizer(self) -> IssueFinalizer:
        """Build IssueFinalizer with callbacks.

        Callbacks are lambdas so they resolve self.beads / self.event_sink /
        self.epic_verification_coordinator at call time; the latter is built
        after this finalizer.
        """
        from src.pipeline.issue_finalizer import IssueFinalizer, IssueFinalizeConfig

        config = IssueFinalizeConfig(
            track_review_issues=self._mala_config.track_review_issues,
        )
        callbacks = build_finalizer_callbacks(
            FinalizerCallbackRefs(
                close_issue=lambda issue_id: self.beads.close_async(issue_id),
                mark_needs_followup=lambda issue_id, summary, log_path: (
                    self.beads.mark_needs_followup_async(
                        issue_id, summary, log_path=log_path
                    )
                ),
                on_issue_closed=lambda agent_id, issue_id: (
                    self.event_sink.on_issue_closed(agent_id, issue_id)
                ),
                on_issue_completed=lambda agent_id, issue_id, success, duration, summary: (
                    self.event_sink.on_issue_completed(
                        agent_id, issue_id, success, duration, summary
                    )
                ),
                trigger_epic_closure=lambda issue_id, run_metadata: (
                    self.epic_verification_coordinator.check_epic_closure(
                        issue_id, run_metadata
                    )
                ),
                create_tracking_issues=self._create_review_tracking_issues,
            )
        )
        return IssueFinalizer(
            config=config,
            callbacks=callbacks,
            quality_gate=self.quality_gate,
            # Refreshed from async_gate_runner in _finalize_issue_result.
            per_issue_spec=None,
        )

    def _build_epic_verification_coordinator(self) -> EpicVerificationCoordinator:
        """Build EpicVerificationCoordinator with callbacks."""
        from src.pipeline.epic_verification_coordinator import (
            EpicVerificationCoordinator,
            EpicVerificationConfig,
        )

        config = EpicVerificationConfig(
            max_retries=self._mala_config.max_epic_verification_retries,
        )
        callbacks = build_epic_callbacks(
            EpicCallbackRefs(
                get_parent_epic=lambda issue_id: self.beads.get_parent_epic_async(
                    issue_id
                ),
                # Only invoked when has_epic_verifier() reports True.
                verify_epic=lambda epic_id, human_override: (
                    self.epic_verifier.verify_and_close_epic(  # type: ignore[union-attr]
                        epic_id, human_override=human_override
                    )
                ),
                spawn_remediation=lambda issue_id: self.spawn_agent(issue_id),
                finalize_remediation=lambda issue_id, result, run_metadata: (
                    self._finalize_issue_result(issue_id, result, run_metadata)
                ),
                mark_completed=lambda issue_id: self.issue_coordinator.mark_completed(
                    issue_id
                ),
                is_issue_failed=lambda issue_id: issue_id in self.failed_issues,
                close_eligible_epics=lambda: self.beads.close_eligible_epics_async(),
                on_epic_closed=lambda issue_id: self.event_sink.on_epic_closed(
                    issue_id
                ),
                on_warning=lambda msg: self.event_sink.on_warning(msg),
                has_epic_verifier=lambda: self.epic_verifier is not None,
                get_agent_id=lambda issue_id: self.agent_ids.get(issue_id, "unknown"),
            )
        )
        return EpicVerificationCoordinator(
            config=config,
            callbacks=callbacks,
            epic_override_ids=self.epic_override_ids,
        )

    async def _create_review_tracking_issues(
        self,
        issue_id: str,
        review_issues: list,
    ) -> None:
        """Create tracking issues for P2/P3 review findings."""
        # Get parent epic so review tracking issues stay in the same epic
        parent_epic_id = await self.beads.get_parent_epic_async(issue_id)
        await create_review_tracking_issues(
            self.beads,
            self.event_sink,
            issue_id,
            review_issues,
            parent_epic_id=parent_epic_id,
        )

    # Delegate state to issue_coordinator (single source of truth)
    # These properties return empty containers if accessed before coordinator init
    @property
    def active_tasks(self) -> dict[str, asyncio.Task[IssueResult]]:
        """Active agent tasks, delegated to issue_coordinator."""
        if not hasattr(self, "issue_coordinator"):
            return {}
        return self.issue_coordinator.active_tasks  # type: ignore[return-value]

    @property
    def failed_issues(self) -> set[str]:
        """Failed issue IDs, delegated to issue_coordinator."""
        if not hasattr(self, "issue_coordinator"):
            return set()
        return self.issue_coordinator.failed_issues

    @property
    def max_issues(self) -> int | None:
        """Maximum issues to process, synced with issue_coordinator."""
        if not hasattr(self, "issue_coordinator"):
            return getattr(self, "_max_issues", None)
        return self.issue_coordinator.config.max_issues

    @max_issues.setter
    def max_issues(self, value: int | None) -> None:
        """Set max_issues and sync to issue_coordinator."""
        self._max_issues = value
        if hasattr(self, "issue_coordinator"):
            self.issue_coordinator.config.max_issues = value

    @property
    def abort_run(self) -> bool:
        """Whether run should abort, delegated to issue_coordinator."""
        if not hasattr(self, "issue_coordinator"):
            return False
        return self.issue_coordinator.abort_run

    @property
    def abort_reason(self) -> str | None:
        """Abort reason, delegated to issue_coordinator."""
        if not hasattr(self, "issue_coordinator"):
            return None
        return self.issue_coordinator.abort_reason

    def _cleanup_agent_locks(self, agent_id: str) -> None:
        """Remove locks held by a specific agent (crash/timeout cleanup).

        Also unregisters the agent from the deadlock monitor, if enabled.
        """
        cleaned = cleanup_agent_locks(agent_id)
        if cleaned:
            logger.info("Agent locks cleaned: agent_id=%s count=%d", agent_id, cleaned)
            self.event_sink.on_locks_cleaned(agent_id, cleaned)
        # Unregister agent from deadlock monitor
        if self.deadlock_monitor is not None:
            self.deadlock_monitor.unregister_agent(agent_id)

    async def _handle_deadlock(self, info: DeadlockInfo) -> None:
        """Handle a detected deadlock by cancelling victim and recording dependency.

        Called by DeadlockMonitor when a cycle is detected. Uses an asyncio.Lock
        to prevent concurrent resolution of multiple deadlocks.

        The victim's locks are released first, then the dependency and
        needs-followup updates run under asyncio.shield. The victim task is
        cancelled afterwards - also when this handler itself is cancelled,
        because by then the victim has already lost its locks.

        Args:
            info: DeadlockInfo with cycle, victim, and blocker details.

        Raises:
            asyncio.CancelledError: If this handler is cancelled while waiting
                for the shielded resolution (re-raised after the victim has
                been cancelled).
        """
        logger.debug("Acquiring deadlock resolution lock for victim %s", info.victim_id)
        async with self._deadlock_resolution_lock:
            logger.info(
                "Deadlock resolution started: victim_id=%s issue_id=%s blocked_on=%s",
                info.victim_id,
                info.victim_issue_id,
                info.blocked_on,
            )
            self.event_sink.on_deadlock_detected(info)

            victim_issue_id = info.victim_issue_id
            task_to_cancel: asyncio.Task[object] | None = None
            is_self_cancel = False

            # Identify victim task for cancellation
            if victim_issue_id and victim_issue_id in self.active_tasks:
                task = self.active_tasks[victim_issue_id]
                if not task.done():
                    task_to_cancel = task
                    # The handler may be running inside the victim task itself.
                    is_self_cancel = task is asyncio.current_task()

            # Clean up victim's locks first (before any await that could raise)
            # Track that we cleaned this agent to avoid double cleanup in run_implementer
            self._cleanup_agent_locks(info.victim_id)
            self._deadlock_cleaned_agents.add(info.victim_id)

            try:
                # Shield so the dependency/followup updates complete even if
                # this handler is cancelled while awaiting them.
                await asyncio.shield(self._resolve_deadlock(info))
            except asyncio.CancelledError:
                # The victim's locks are already gone and the shielded
                # resolution keeps running, so stop the victim before
                # propagating the (original) cancellation.
                self._cancel_deadlock_victim(
                    task_to_cancel, is_self_cancel, info.victim_id
                )
                raise
            self._cancel_deadlock_victim(task_to_cancel, is_self_cancel, info.victim_id)

    def _cancel_deadlock_victim(
        self,
        task: asyncio.Task[object] | None,
        is_self_cancel: bool,
        victim_id: str,
    ) -> None:
        """Cancel a deadlock victim's task, if there is one still running.

        Args:
            task: The victim's task, or None when there is nothing to cancel.
            is_self_cancel: True when the caller is running inside ``task``;
                cancellation is then deferred via call_soon so it does not
                interrupt the handler that is still executing.
            victim_id: Agent ID of the victim, used for logging.
        """
        if task is None:
            return
        if is_self_cancel:
            # Defer self-cancellation to avoid interrupting the running handler
            asyncio.get_running_loop().call_soon(task.cancel)
        else:
            task.cancel()
        logger.info("Victim killed: agent_id=%s", victim_id)

    async def _resolve_deadlock(self, info: DeadlockInfo) -> None:
        """Perform dependency and needs-followup updates for deadlock resolution.

        Separated from _handle_deadlock to allow shielding from cancellation.

        Args:
            info: DeadlockInfo with cycle, victim, and blocker details.
        """
        victim_issue_id = info.victim_issue_id

        # Add dependency: victim issue depends on blocker issue
        if victim_issue_id and info.blocker_issue_id:
            success = await self.beads.add_dependency_async(
                victim_issue_id, info.blocker_issue_id
            )
            if success:
                logger.info(
                    "Added dependency: %s depends on %s",
                    victim_issue_id,
                    info.blocker_issue_id,
                )
            else:
                logger.warning(
                    "Failed to add dependency: %s depends on %s",
                    victim_issue_id,
                    info.blocker_issue_id,
                )

        # Mark victim issue as needs-followup
        if victim_issue_id:
            reason = (
                f"Deadlock victim: blocked on {info.blocked_on} "
                f"held by {info.blocker_id}"
            )
            log_path = self._active_session_log_paths.get(victim_issue_id)
            await self.beads.mark_needs_followup_async(
                victim_issue_id, reason, log_path=log_path
            )
            logger.info("Marked issue %s as needs-followup", victim_issue_id)

    def _request_abort(self, reason: str) -> None:
        """Signal that the current run should stop due to a fatal error."""
        self.issue_coordinator.request_abort(reason)

    def _is_review_enabled(self) -> bool:
        """Return whether review should run for this orchestrator instance.

        Review is enabled unless "review" is among the disabled validations.
        Even then, it stays enabled when a disabled reason is recorded and the
        configured reviewer is not the DefaultReviewer.
        """
        disabled = self._disabled_validations
        # A None/empty collection means nothing is disabled.
        if not disabled or "review" not in disabled:
            return True
        return bool(self.review_disabled_reason) and not isinstance(
            self.review_runner.code_reviewer, DefaultReviewer
        )

    def _on_session_log_path(self, issue_id: str, log_path: Path) -> None:
        """Store session log path for an active session.

        Called by session callback factory when log path becomes available.
        Used for deadlock handling during active sessions.

        Args:
            issue_id: The issue ID.
            log_path: Path to the session log file.
        """
        self._active_session_log_paths[issue_id] = log_path

    def _on_review_log_path(self, issue_id: str, log_path: str) -> None:
        """Handle review log path notification (currently unused).

        Review log paths are now returned via IssueResult.review_log_path.
        This callback exists for symmetry but can be a no-op.

        Args:
            issue_id: The issue ID.
            log_path: Path to the review session log file.
        """
        pass  # Review log path flows through IssueResult

    def _cleanup_active_session_path(self, issue_id: str) -> None:
        """Remove stored session log path for a completed issue.

        Args:
            issue_id: The issue ID to clean up.
        """
        self._active_session_log_paths.pop(issue_id, None)

    def _failed_result(self, issue_id: str, summary: str) -> IssueResult:
        """Build a failed IssueResult for an issue whose task yielded no result.

        Uses the tracked agent ID ("unknown" if none) and any active session
        log path for the issue.
        """
        return IssueResult(
            issue_id=issue_id,
            agent_id=self.agent_ids.get(issue_id, "unknown"),
            success=False,
            summary=summary,
            session_log_path=self._active_session_log_paths.get(issue_id),
        )

    async def _finalize_issue_result(
        self,
        issue_id: str,
        result: IssueResult,
        run_metadata: RunMetadata,
    ) -> None:
        """Record an issue result, update metadata, and emit logs.

        Delegates to IssueFinalizer for the core finalization logic.
        Uses stored gate result to derive metadata, avoiding duplicate
        validation parsing. Gate result includes validation_evidence
        already parsed during quality gate check.
        """
        stored_gate_result = self.async_gate_runner.get_last_gate_result(issue_id)

        # Update finalizer's per_issue_spec if it has changed
        self.issue_finalizer.per_issue_spec = self.async_gate_runner.per_issue_spec

        # Build finalization input - use log paths from IssueResult
        finalize_input = IssueFinalizeInput(
            issue_id=issue_id,
            result=result,
            run_metadata=run_metadata,
            log_path=result.session_log_path,
            stored_gate_result=stored_gate_result,
            review_log_path=result.review_log_path,
        )

        # Track failed issues before finalize to ensure they're recorded
        # even if finalize raises (e.g., mark_needs_followup callback fails)
        if not result.success:
            self.failed_issues.add(issue_id)

        # Delegate to finalizer, ensuring cleanup happens even if finalize raises
        try:
            await self.issue_finalizer.finalize(finalize_input)
        finally:
            # Update tracking state (active_tasks is updated by mark_completed in finalize_callback)
            self.completed.append(result)

            # Cleanup active session path and gate result
            self._cleanup_active_session_path(issue_id)
            self.async_gate_runner.clear_gate_result(issue_id)

            # Remove agent_id from tracking now that finalization is complete
            # (deferred from run_implementer.finally to keep it available for get_agent_id callback)
            self.agent_ids.pop(issue_id, None)

    async def _abort_active_tasks(self, run_metadata: RunMetadata) -> None:
        """Cancel active tasks and mark them as failed.

        Tasks that have already completed are finalized with their real results
        rather than being marked as aborted.
        """
        if not self.active_tasks:
            return
        reason = self.abort_reason or "Unrecoverable error"
        aborted_summary = f"Aborted due to unrecoverable error: {reason}"
        self.event_sink.on_tasks_aborting(len(self.active_tasks), reason)
        # Cancel tasks that are still running
        for task in self.active_tasks.values():
            if not task.done():
                task.cancel()

        # Finalize each remaining issue - use real result if already done.
        # Iterate over a snapshot: finalization updates coordinator state.
        for issue_id, task in list(self.active_tasks.items()):
            if task.done():
                # Task completed before we could cancel - use real result
                try:
                    result = task.result()
                except asyncio.CancelledError:
                    result = self._failed_result(issue_id, aborted_summary)
                except Exception as e:
                    result = self._failed_result(issue_id, str(e))
            else:
                # Task was still running - mark as aborted
                result = self._failed_result(issue_id, aborted_summary)
            await self._finalize_issue_result(issue_id, result, run_metadata)
            # Mark completed in coordinator to keep state consistent
            self.issue_coordinator.mark_completed(issue_id)

    def _build_session_callbacks(self, issue_id: str) -> SessionCallbacks:
        """Build callbacks for session operations.

        Delegates to SessionCallbackFactory for callback construction.

        Args:
            issue_id: The issue ID for tracking state.

        Returns:
            SessionCallbacks with gate, review, and logging callbacks.
        """
        return self.session_callback_factory.build(
            issue_id=issue_id,
            on_abort=self._request_abort,
        )

    async def run_implementer(self, issue_id: str) -> IssueResult:
        """Run implementer agent for a single issue with gate retry support.

        Delegates to AgentSessionRunner for SDK-specific session handling.
        Everything after the agent is registered with the deadlock monitor
        runs inside try/finally, so the agent's locks and monitor registration
        are cleaned up even if session preparation fails.
        """
        temp_agent_id = f"{issue_id}-{uuid.uuid4().hex[:8]}"
        self.agent_ids[issue_id] = temp_agent_id

        # Register with deadlock monitor
        if self.deadlock_monitor is not None:
            self.deadlock_monitor.register_agent(
                agent_id=temp_agent_id,
                issue_id=issue_id,
                start_time=time.time(),
            )

        try:
            # Prepare session inputs
            issue_description = await self.beads.get_issue_description_async(issue_id)
            baseline_commit = await get_baseline_for_issue(self.repo_path, issue_id)
            if baseline_commit is None:
                baseline_commit = await get_git_commit_async(self.repo_path)

            prompt = format_implementer_prompt(
                self._prompts.implementer_prompt,
                issue_id=issue_id,
                repo_path=self.repo_path,
                agent_id=temp_agent_id,
                validation_commands=self._prompt_validation_commands,
                lock_dir=get_lock_dir(),
                scripts_dir=SCRIPTS_DIR,
            )

            session_input = AgentSessionInput(
                issue_id=issue_id,
                prompt=prompt,
                baseline_commit=baseline_commit,
                issue_description=issue_description,
                agent_id=temp_agent_id,
            )

            runner = AgentSessionRunner(
                config=self._session_config,
                sdk_client_factory=self._sdk_client_factory,
                callbacks=self._build_session_callbacks(issue_id),
                event_sink=self.event_sink,
            )
            tracer = self.telemetry_provider.create_span(
                issue_id,
                metadata={
                    "agent_id": temp_agent_id,
                    "mala_version": __version__,
                    "project_dir": self.repo_path.name,
                    "git_branch": await get_git_branch_async(self.repo_path),
                    "git_commit": await get_git_commit_async(self.repo_path),
                    "timeout_seconds": self.timeout_seconds,
                },
            )

            with tracer:
                tracer.log_input(prompt)
                output = await runner.run_session(session_input, tracer=tracer)
                self.agent_ids[issue_id] = output.agent_id
                if output.success:
                    tracer.set_success(True)
                else:
                    tracer.set_error(output.summary)
        finally:
            # Get agent_id for lock cleanup but don't pop - entry needed for get_agent_id callback
            # until finalization completes (see _finalize_issue_result)
            agent_id = self.agent_ids.get(issue_id, temp_agent_id)
            # Skip cleanup if already done during deadlock handling (avoid double unregister)
            if agent_id not in self._deadlock_cleaned_agents:
                self._cleanup_agent_locks(agent_id)
            else:
                logger.debug(
                    "Skipped cleanup for %s (handled during deadlock)", agent_id
                )
                self._deadlock_cleaned_agents.discard(agent_id)

        return IssueResult(
            issue_id=issue_id,
            agent_id=output.agent_id,
            success=output.success,
            summary=output.summary,
            duration_seconds=output.duration_seconds,
            session_id=output.session_id,
            gate_attempts=output.gate_attempts,
            review_attempts=output.review_attempts,
            resolution=output.resolution,
            low_priority_review_issues=output.low_priority_review_issues,
            session_log_path=output.log_path,
            review_log_path=output.review_log_path,
        )

    async def spawn_agent(self, issue_id: str) -> asyncio.Task | None:  # type: ignore[type-arg]
        """Spawn a new agent task for an issue. Returns the Task if spawned, None otherwise.

        A failed claim marks the issue failed in the coordinator and emits
        on_claim_failed.
        """
        if not await self.beads.claim_async(issue_id):
            self.issue_coordinator.mark_failed(issue_id)
            self.event_sink.on_claim_failed(issue_id, issue_id)
            return None

        task = asyncio.create_task(self.run_implementer(issue_id))
        self.event_sink.on_agent_started(issue_id, issue_id)
        return task

    async def _run_main_loop(self, run_metadata: RunMetadata) -> int:
        """Run the main agent spawning and completion loop.

        Delegates to IssueExecutionCoordinator for the main loop logic.
        Returns the number of issues spawned.
        """

        async def finalize_callback(
            issue_id: str,
            task: asyncio.Task,  # type: ignore[type-arg]
        ) -> None:
            """Finalize a completed task, synthesizing a failure result if needed."""
            try:
                result = task.result()
            except asyncio.CancelledError:
                abort_reason = self.issue_coordinator.abort_reason
                summary = (
                    f"Aborted due to unrecoverable error: {abort_reason}"
                    if abort_reason
                    else "Aborted due to unrecoverable error"
                )
                result = self._failed_result(issue_id, summary)
            except Exception as e:
                result = self._failed_result(issue_id, str(e))
            await self._finalize_issue_result(issue_id, result, run_metadata)
            self.issue_coordinator.mark_completed(issue_id)

        async def abort_callback() -> None:
            """Abort all active tasks."""
            await self._abort_active_tasks(run_metadata)

        return await self.issue_coordinator.run_loop(
            spawn_callback=self.spawn_agent,
            finalize_callback=finalize_callback,
            abort_callback=abort_callback,
        )

    async def _finalize_run(
        self,
        run_metadata: RunMetadata,
        run_validation_passed: bool,
    ) -> tuple[int, int]:
        """Log summary and return final results.

        Returns:
            (success_count, total). success_count is reported as 0 when the run
            was aborted or run-level validation failed.
        """
        success_count = sum(1 for r in self.completed if r.success)
        total = len(self.completed)

        # Emit run completed event
        abort_reason = (
            (self.abort_reason or "Unrecoverable error") if self.abort_run else None
        )
        self.event_sink.on_run_completed(
            success_count, total, run_validation_passed, abort_reason
        )

        if success_count > 0:
            if await self.beads.commit_issues_async():
                self.event_sink.on_issues_committed()

        if total > 0:
            metadata_path = run_metadata.save()
            self.event_sink.on_run_metadata_saved(str(metadata_path))

        print()
        if self.abort_run or not run_validation_passed:
            return (0, total)
        return (success_count, total)

    async def run(self) -> tuple[int, int]:
        """Main orchestration loop. Returns (success_count, total_count).

        Once run metadata exists, all remaining work (validation-spec setup,
        the main loop, run-level validation, finalization) runs under
        try/finally, so run_metadata.cleanup() is always called.
        """
        run_config = build_event_run_config(
            repo_path=self.repo_path,
            max_agents=self.max_agents,
            timeout_seconds=self.timeout_seconds,
            max_issues=self.max_issues,
            max_gate_retries=self.max_gate_retries,
            max_review_retries=self.max_review_retries,
            epic_id=self.epic_id,
            only_ids=self.only_ids,
            braintrust_enabled=self.braintrust_enabled,
            review_enabled=self._is_review_enabled(),
            review_disabled_reason=self.review_disabled_reason,
            prioritize_wip=self.prioritize_wip,
            orphans_only=self.orphans_only,
            cli_args=self.cli_args,
            braintrust_disabled_reason=self.braintrust_disabled_reason,
        )
        self.event_sink.on_run_started(run_config)

        get_lock_dir().mkdir(parents=True, exist_ok=True)
        get_runs_dir().mkdir(parents=True, exist_ok=True)

        run_metadata = build_run_metadata(
            repo_path=self.repo_path,
            max_agents=self.max_agents,
            timeout_seconds=self.timeout_seconds,
            max_issues=self.max_issues,
            epic_id=self.epic_id,
            only_ids=self.only_ids,
            braintrust_enabled=self.braintrust_enabled,
            max_gate_retries=self.max_gate_retries,
            max_review_retries=self.max_review_retries,
            review_enabled=self._is_review_enabled(),
            orphans_only=self.orphans_only,
            cli_args=self.cli_args,
            version=__version__,
        )
        try:
            per_issue_spec = build_validation_spec(
                self.repo_path,
                scope=ValidationScope.PER_ISSUE,
                disable_validations=self._disabled_validations,
            )
            self.async_gate_runner.per_issue_spec = per_issue_spec
            self._lint_tools = extract_lint_tools_from_spec(per_issue_spec)
            self._session_config.lint_tools = self._lint_tools
            write_run_marker(
                run_id=run_metadata.run_id,
                repo_path=self.repo_path,
                max_agents=self.max_agents,
            )

            try:
                await self._run_main_loop(run_metadata)
            finally:
                released = release_run_locks(list(self.agent_ids.values()))
                # Only emit event if released is a positive integer
                # (release_run_locks returns int, but tests may mock it)
                if isinstance(released, int) and released > 0:
                    self.event_sink.on_locks_released(released)
                remove_run_marker(run_metadata.run_id)

            # Run-level validation and finalization happen after lock cleanup
            # but before debug log cleanup (so they're captured in the debug log)
            success_count = sum(1 for r in self.completed if r.success)
            run_validation_passed = True
            if success_count > 0 and not self.abort_run:
                validation_input = RunLevelValidationInput(run_metadata=run_metadata)
                validation_output = await self.run_coordinator.run_validation(
                    validation_input
                )
                run_validation_passed = validation_output.passed

            return await self._finalize_run(run_metadata, run_validation_passed)
        finally:
            # Clean up debug logging handler after all finalization is complete
            # (idempotent - safe to call even if save() is called later)
            run_metadata.cleanup()

    def run_sync(self) -> tuple[int, int]:
        """Synchronous wrapper for run(). Returns (success_count, total_count).

        Use this method when calling from synchronous code. It handles event loop
        creation and cleanup automatically.

        Raises:
            RuntimeError: If called from within an async context (e.g., inside an
                async function or when an event loop is already running). In that
                case, use `await orchestrator.run()` instead.

        Example usage::

            from src.orchestration.factory import create_orchestrator, OrchestratorConfig

            # From sync code (scripts, CLI, tests)
            config = OrchestratorConfig(repo_path=Path("."))
            orchestrator = create_orchestrator(config)
            success_count, total = orchestrator.run_sync()

            # From async code
            async def main():
                config = OrchestratorConfig(repo_path=Path("."))
                orchestrator = create_orchestrator(config)
                success_count, total = await orchestrator.run()

        Returns:
            Tuple of (success_count, total_count).
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop - this is the expected case for sync usage
            pass
        else:
            raise RuntimeError(
                "run_sync() cannot be called from within an async context. "
                "Use 'await orchestrator.run()' instead."
            )

        # Create a new event loop and run
        return asyncio.run(self.run())