Coverage for src / pipeline / issue_execution_coordinator.py: 32%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Issue execution coordination for MalaOrchestrator. 

2 

3This module contains the IssueExecutionCoordinator which manages the main agent 

4spawning loop and issue lifecycle during a run. 

5 

6Design principles: 

7- Protocol-based dependencies for testability without SDK 

8- Callback-based agent spawning (SDK logic stays in orchestrator) 

9- Clear separation: coordinator handles scheduling, orchestrator handles SDK 

10""" 

11 

12from __future__ import annotations 

13 

14import asyncio 

15import logging 

16from dataclasses import dataclass 

17from typing import TYPE_CHECKING, Protocol 

18 

19if TYPE_CHECKING: 

20 from src.core.protocols import IssueProvider, MalaEventSink 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class SpawnCallback(Protocol): 

26 """Callback for spawning an agent for an issue. 

27 

28 Returns the spawned Task on success, or None if spawn failed. 

29 The coordinator automatically registers the returned task. 

30 """ 

31 

32 async def __call__(self, issue_id: str) -> asyncio.Task | None: # type: ignore[type-arg] 

33 """Spawn an agent for the given issue.""" 

34 ... 

35 

36 

class FinalizeCallback(Protocol):
    """Structural type for the coroutine that finalizes a finished issue.

    An implementation is awaited with the issue ID together with the task
    that has completed for that issue.
    """

    async def __call__(self, issue_id: str, task: asyncio.Task) -> None:  # type: ignore[type-arg]
        """Process the outcome of ``task``, which has completed for ``issue_id``."""

46 

47 

class AbortCallback(Protocol):
    """Structural type for the coroutine that aborts all active tasks."""

    async def __call__(self) -> None:
        """Abort every task that is currently active."""

54 

55 

56@dataclass 

57class CoordinatorConfig: 

58 """Configuration for IssueExecutionCoordinator. 

59 

60 Attributes: 

61 max_agents: Maximum concurrent agents (None = unlimited). 

62 max_issues: Maximum issues to process (None = unlimited). 

63 epic_id: Only process tasks under this epic. 

64 only_ids: Set of issue IDs to process exclusively. 

65 prioritize_wip: Prioritize in_progress issues before open issues. 

66 focus: Group tasks by epic for focused work. 

67 orphans_only: Only process issues with no parent epic. 

68 """ 

69 

70 max_agents: int | None = None 

71 max_issues: int | None = None 

72 epic_id: str | None = None 

73 only_ids: set[str] | None = None 

74 prioritize_wip: bool = False 

75 focus: bool = True 

76 orphans_only: bool = False 

77 

78 

class IssueExecutionCoordinator:
    """Coordinates issue execution without SDK dependencies.

    This class manages the main agent spawning loop, handling:
    - Issue fetching from IssueProvider
    - Concurrent agent limiting (max_agents)
    - Issue count limiting (max_issues)
    - Epic and only_ids filtering
    - WIP prioritization and focus grouping

    The coordinator uses callbacks for agent spawning and finalization,
    keeping SDK-specific logic in the orchestrator.

    Example:
        coordinator = IssueExecutionCoordinator(
            beads=beads_client,
            event_sink=console_sink,
            config=CoordinatorConfig(max_agents=2),
        )
        issues_spawned = await coordinator.run_loop(
            spawn_callback=orchestrator.spawn_agent,
            finalize_callback=orchestrator._finalize_issue_result,
            abort_callback=orchestrator._abort_active_tasks,
        )
    """

    def __init__(
        self,
        beads: IssueProvider,
        event_sink: MalaEventSink,
        config: CoordinatorConfig,
    ) -> None:
        """Initialize the coordinator.

        Args:
            beads: Issue provider for fetching ready issues.
            event_sink: Event sink for logging lifecycle events.
            config: Coordinator configuration.
        """
        self.beads = beads
        self.event_sink = event_sink
        self.config = config

        # Runtime state
        self.active_tasks: dict[str, asyncio.Task] = {}  # type: ignore[type-arg]
        self.completed_ids: set[str] = set()
        self.failed_issues: set[str] = set()
        self.abort_run: bool = False
        self.abort_reason: str | None = None

    def request_abort(self, reason: str) -> None:
        """Signal that the current run should stop due to a fatal error.

        Only the first request takes effect; later calls are ignored so the
        original reason is kept and the event sink is notified once.

        Args:
            reason: Description of why the run should abort.
        """
        if self.abort_run:
            return
        self.abort_run = True
        self.abort_reason = reason
        logger.warning("Abort requested: reason=%s", reason)
        self.event_sink.on_abort_requested(reason)

    async def run_loop(
        self,
        spawn_callback: SpawnCallback,
        finalize_callback: FinalizeCallback,
        abort_callback: AbortCallback,
    ) -> int:
        """Run the main agent spawning and completion loop.

        Each iteration fetches ready issues (unless max_issues was reached),
        spawns agents while there is capacity, waits for at least one active
        task to complete and finalizes every completed task. The loop ends
        when an abort was requested or when no task is active after the
        spawning step.

        Args:
            spawn_callback: Called to spawn an agent for an issue.
                Returns the spawned Task on success, or None if spawn failed.
                The coordinator automatically registers the returned task.
            finalize_callback: Called when a task completes.
                Receives issue_id and the completed task. If the task is
                still registered afterwards, the coordinator marks the issue
                completed itself so the loop cannot spin on a finished task.
            abort_callback: Called when abort is triggered.
                Should cancel and finalize all active tasks.

        Returns:
            Number of issues spawned.
        """
        issues_spawned = 0

        while True:
            logger.debug(
                "Loop iteration: active=%d completed=%d",
                len(self.active_tasks),
                len(self.completed_ids),
            )
            # Check for abort
            if self.abort_run:
                await abort_callback()
                break

            # The limit is sampled once per iteration: it decides both whether
            # to fetch and which "no more issues" reason is reported below.
            limit_reached = self._limit_reached(issues_spawned)
            ready = [] if limit_reached else await self._fetch_ready()

            if ready:
                # The sink gets its own copy; `ready` is consumed below.
                self.event_sink.on_ready_issues(list(ready))

            # Spawn agents while we have capacity and ready issues
            while ready and self._has_capacity():
                issue_id = ready.pop(0)
                if issue_id in self.active_tasks:
                    continue
                task = await spawn_callback(issue_id)
                if task is None:
                    continue
                self.register_task(issue_id, task)
                issues_spawned += 1
                if self._limit_reached(issues_spawned):
                    break

            # Exit if no active work
            if not self.active_tasks:
                if limit_reached:
                    self.event_sink.on_no_more_issues(
                        f"limit_reached ({self.config.max_issues})"
                    )
                elif not ready:
                    self.event_sink.on_no_more_issues("none_ready")
                break

            # Wait for at least one task to complete
            self.event_sink.on_waiting_for_agents(len(self.active_tasks))
            done, _ = await asyncio.wait(
                self.active_tasks.values(),
                return_when=asyncio.FIRST_COMPLETED,
            )

            await self._finalize_done(done, finalize_callback)

            # Check for abort after processing completions
            if self.abort_run:
                await abort_callback()
                break

        return issues_spawned

    def _limit_reached(self, issues_spawned: int) -> bool:
        """Return True when max_issues is set and has been reached."""
        max_issues = self.config.max_issues
        return max_issues is not None and issues_spawned >= max_issues

    def _has_capacity(self) -> bool:
        """Return True when another agent may run under max_agents."""
        max_agents = self.config.max_agents
        return max_agents is None or len(self.active_tasks) < max_agents

    async def _fetch_ready(self) -> list[str]:
        """Fetch ready issue IDs from the provider as a private list.

        The provider's return value is copied so that consuming it in the
        spawn loop never mutates an object the provider may still hold; a
        falsy result (including None) becomes an empty list.
        """
        # In only_ids mode, pass the IDs this run already knows about
        # (failed, active, completed) as suppress_warn_ids.
        suppress_warn_ids = None
        if self.config.only_ids:
            suppress_warn_ids = (
                self.failed_issues | set(self.active_tasks) | self.completed_ids
            )

        fetched = await self.beads.get_ready_async(
            self.failed_issues,
            epic_id=self.config.epic_id,
            only_ids=self.config.only_ids,
            suppress_warn_ids=suppress_warn_ids,
            prioritize_wip=self.config.prioritize_wip,
            focus=self.config.focus,
            orphans_only=self.config.orphans_only,
        )
        return list(fetched) if fetched else []

    async def _finalize_done(
        self,
        done: set[asyncio.Task],  # type: ignore[type-arg]
        finalize_callback: FinalizeCallback,
    ) -> None:
        """Invoke finalize_callback for every completed task still registered."""
        for task in done:
            # Look the task up at finalize time: an earlier callback in this
            # batch may already have changed active_tasks.
            issue_id = next(
                (iid for iid, t in self.active_tasks.items() if t is task),
                None,
            )
            if issue_id is None:
                continue
            await finalize_callback(issue_id, task)
            # A finished task left in active_tasks would make asyncio.wait
            # return immediately on every iteration and be finalized again
            # forever. The identity check keeps any replacement task the
            # callback registered under the same issue ID.
            if self.active_tasks.get(issue_id) is task:
                logger.debug(
                    "Task still registered after finalize; unregistering: issue_id=%s",
                    issue_id,
                )
                self.mark_completed(issue_id)

    def register_task(self, issue_id: str, task: asyncio.Task) -> None:  # type: ignore[type-arg]
        """Register an active task for an issue.

        Called by run_loop for every task returned by spawn_callback; an
        existing entry for the same issue ID is replaced.

        Args:
            issue_id: The issue ID.
            task: The asyncio task running the agent.
        """
        self.active_tasks[issue_id] = task
        logger.debug("Task registered: issue_id=%s", issue_id)

    def mark_failed(self, issue_id: str) -> None:
        """Mark an issue as failed (e.g., claim failed).

        Failed IDs are passed to the issue provider on every subsequent fetch.

        Args:
            issue_id: The issue ID that failed.
        """
        self.failed_issues.add(issue_id)
        logger.info("Issue marked failed: issue_id=%s", issue_id)

    def mark_completed(self, issue_id: str) -> None:
        """Mark an issue as completed and remove from active.

        Safe to call for an issue that is not (or no longer) active.

        Args:
            issue_id: The issue ID that completed.
        """
        self.completed_ids.add(issue_id)
        self.active_tasks.pop(issue_id, None)
        logger.debug("Issue marked completed: issue_id=%s", issue_id)