Coverage for src / pipeline / issue_execution_coordinator.py: 32%
88 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""Issue execution coordination for MalaOrchestrator.

This module contains the IssueExecutionCoordinator which manages the main agent
spawning loop and issue lifecycle during a run.

Design principles:
- Protocol-based dependencies for testability without SDK
- Callback-based agent spawning (SDK logic stays in orchestrator)
- Clear separation: coordinator handles scheduling, orchestrator handles SDK
"""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    # Imported for annotations only; never loaded at runtime.
    from src.core.protocols import IssueProvider, MalaEventSink

# Module-level logger shared by every definition in this file.
logger = logging.getLogger(__name__)
class SpawnCallback(Protocol):
    """Callback for spawning an agent for an issue.

    Returns the spawned Task on success, or None if spawn failed.
    The coordinator automatically registers the returned task.
    """

    async def __call__(self, issue_id: str) -> asyncio.Task | None:  # type: ignore[type-arg]
        """Spawn an agent for the given issue.

        Args:
            issue_id: Identifier of the issue to start work on.

        Returns:
            The asyncio task running the agent, or None when no task was
            started.
        """
        ...
class FinalizeCallback(Protocol):
    """Callback for finalizing an issue result.

    Takes the issue_id and the task that completed.
    """

    async def __call__(self, issue_id: str, task: asyncio.Task) -> None:  # type: ignore[type-arg]
        """Finalize the result of a completed task.

        Args:
            issue_id: Identifier of the issue whose task finished.
            task: The finished asyncio task for that issue.
        """
        ...
class AbortCallback(Protocol):
    """Callback for aborting active tasks."""

    async def __call__(self) -> None:
        """Abort all active tasks."""
        ...
@dataclass
class CoordinatorConfig:
    """Configuration for IssueExecutionCoordinator.

    Attributes:
        max_agents: Maximum concurrent agents (None = unlimited).
        max_issues: Maximum issues to process (None = unlimited).
        epic_id: Only process tasks under this epic.
        only_ids: Set of issue IDs to process exclusively.
        prioritize_wip: Prioritize in_progress issues before open issues.
        focus: Group tasks by epic for focused work.
        orphans_only: Only process issues with no parent epic.
    """

    max_agents: int | None = None
    max_issues: int | None = None
    epic_id: str | None = None
    only_ids: set[str] | None = None
    prioritize_wip: bool = False
    focus: bool = True
    orphans_only: bool = False
class IssueExecutionCoordinator:
    """Coordinates issue execution without SDK dependencies.

    This class manages the main agent spawning loop, handling:
    - Issue fetching from IssueProvider
    - Concurrent agent limiting (max_agents)
    - Issue count limiting (max_issues)
    - Epic and only_ids filtering
    - WIP prioritization and focus grouping

    The coordinator uses callbacks for agent spawning and finalization,
    keeping SDK-specific logic in the orchestrator.

    Example:
        coordinator = IssueExecutionCoordinator(
            beads=beads_client,
            event_sink=console_sink,
            config=CoordinatorConfig(max_agents=2),
        )
        issues_spawned = await coordinator.run_loop(
            spawn_callback=orchestrator.spawn_agent,
            finalize_callback=orchestrator._finalize_issue_result,
            abort_callback=orchestrator._abort_active_tasks,
        )
    """

    def __init__(
        self,
        beads: IssueProvider,
        event_sink: MalaEventSink,
        config: CoordinatorConfig,
    ) -> None:
        """Initialize the coordinator.

        Args:
            beads: Issue provider for fetching ready issues.
            event_sink: Event sink for logging lifecycle events.
            config: Coordinator configuration.
        """
        self.beads = beads
        self.event_sink = event_sink
        self.config = config

        # Runtime state
        self.active_tasks: dict[str, asyncio.Task] = {}  # type: ignore[type-arg]
        self.completed_ids: set[str] = set()
        self.failed_issues: set[str] = set()
        self.abort_run: bool = False
        self.abort_reason: str | None = None

    def request_abort(self, reason: str) -> None:
        """Signal that the current run should stop due to a fatal error.

        Only the first request is recorded; later calls are ignored so the
        original reason is kept and the event sink is notified once.

        Args:
            reason: Description of why the run should abort.
        """
        if self.abort_run:
            return
        self.abort_run = True
        self.abort_reason = reason
        logger.warning("Abort requested: reason=%s", reason)
        self.event_sink.on_abort_requested(reason)

    async def run_loop(
        self,
        spawn_callback: SpawnCallback,
        finalize_callback: FinalizeCallback,
        abort_callback: AbortCallback,
    ) -> int:
        """Run the main agent spawning and completion loop.

        Each pass fetches ready issues (unless the issue limit is reached),
        spawns agents while capacity allows, waits for at least one active
        task to finish, and finalizes every finished task.

        Args:
            spawn_callback: Called to spawn an agent for an issue.
                Returns the spawned Task on success, or None if spawn failed.
                The coordinator automatically registers the returned task.
            finalize_callback: Called when a task completes.
                Receives issue_id and the completed task.
            abort_callback: Called when abort is triggered.
                Should cancel and finalize all active tasks.

        Returns:
            Number of issues spawned.
        """
        issues_spawned = 0

        while True:
            logger.debug(
                "Loop iteration: active=%d completed=%d",
                len(self.active_tasks),
                len(self.completed_ids),
            )
            # Check for abort
            if self.abort_run:
                await abort_callback()
                break

            # Check if we've hit the issue limit
            limit_reached = (
                self.config.max_issues is not None
                and issues_spawned >= self.config.max_issues
            )

            # Build suppress_warn_ids for only_ids mode
            suppress_warn_ids = None
            if self.config.only_ids:
                suppress_warn_ids = (
                    self.failed_issues
                    | set(self.active_tasks.keys())
                    | self.completed_ids
                )

            # Fetch ready issues (unless we've hit the limit)
            ready = (
                await self.beads.get_ready_async(
                    self.failed_issues,
                    epic_id=self.config.epic_id,
                    only_ids=self.config.only_ids,
                    suppress_warn_ids=suppress_warn_ids,
                    prioritize_wip=self.config.prioritize_wip,
                    focus=self.config.focus,
                    orphans_only=self.config.orphans_only,
                )
                if not limit_reached
                else []
            )

            if ready:
                self.event_sink.on_ready_issues(list(ready))

            # Spawn agents while we have capacity and ready issues
            while (
                self.config.max_agents is None
                or len(self.active_tasks) < self.config.max_agents
            ) and ready:
                issue_id = ready.pop(0)
                if issue_id not in self.active_tasks:
                    task = await spawn_callback(issue_id)
                    if task is not None:
                        self.register_task(issue_id, task)
                        issues_spawned += 1
                        if (
                            self.config.max_issues is not None
                            and issues_spawned >= self.config.max_issues
                        ):
                            break

            # Exit if no active work
            if not self.active_tasks:
                if limit_reached:
                    self.event_sink.on_no_more_issues(
                        f"limit_reached ({self.config.max_issues})"
                    )
                elif not ready:
                    self.event_sink.on_no_more_issues("none_ready")
                break

            # Wait for at least one task to complete
            self.event_sink.on_waiting_for_agents(len(self.active_tasks))
            done, _ = await asyncio.wait(
                self.active_tasks.values(),
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Finalize completed tasks
            for task in done:
                # Iterate over a snapshot: finalize_callback may mutate
                # active_tasks (for example through mark_completed).
                for issue_id, t in list(self.active_tasks.items()):
                    if t is task:
                        await finalize_callback(issue_id, task)
                        # If the callback left this same finished task
                        # registered, drop it here. Otherwise asyncio.wait
                        # would hand it back on every pass and the loop would
                        # finalize it again and again without terminating.
                        # The identity check keeps any new task the callback
                        # registered under the same issue_id.
                        if self.active_tasks.get(issue_id) is task:
                            self.active_tasks.pop(issue_id, None)
                            logger.debug(
                                "Finished task removed after finalize: issue_id=%s",
                                issue_id,
                            )
                        break

            # Check for abort after processing completions
            if self.abort_run:
                await abort_callback()
                break

        return issues_spawned

    def register_task(self, issue_id: str, task: asyncio.Task) -> None:  # type: ignore[type-arg]
        """Register an active task for an issue.

        Called by run_loop with the task returned by spawn_callback. An
        existing entry for the same issue_id is replaced.

        Args:
            issue_id: The issue ID.
            task: The asyncio task running the agent.
        """
        self.active_tasks[issue_id] = task
        logger.debug("Task registered: issue_id=%s", issue_id)

    def mark_failed(self, issue_id: str) -> None:
        """Mark an issue as failed (e.g., claim failed).

        The failed set is passed to the issue provider on every fetch in
        run_loop.

        Args:
            issue_id: The issue ID that failed.
        """
        self.failed_issues.add(issue_id)
        logger.info("Issue marked failed: issue_id=%s", issue_id)

    def mark_completed(self, issue_id: str) -> None:
        """Mark an issue as completed and remove from active.

        Safe to call for an issue that is not currently active.

        Args:
            issue_id: The issue ID that completed.
        """
        self.completed_ids.add(issue_id)
        self.active_tasks.pop(issue_id, None)
        logger.debug("Issue marked completed: issue_id=%s", issue_id)