Coverage for src / orchestration / orchestration_wiring.py: 82%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""OrchestrationWiring: Pipeline component initialization and wiring. 

2 

3This module extracts pipeline component creation from MalaOrchestrator, 

4providing a clean separation between wiring logic and runtime orchestration. 

5 

6The wiring module handles: 

7- Creating and configuring pipeline runners (GateRunner, ReviewRunner, etc.) 

8- Building callback structures for coordinators 

9- Initializing the session callback factory 

10 

11Design principles: 

12- Pure initialization logic, no runtime state 

13- All dependencies passed explicitly 

14- Returns fully configured components 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass 

20from typing import TYPE_CHECKING 

21 

22from src.infra.io.log_output.console import is_verbose_enabled 

23from src.pipeline.agent_session_runner import AgentSessionConfig, SessionPrompts 

24from src.pipeline.gate_runner import ( 

25 AsyncGateRunner, 

26 GateRunner, 

27 GateRunnerConfig, 

28) 

29from src.pipeline.issue_finalizer import IssueFinalizeCallbacks 

30from src.pipeline.epic_verification_coordinator import EpicVerificationCallbacks 

31from src.pipeline.review_runner import ( 

32 ReviewRunner, 

33 ReviewRunnerConfig, 

34) 

35from src.pipeline.session_callback_factory import SessionCallbackFactory 

36from src.pipeline.issue_execution_coordinator import ( 

37 CoordinatorConfig, 

38 IssueExecutionCoordinator, 

39) 

40from src.pipeline.run_coordinator import ( 

41 RunCoordinator, 

42 RunCoordinatorConfig, 

43) 

44 

45if TYPE_CHECKING: 

46 import asyncio 

47 

48 from collections.abc import Awaitable, Callable 

49 from pathlib import Path 

50 

51 from src.core.protocols import ( 

52 CodeReviewer, 

53 CommandRunnerPort, 

54 EnvConfigPort, 

55 GateChecker, 

56 IssueProvider, 

57 LockManagerPort, 

58 MalaEventSink, 

59 ReviewIssueProtocol, 

60 SDKClientFactoryProtocol, 

61 ) 

62 from src.domain.prompts import ( 

63 PromptProvider, 

64 PromptValidationCommands, 

65 ) 

66 from src.infra.epic_verifier import EpicVerificationResult 

67 from src.infra.io.config import MalaConfig 

68 from src.domain.deadlock import DeadlockMonitor 

69 from src.infra.io.log_output.run_metadata import RunMetadata 

70 from src.pipeline.issue_result import IssueResult 

71 

72 

@dataclass
class WiringDependencies:
    """Everything the ``build_*`` wiring helpers need, gathered in one place.

    The orchestrator factory supplies every value; the wiring functions in
    this module only read from an instance and forward the values to the
    components they construct.
    """

    # --- Collaborators / ports -------------------------------------------
    repo_path: Path
    quality_gate: GateChecker
    code_reviewer: CodeReviewer
    beads: IssueProvider
    event_sink: MalaEventSink
    mala_config: MalaConfig
    command_runner: CommandRunnerPort
    env_config: EnvConfigPort
    lock_manager: LockManagerPort

    # --- Plain configuration values --------------------------------------
    max_agents: int | None
    max_issues: int | None
    timeout_seconds: int
    max_gate_retries: int
    max_review_retries: int
    coverage_threshold: float | None
    disabled_validations: set[str] | None
    epic_id: str | None
    only_ids: set[str] | None
    prioritize_wip: bool
    focus: bool
    orphans_only: bool
    epic_override_ids: set[str]
    prompt_validation_commands: PromptValidationCommands
    prompts: PromptProvider
    context_restart_threshold: float
    context_limit: int

    # --- Optional ---------------------------------------------------------
    # Deadlock detection (None until T004 wires DeadlockMonitor into orchestrator)
    deadlock_monitor: DeadlockMonitor | None = None

110 

111 

def build_gate_runner(deps: WiringDependencies) -> tuple[GateRunner, AsyncGateRunner]:
    """Construct the gate runner pair from ``deps``.

    Returns:
        A ``(GateRunner, AsyncGateRunner)`` tuple; the ``AsyncGateRunner`` is
        constructed from the very ``GateRunner`` instance that is returned
        alongside it.
    """
    sync_runner = GateRunner(
        gate_checker=deps.quality_gate,
        repo_path=deps.repo_path,
        config=GateRunnerConfig(
            max_gate_retries=deps.max_gate_retries,
            # The dependency field is named ``disabled_validations``; the
            # config keyword is ``disable_validations``.
            disable_validations=deps.disabled_validations,
            coverage_threshold=deps.coverage_threshold,
        ),
    )
    return sync_runner, AsyncGateRunner(gate_runner=sync_runner)

126 

127 

def build_review_runner(deps: WiringDependencies) -> ReviewRunner:
    """Construct a ``ReviewRunner`` from ``deps``.

    The retry budget comes from ``deps.max_review_retries`` and the timeout
    from ``deps.mala_config.review_timeout``; session-log capture is always
    passed as ``False``.
    """
    runner_config = ReviewRunnerConfig(
        max_review_retries=deps.max_review_retries,
        capture_session_log=False,
        review_timeout=deps.mala_config.review_timeout,
    )
    runner = ReviewRunner(
        code_reviewer=deps.code_reviewer,
        config=runner_config,
        gate_checker=deps.quality_gate,
    )
    return runner

140 

141 

def build_run_coordinator(
    deps: WiringDependencies,
    sdk_client_factory: SDKClientFactoryProtocol,
) -> RunCoordinator:
    """Construct a ``RunCoordinator``.

    Args:
        deps: Source of the repo path, timeouts, retry limits, validation
            settings, fixer prompt and the ports handed to the coordinator.
        sdk_client_factory: Forwarded unchanged to the coordinator.

    Returns:
        The newly constructed ``RunCoordinator``.
    """
    coordinator_config = RunCoordinatorConfig(
        repo_path=deps.repo_path,
        timeout_seconds=deps.timeout_seconds,
        max_gate_retries=deps.max_gate_retries,
        disable_validations=deps.disabled_validations,
        coverage_threshold=deps.coverage_threshold,
        fixer_prompt=deps.prompts.fixer_prompt,
    )
    coordinator = RunCoordinator(
        config=coordinator_config,
        gate_checker=deps.quality_gate,
        command_runner=deps.command_runner,
        env_config=deps.env_config,
        lock_manager=deps.lock_manager,
        sdk_client_factory=sdk_client_factory,
        event_sink=deps.event_sink,
    )
    return coordinator

164 

165 

def build_issue_coordinator(deps: WiringDependencies) -> IssueExecutionCoordinator:
    """Construct an ``IssueExecutionCoordinator`` from ``deps``.

    The issue-selection settings (agent/issue limits, epic and id filters,
    WIP prioritisation, focus and orphans-only flags) are copied into a
    ``CoordinatorConfig``; ``deps.beads`` and ``deps.event_sink`` are passed
    alongside it.
    """
    return IssueExecutionCoordinator(
        beads=deps.beads,
        event_sink=deps.event_sink,
        config=CoordinatorConfig(
            max_agents=deps.max_agents,
            max_issues=deps.max_issues,
            epic_id=deps.epic_id,
            only_ids=deps.only_ids,
            prioritize_wip=deps.prioritize_wip,
            focus=deps.focus,
            orphans_only=deps.orphans_only,
        ),
    )

182 

183 

@dataclass
class FinalizerCallbackRefs:
    """Callables from which the issue-finalizer callbacks are assembled.

    Each field holds a callable rather than a value, which lets the
    orchestrator bind them late to its own state.
    """

    # Awaitable callbacks
    close_issue: Callable[[str], Awaitable[bool]]
    mark_needs_followup: Callable[[str, str, Path | None], Awaitable[None]]

    # Synchronous notification hooks
    on_issue_closed: Callable[[str, str], None]
    on_issue_completed: Callable[[str, str, bool, float, str], None]

    # Awaitable follow-up actions
    trigger_epic_closure: Callable[[str, RunMetadata], Awaitable[None]]
    create_tracking_issues: Callable[[str, list[ReviewIssueProtocol]], Awaitable[None]]

197 

198 

def build_finalizer_callbacks(refs: FinalizerCallbackRefs) -> IssueFinalizeCallbacks:
    """Copy every callable on ``refs`` into a new ``IssueFinalizeCallbacks``.

    Each attribute of ``refs`` is forwarded under the keyword of the same
    name; nothing is wrapped or invoked here.
    """
    callbacks = IssueFinalizeCallbacks(
        close_issue=refs.close_issue,
        mark_needs_followup=refs.mark_needs_followup,
        on_issue_closed=refs.on_issue_closed,
        on_issue_completed=refs.on_issue_completed,
        trigger_epic_closure=refs.trigger_epic_closure,
        create_tracking_issues=refs.create_tracking_issues,
    )
    return callbacks

209 

210 

@dataclass
class EpicCallbackRefs:
    """Callables from which the epic-verification callbacks are assembled."""

    # Awaitable operations
    get_parent_epic: Callable[[str], Awaitable[str | None]]
    verify_epic: Callable[[str, bool], Awaitable[EpicVerificationResult]]
    spawn_remediation: Callable[[str], Awaitable[asyncio.Task[IssueResult] | None]]
    finalize_remediation: Callable[[str, IssueResult, RunMetadata], Awaitable[None]]

    # Synchronous state update / query
    mark_completed: Callable[[str], None]
    is_issue_failed: Callable[[str], bool]

    # Awaitable; takes no arguments
    close_eligible_epics: Callable[[], Awaitable[bool]]

    # Synchronous notification hooks
    on_epic_closed: Callable[[str], None]
    on_warning: Callable[[str], None]

    # Synchronous lookups
    has_epic_verifier: Callable[[], bool]
    get_agent_id: Callable[[str], str]

226 

227 

def build_epic_callbacks(refs: EpicCallbackRefs) -> EpicVerificationCallbacks:
    """Copy every callable on ``refs`` into a new ``EpicVerificationCallbacks``.

    Each attribute of ``refs`` is forwarded under the keyword of the same
    name; nothing is wrapped or invoked here.
    """
    callbacks = EpicVerificationCallbacks(
        get_parent_epic=refs.get_parent_epic,
        verify_epic=refs.verify_epic,
        spawn_remediation=refs.spawn_remediation,
        finalize_remediation=refs.finalize_remediation,
        mark_completed=refs.mark_completed,
        is_issue_failed=refs.is_issue_failed,
        close_eligible_epics=refs.close_eligible_epics,
        on_epic_closed=refs.on_epic_closed,
        on_warning=refs.on_warning,
        has_epic_verifier=refs.has_epic_verifier,
        get_agent_id=refs.get_agent_id,
    )
    return callbacks

243 

244 

def build_session_callback_factory(
    deps: WiringDependencies,
    async_gate_runner: AsyncGateRunner,
    review_runner: ReviewRunner,
    log_provider_getter: Callable,
    quality_gate_getter: Callable,
    on_session_log_path: Callable[[str, Path], None],
    on_review_log_path: Callable[[str, str], None],
) -> SessionCallbackFactory:
    """Construct the ``SessionCallbackFactory``.

    Args:
        deps: Supplies ``repo_path`` directly and ``event_sink`` through a
            zero-argument getter.
        async_gate_runner: Passed as ``gate_async_runner``; its
            ``per_issue_spec`` attribute is exposed through a zero-argument
            getter.
        review_runner: Forwarded unchanged.
        log_provider_getter: Forwarded unchanged as ``log_provider``.
        quality_gate_getter: Forwarded unchanged as ``quality_gate``.
        on_session_log_path: Forwarded unchanged.
        on_review_log_path: Forwarded unchanged.

    Returns:
        The newly constructed ``SessionCallbackFactory``.
    """

    def _current_event_sink():
        # Looked up on ``deps`` at call time, not captured at build time.
        return deps.event_sink

    def _current_per_issue_spec():
        # Looked up on the runner at call time, not captured at build time.
        return async_gate_runner.per_issue_spec

    return SessionCallbackFactory(
        gate_async_runner=async_gate_runner,
        review_runner=review_runner,
        log_provider=log_provider_getter,
        event_sink=_current_event_sink,
        quality_gate=quality_gate_getter,
        repo_path=deps.repo_path,
        on_session_log_path=on_session_log_path,
        on_review_log_path=on_review_log_path,
        get_per_issue_spec=_current_per_issue_spec,
        # The function object itself is passed; it is not called here.
        is_verbose=is_verbose_enabled,
    )

267 

268 

def build_session_config(
    deps: WiringDependencies,
    review_enabled: bool,
) -> AgentSessionConfig:
    """Assemble the ``AgentSessionConfig`` used for agent sessions.

    Args:
        deps: Source of the prompts, repo path, timeout, retry limits,
            prompt validation commands, context settings and deadlock monitor.
        review_enabled: Forwarded unchanged into the config.

    Returns:
        The newly constructed ``AgentSessionConfig``; ``lint_tools`` is left
        as ``None`` here.
    """
    provider = deps.prompts
    session_prompts = SessionPrompts(
        gate_followup=provider.gate_followup_prompt,
        review_followup=provider.review_followup_prompt,
        idle_resume=provider.idle_resume_prompt,
        checkpoint_request=provider.checkpoint_request_prompt,
        continuation=provider.continuation_prompt,
    )
    session_config = AgentSessionConfig(
        repo_path=deps.repo_path,
        timeout_seconds=deps.timeout_seconds,
        prompts=session_prompts,
        max_gate_retries=deps.max_gate_retries,
        max_review_retries=deps.max_review_retries,
        review_enabled=review_enabled,
        lint_tools=None,  # Set at run start
        prompt_validation_commands=deps.prompt_validation_commands,
        context_restart_threshold=deps.context_restart_threshold,
        context_limit=deps.context_limit,
        deadlock_monitor=deps.deadlock_monitor,
    )
    return session_config