Coverage for src / orchestration / orchestration_wiring.py: 82%
85 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""OrchestrationWiring: Pipeline component initialization and wiring.
3This module extracts pipeline component creation from MalaOrchestrator,
4providing a clean separation between wiring logic and runtime orchestration.
6The wiring module handles:
7- Creating and configuring pipeline runners (GateRunner, ReviewRunner, etc.)
8- Building callback structures for coordinators
9- Initializing the session callback factory
11Design principles:
12- Pure initialization logic, no runtime state
13- All dependencies passed explicitly
14- Returns fully configured components
15"""
17from __future__ import annotations
19from dataclasses import dataclass
20from typing import TYPE_CHECKING
22from src.infra.io.log_output.console import is_verbose_enabled
23from src.pipeline.agent_session_runner import AgentSessionConfig, SessionPrompts
24from src.pipeline.gate_runner import (
25 AsyncGateRunner,
26 GateRunner,
27 GateRunnerConfig,
28)
29from src.pipeline.issue_finalizer import IssueFinalizeCallbacks
30from src.pipeline.epic_verification_coordinator import EpicVerificationCallbacks
31from src.pipeline.review_runner import (
32 ReviewRunner,
33 ReviewRunnerConfig,
34)
35from src.pipeline.session_callback_factory import SessionCallbackFactory
36from src.pipeline.issue_execution_coordinator import (
37 CoordinatorConfig,
38 IssueExecutionCoordinator,
39)
40from src.pipeline.run_coordinator import (
41 RunCoordinator,
42 RunCoordinatorConfig,
43)
45if TYPE_CHECKING:
46 import asyncio
48 from collections.abc import Awaitable, Callable
49 from pathlib import Path
51 from src.core.protocols import (
52 CodeReviewer,
53 CommandRunnerPort,
54 EnvConfigPort,
55 GateChecker,
56 IssueProvider,
57 LockManagerPort,
58 MalaEventSink,
59 ReviewIssueProtocol,
60 SDKClientFactoryProtocol,
61 )
62 from src.domain.prompts import (
63 PromptProvider,
64 PromptValidationCommands,
65 )
66 from src.infra.epic_verifier import EpicVerificationResult
67 from src.infra.io.config import MalaConfig
68 from src.domain.deadlock import DeadlockMonitor
69 from src.infra.io.log_output.run_metadata import RunMetadata
70 from src.pipeline.issue_result import IssueResult
73@dataclass
74class WiringDependencies:
75 """Dependencies required for pipeline component wiring.
77 All dependencies are provided by the orchestrator factory and passed
78 through to the wiring functions.
79 """
81 repo_path: Path
82 quality_gate: GateChecker
83 code_reviewer: CodeReviewer
84 beads: IssueProvider
85 event_sink: MalaEventSink
86 mala_config: MalaConfig
87 command_runner: CommandRunnerPort
88 env_config: EnvConfigPort
89 lock_manager: LockManagerPort
90 # Config values
91 max_agents: int | None
92 max_issues: int | None
93 timeout_seconds: int
94 max_gate_retries: int
95 max_review_retries: int
96 coverage_threshold: float | None
97 disabled_validations: set[str] | None
98 epic_id: str | None
99 only_ids: set[str] | None
100 prioritize_wip: bool
101 focus: bool
102 orphans_only: bool
103 epic_override_ids: set[str]
104 prompt_validation_commands: PromptValidationCommands
105 prompts: PromptProvider
106 context_restart_threshold: float
107 context_limit: int
108 # Deadlock detection (None until T004 wires DeadlockMonitor into orchestrator)
109 deadlock_monitor: DeadlockMonitor | None = None
112def build_gate_runner(deps: WiringDependencies) -> tuple[GateRunner, AsyncGateRunner]:
113 """Build GateRunner and AsyncGateRunner."""
114 config = GateRunnerConfig(
115 max_gate_retries=deps.max_gate_retries,
116 disable_validations=deps.disabled_validations,
117 coverage_threshold=deps.coverage_threshold,
118 )
119 gate_runner = GateRunner(
120 gate_checker=deps.quality_gate,
121 repo_path=deps.repo_path,
122 config=config,
123 )
124 async_gate_runner = AsyncGateRunner(gate_runner=gate_runner)
125 return gate_runner, async_gate_runner
128def build_review_runner(deps: WiringDependencies) -> ReviewRunner:
129 """Build ReviewRunner."""
130 config = ReviewRunnerConfig(
131 max_review_retries=deps.max_review_retries,
132 capture_session_log=False,
133 review_timeout=deps.mala_config.review_timeout,
134 )
135 return ReviewRunner(
136 code_reviewer=deps.code_reviewer,
137 config=config,
138 gate_checker=deps.quality_gate,
139 )
142def build_run_coordinator(
143 deps: WiringDependencies,
144 sdk_client_factory: SDKClientFactoryProtocol,
145) -> RunCoordinator:
146 """Build RunCoordinator."""
147 config = RunCoordinatorConfig(
148 repo_path=deps.repo_path,
149 timeout_seconds=deps.timeout_seconds,
150 max_gate_retries=deps.max_gate_retries,
151 disable_validations=deps.disabled_validations,
152 coverage_threshold=deps.coverage_threshold,
153 fixer_prompt=deps.prompts.fixer_prompt,
154 )
155 return RunCoordinator(
156 config=config,
157 gate_checker=deps.quality_gate,
158 command_runner=deps.command_runner,
159 env_config=deps.env_config,
160 lock_manager=deps.lock_manager,
161 sdk_client_factory=sdk_client_factory,
162 event_sink=deps.event_sink,
163 )
166def build_issue_coordinator(deps: WiringDependencies) -> IssueExecutionCoordinator:
167 """Build IssueExecutionCoordinator."""
168 config = CoordinatorConfig(
169 max_agents=deps.max_agents,
170 max_issues=deps.max_issues,
171 epic_id=deps.epic_id,
172 only_ids=deps.only_ids,
173 prioritize_wip=deps.prioritize_wip,
174 focus=deps.focus,
175 orphans_only=deps.orphans_only,
176 )
177 return IssueExecutionCoordinator(
178 beads=deps.beads,
179 event_sink=deps.event_sink,
180 config=config,
181 )
184@dataclass
185class FinalizerCallbackRefs:
186 """References for building finalizer callbacks.
188 These are callable getters that allow late binding to orchestrator state.
189 """
191 close_issue: Callable[[str], Awaitable[bool]]
192 mark_needs_followup: Callable[[str, str, Path | None], Awaitable[None]]
193 on_issue_closed: Callable[[str, str], None]
194 on_issue_completed: Callable[[str, str, bool, float, str], None]
195 trigger_epic_closure: Callable[[str, RunMetadata], Awaitable[None]]
196 create_tracking_issues: Callable[[str, list[ReviewIssueProtocol]], Awaitable[None]]
199def build_finalizer_callbacks(refs: FinalizerCallbackRefs) -> IssueFinalizeCallbacks:
200 """Build IssueFinalizeCallbacks from callback references."""
201 return IssueFinalizeCallbacks(
202 close_issue=refs.close_issue,
203 mark_needs_followup=refs.mark_needs_followup,
204 on_issue_closed=refs.on_issue_closed,
205 on_issue_completed=refs.on_issue_completed,
206 trigger_epic_closure=refs.trigger_epic_closure,
207 create_tracking_issues=refs.create_tracking_issues,
208 )
211@dataclass
212class EpicCallbackRefs:
213 """References for building epic verification callbacks."""
215 get_parent_epic: Callable[[str], Awaitable[str | None]]
216 verify_epic: Callable[[str, bool], Awaitable[EpicVerificationResult]]
217 spawn_remediation: Callable[[str], Awaitable[asyncio.Task[IssueResult] | None]]
218 finalize_remediation: Callable[[str, IssueResult, RunMetadata], Awaitable[None]]
219 mark_completed: Callable[[str], None]
220 is_issue_failed: Callable[[str], bool]
221 close_eligible_epics: Callable[[], Awaitable[bool]]
222 on_epic_closed: Callable[[str], None]
223 on_warning: Callable[[str], None]
224 has_epic_verifier: Callable[[], bool]
225 get_agent_id: Callable[[str], str]
228def build_epic_callbacks(refs: EpicCallbackRefs) -> EpicVerificationCallbacks:
229 """Build EpicVerificationCallbacks from callback references."""
230 return EpicVerificationCallbacks(
231 get_parent_epic=refs.get_parent_epic,
232 verify_epic=refs.verify_epic,
233 spawn_remediation=refs.spawn_remediation,
234 finalize_remediation=refs.finalize_remediation,
235 mark_completed=refs.mark_completed,
236 is_issue_failed=refs.is_issue_failed,
237 close_eligible_epics=refs.close_eligible_epics,
238 on_epic_closed=refs.on_epic_closed,
239 on_warning=refs.on_warning,
240 has_epic_verifier=refs.has_epic_verifier,
241 get_agent_id=refs.get_agent_id,
242 )
245def build_session_callback_factory(
246 deps: WiringDependencies,
247 async_gate_runner: AsyncGateRunner,
248 review_runner: ReviewRunner,
249 log_provider_getter: Callable,
250 quality_gate_getter: Callable,
251 on_session_log_path: Callable[[str, Path], None],
252 on_review_log_path: Callable[[str, str], None],
253) -> SessionCallbackFactory:
254 """Build SessionCallbackFactory."""
255 return SessionCallbackFactory(
256 gate_async_runner=async_gate_runner,
257 review_runner=review_runner,
258 log_provider=log_provider_getter,
259 event_sink=lambda: deps.event_sink,
260 quality_gate=quality_gate_getter,
261 repo_path=deps.repo_path,
262 on_session_log_path=on_session_log_path,
263 on_review_log_path=on_review_log_path,
264 get_per_issue_spec=lambda: async_gate_runner.per_issue_spec,
265 is_verbose=is_verbose_enabled,
266 )
269def build_session_config(
270 deps: WiringDependencies,
271 review_enabled: bool,
272) -> AgentSessionConfig:
273 """Build AgentSessionConfig for agent sessions."""
274 prompts = SessionPrompts(
275 gate_followup=deps.prompts.gate_followup_prompt,
276 review_followup=deps.prompts.review_followup_prompt,
277 idle_resume=deps.prompts.idle_resume_prompt,
278 checkpoint_request=deps.prompts.checkpoint_request_prompt,
279 continuation=deps.prompts.continuation_prompt,
280 )
281 return AgentSessionConfig(
282 repo_path=deps.repo_path,
283 timeout_seconds=deps.timeout_seconds,
284 prompts=prompts,
285 max_gate_retries=deps.max_gate_retries,
286 max_review_retries=deps.max_review_retries,
287 review_enabled=review_enabled,
288 lint_tools=None, # Set at run start
289 prompt_validation_commands=deps.prompt_validation_commands,
290 context_restart_threshold=deps.context_restart_threshold,
291 context_limit=deps.context_limit,
292 deadlock_monitor=deps.deadlock_monitor,
293 )