Coverage for src / pipeline / run_coordinator.py: 26%
185 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""RunCoordinator: Main run orchestration pipeline stage.

Extracted from MalaOrchestrator to separate run-level coordination from
the main class. This module handles:
- Main run loop (spawning agents, waiting for completion)
- Run-level validation (Gate 4)
- Fixer agent spawning for validation failures

The RunCoordinator receives explicit inputs and returns explicit outputs,
making it testable without SDK or subprocess dependencies.

Design principles:
- Protocol-based dependencies for testability
- Explicit input/output types for clarity
- Pure functions where possible
"""
18from __future__ import annotations
20import asyncio
21import uuid
22from dataclasses import dataclass, field
23from typing import TYPE_CHECKING
25from src.infra.agent_runtime import AgentRuntimeBuilder
26from src.infra.tools.locking import cleanup_agent_locks
27from src.domain.validation.e2e import E2EStatus
28from src.domain.validation.spec import (
29 ValidationContext,
30 ValidationScope,
31 build_validation_spec,
32)
33from src.domain.validation.spec import extract_lint_tools_from_spec
34from src.domain.validation.spec_runner import SpecValidationRunner
36if TYPE_CHECKING:
37 from src.core.protocols import MalaEventSink, SDKClientFactoryProtocol
38 from src.infra.io.log_output.run_metadata import (
39 RunMetadata,
40 ValidationResult as MetaValidationResult,
41 )
42 from src.core.protocols import (
43 CommandRunnerPort,
44 EnvConfigPort,
45 GateChecker,
46 LockManagerPort,
47 )
48 from src.domain.validation.result import ValidationResult
49 from src.domain.validation.spec import ValidationSpec
50 from pathlib import Path
class _FixerPromptNotSet:
    """Sentinel indicating that ``fixer_prompt`` was never configured.

    It exposes a ``format`` method with the same keyword-style call shape as
    ``str.format`` so it can stand in for a prompt template, but rendering it
    raises instead of silently yielding an empty prompt.
    """

    def format(self, **kwargs: object) -> str:
        """Refuse to render; the sentinel is not a usable template.

        Args:
            **kwargs: Template fields. Accepted only so the call shape matches
                ``str.format``; they are ignored.

        Raises:
            RuntimeError: Always, with a hint on how to configure the prompt.
        """
        raise RuntimeError(
            "fixer_prompt not configured. "
            "Pass fixer_prompt to RunCoordinatorConfig or use build_run_coordinator()."
        )


# Single shared sentinel instance used as the "not configured" default.
_FIXER_PROMPT_NOT_SET = _FixerPromptNotSet()
@dataclass
class RunCoordinatorConfig:
    """Configuration for RunCoordinator.

    Attributes:
        repo_path: Path to the repository.
        timeout_seconds: Session timeout in seconds.
        max_gate_retries: Maximum gate retry attempts.
        disable_validations: Set of validation names to disable.
        coverage_threshold: Optional coverage threshold override.
        fixer_prompt: Template for the fixer agent prompt. Defaults to a
            sentinel whose ``format`` raises RuntimeError, so a missing
            prompt fails loudly when it is first rendered.
    """

    repo_path: Path
    timeout_seconds: int
    max_gate_retries: int = 3
    disable_validations: set[str] | None = None
    coverage_threshold: float | None = None
    fixer_prompt: str | _FixerPromptNotSet = _FIXER_PROMPT_NOT_SET
@dataclass
class RunLevelValidationInput:
    """Input for run-level validation.

    Attributes:
        run_metadata: Run metadata tracker.
    """

    run_metadata: RunMetadata
@dataclass
class RunLevelValidationOutput:
    """Output from run-level validation.

    Attributes:
        passed: Whether validation passed.
    """

    passed: bool
@dataclass
class SpecResultBuilder:
    """Builds ValidationResult to MetaValidationResult conversions.

    Encapsulates the logic for deriving e2e_passed status and building
    metadata results from validation results. The class has no fields; both
    operations are static methods.
    """

    @staticmethod
    def derive_e2e_passed(result: ValidationResult) -> bool | None:
        """Derive e2e_passed from E2E execution result.

        Args:
            result: ValidationResult to check.

        Returns:
            None if E2E was not executed (no e2e_result, or status SKIPPED),
            True if E2E was executed and passed,
            False if E2E was executed and failed.
        """
        if result.e2e_result is None:
            return None
        if result.e2e_result.status == E2EStatus.SKIPPED:
            return None
        return result.e2e_result.passed

    @staticmethod
    def build_meta_result(
        result: ValidationResult,
        passed: bool,
    ) -> MetaValidationResult:
        """Build a MetaValidationResult from a ValidationResult.

        Args:
            result: The validation result.
            passed: Whether validation passed overall.

        Returns:
            MetaValidationResult for run metadata.
        """
        # Imported at call time; at module level the name is only available
        # under TYPE_CHECKING.
        from src.infra.io.log_output.run_metadata import (
            ValidationResult as MetaValidationResult,
        )

        e2e_passed = SpecResultBuilder.derive_e2e_passed(result)
        # Failed commands are only reported when the overall verdict is a
        # failure; a passing run always records an empty list.
        failed_commands = (
            [s.name for s in result.steps if not s.ok] if not passed else []
        )

        return MetaValidationResult(
            passed=passed,
            commands_run=[s.name for s in result.steps],
            commands_failed=failed_commands,
            artifacts=result.artifacts,
            coverage_percent=result.coverage_result.percent
            if result.coverage_result
            else None,
            e2e_passed=e2e_passed,
        )
@dataclass
class RunCoordinator:
    """Run-level coordination for MalaOrchestrator.

    This class encapsulates the run-level orchestration logic that was
    previously inline in MalaOrchestrator. It handles:
    - Run-level validation (Gate 4) with fixer retries
    - Fixer agent spawning

    The orchestrator delegates to this class for run-level operations
    while retaining per-issue coordination.

    Attributes:
        config: Configuration for run behavior.
        gate_checker: GateChecker for run-level validation.
        command_runner: CommandRunner for executing validation commands.
        env_config: Environment configuration for paths.
        lock_manager: Lock manager for file locking.
        sdk_client_factory: Factory for creating SDK clients (required).
        event_sink: Optional event sink for structured logging.
    """

    config: RunCoordinatorConfig
    gate_checker: GateChecker
    command_runner: CommandRunnerPort
    env_config: EnvConfigPort
    lock_manager: LockManagerPort
    sdk_client_factory: SDKClientFactoryProtocol
    event_sink: MalaEventSink | None = None
    # Ids of fixer agents that have been started and not yet cleaned up.
    _active_fixer_ids: list[str] = field(default_factory=list, init=False)

    async def run_validation(
        self, input: RunLevelValidationInput
    ) -> RunLevelValidationOutput:
        """Run Gate 4 validation after all issues complete.

        This runs validation with RUN_LEVEL scope, which includes E2E tests.
        On failure, spawns a fixer agent and retries up to max_gate_retries.

        Validation is reported as passed without running when the
        "run-level-validate" validation is disabled or when the HEAD commit
        cannot be determined (a warning is emitted in the latter case).

        Args:
            input: RunLevelValidationInput with run metadata. (The parameter
                name shadows the builtin; it is kept for caller compatibility.)

        Returns:
            RunLevelValidationOutput indicating pass/fail.
        """
        from src.infra.git_utils import get_git_commit_async

        # Check if run-level validation is disabled
        if "run-level-validate" in (self.config.disable_validations or set()):
            if self.event_sink is not None:
                self.event_sink.on_run_level_validation_disabled()
            return RunLevelValidationOutput(passed=True)

        # Get current HEAD commit
        commit_hash = await get_git_commit_async(self.config.repo_path)
        if not commit_hash:
            if self.event_sink is not None:
                self.event_sink.on_warning(
                    "Could not get HEAD commit for run-level validation"
                )
            return RunLevelValidationOutput(passed=True)

        # Build run-level validation spec
        spec = build_validation_spec(
            self.config.repo_path,
            scope=ValidationScope.RUN_LEVEL,
            disable_validations=self.config.disable_validations,
        )

        context = self._build_run_level_context(commit_hash)

        # Create validation runner
        runner = SpecValidationRunner(
            self.config.repo_path,
            event_sink=self.event_sink,
            command_runner=self.command_runner,
            env_config=self.env_config,
            lock_manager=self.lock_manager,
        )

        # Retry loop with fixer agent
        for attempt in range(1, self.config.max_gate_retries + 1):
            if self.event_sink is not None:
                self.event_sink.on_gate_started(
                    None, attempt, self.config.max_gate_retries
                )

            # Run validation; a crashing runner is treated as a failed attempt
            # (result=None) rather than aborting the whole run.
            try:
                result = await runner.run_spec(spec, context)
            except Exception as e:
                if self.event_sink is not None:
                    self.event_sink.on_warning(f"Validation runner error: {e}")
                result = None

            if result and result.passed:
                if self.event_sink is not None:
                    self.event_sink.on_gate_passed(None)
                meta_result = SpecResultBuilder.build_meta_result(result, passed=True)
                input.run_metadata.record_run_validation(meta_result)
                return RunLevelValidationOutput(passed=True)

            # Validation failed - build failure output for fixer
            failure_output = self._build_validation_failure_output(result)

            # Record failure in metadata (nothing to record if the runner crashed)
            if result:
                meta_result = SpecResultBuilder.build_meta_result(result, passed=False)
                input.run_metadata.record_run_validation(meta_result)

            # Check if we have retries left
            if attempt >= self.config.max_gate_retries:
                # Both notifications sit under the same guard so a missing
                # sink can never be dereferenced.
                if self.event_sink is not None:
                    self.event_sink.on_gate_failed(
                        None, attempt, self.config.max_gate_retries
                    )
                    failure_reasons = (
                        result.failure_reasons
                        if result and result.failure_reasons
                        else []
                    )
                    self.event_sink.on_gate_result(
                        None, passed=False, failure_reasons=failure_reasons
                    )
                return RunLevelValidationOutput(passed=False)

            # Spawn fixer agent
            if self.event_sink is not None:
                self.event_sink.on_fixer_started(attempt, self.config.max_gate_retries)

            # The fixer's outcome is deliberately ignored: a failure is already
            # reported via on_fixer_failed, and validation is retried anyway.
            await self._run_fixer_agent(
                failure_output=failure_output,
                attempt=attempt,
                spec=spec,
            )

            # Update commit hash for next validation attempt
            new_commit = await get_git_commit_async(self.config.repo_path)
            if new_commit and new_commit != commit_hash:
                commit_hash = new_commit
                context = self._build_run_level_context(commit_hash)

        # Only reached when the loop body never ran (max_gate_retries < 1).
        return RunLevelValidationOutput(passed=False)

    def _build_run_level_context(self, commit_hash: str) -> ValidationContext:
        """Build the RUN_LEVEL ValidationContext for the given commit."""
        return ValidationContext(
            issue_id=None,
            repo_path=self.config.repo_path,
            commit_hash=commit_hash,
            changed_files=[],
            scope=ValidationScope.RUN_LEVEL,
        )

    def _build_validation_failure_output(self, result: ValidationResult | None) -> str:
        """Build failure output string for fixer agent prompt.

        Args:
            result: Validation result, or None if validation crashed.

        Returns:
            Human-readable failure output: the failure reasons followed by one
            section per failed step, or a fixed fallback message when there is
            nothing to report.
        """
        if result is None:
            return "Validation crashed - check logs for details."

        lines: list[str] = []
        if result.failure_reasons:
            lines.append("**Failure reasons:**")
            lines.extend(f"- {reason}" for reason in result.failure_reasons)
            lines.append("")

        # Add step details for failed steps
        failed_steps = [s for s in result.steps if not s.ok]
        if failed_steps:
            lines.append("**Failed validation steps:**")
            for step in failed_steps:
                lines.append(f"\n### {step.name} (exit {step.returncode})")
                # stderr is preferred; stdout is shown only when stderr is empty.
                # NOTE(review): the slice keeps the first 2000 characters of the
                # tail - confirm the head rather than the end is intended.
                excerpt = step.stderr_tail or step.stdout_tail
                if excerpt:
                    lines.append("```")
                    lines.append(excerpt[:2000])
                    lines.append("```")

        return (
            "\n".join(lines) if lines else "Validation failed (no details available)."
        )

    async def _run_fixer_agent(
        self,
        failure_output: str,
        attempt: int,
        spec: ValidationSpec | None = None,
    ) -> bool:
        """Spawn a fixer agent to address run-level validation failures.

        Args:
            failure_output: Human-readable description of what failed.
            attempt: Current attempt number.
            spec: Optional ValidationSpec for extracting lint tool names.

        Returns:
            True if fixer agent completed successfully, False otherwise
            (timeout or any exception raised while the session was running).

        Raises:
            RuntimeError: If ``config.fixer_prompt`` was never configured.
                Other errors raised while building the runtime or client
                also propagate; only session errors are converted to False.
        """
        agent_id = f"fixer-{uuid.uuid4().hex[:8]}"
        self._active_fixer_ids.append(agent_id)

        # Everything after registration runs under one finally so the id is
        # unregistered and its locks released on every exit path, including
        # setup errors that propagate to the caller.
        try:
            prompt = self.config.fixer_prompt.format(
                attempt=attempt,
                max_attempts=self.config.max_gate_retries,
                failure_output=failure_output,
            )

            fixer_cwd = self.config.repo_path

            # Build runtime using AgentRuntimeBuilder
            # Note: include_mala_disallowed_tools_hook=False matches original fixer behavior
            lint_tools = extract_lint_tools_from_spec(spec)
            runtime = (
                AgentRuntimeBuilder(fixer_cwd, agent_id, self.sdk_client_factory)
                .with_hooks(
                    deadlock_monitor=None,
                    include_stop_hook=True,
                    include_mala_disallowed_tools_hook=False,
                )
                .with_env()
                .with_mcp()
                .with_disallowed_tools()
                .with_lint_tools(lint_tools)
                .build()
            )
            client = self.sdk_client_factory.create(runtime.options)

            return await self._run_fixer_session(client, runtime, prompt, attempt)
        finally:
            # cleanup_fixer_locks() may already have cleared the list while the
            # fixer was running; an unguarded remove() would raise ValueError
            # here, masking the real outcome and skipping the lock cleanup.
            if agent_id in self._active_fixer_ids:
                self._active_fixer_ids.remove(agent_id)
            cleanup_agent_locks(agent_id)

    async def _run_fixer_session(self, client, runtime, prompt: str, attempt: int) -> bool:
        """Send the prompt and consume the fixer's responses under the session timeout.

        Args:
            client: SDK client created by ``sdk_client_factory``.
            runtime: Built agent runtime; its ``lint_cache`` is updated.
            prompt: Rendered fixer prompt.
            attempt: Current attempt number, forwarded to the event sink.

        Returns:
            True if the response stream was consumed without error, False on
            timeout or any other exception (reported via on_fixer_failed).
        """
        # Maps tool-use block id -> (lint_type, command) for lint commands
        # whose result block has not been seen yet.
        pending_lint_commands: dict[str, tuple[str, str]] = {}

        try:
            async with asyncio.timeout(self.config.timeout_seconds):
                async with client:
                    await client.query(prompt)

                    async for message in client.receive_response():
                        # Use duck typing to avoid SDK imports
                        msg_type = type(message).__name__
                        if msg_type == "AssistantMessage":
                            for block in getattr(message, "content", []):
                                self._handle_fixer_block(
                                    block, attempt, runtime, pending_lint_commands
                                )
                        elif msg_type == "ResultMessage":
                            result = getattr(message, "result", "") or ""
                            if self.event_sink is not None:
                                self.event_sink.on_fixer_completed(result)

            return True

        except TimeoutError:
            if self.event_sink is not None:
                self.event_sink.on_fixer_failed("timeout")
            return False
        except Exception as e:
            if self.event_sink is not None:
                self.event_sink.on_fixer_failed(str(e))
            return False

    def _handle_fixer_block(
        self,
        block,
        attempt: int,
        runtime,
        pending_lint_commands: dict[str, tuple[str, str]],
    ) -> None:
        """Process one content block of an assistant message from the fixer.

        Text and tool-use blocks are forwarded to the event sink. Bash tool
        uses that ``runtime.lint_cache`` recognises as lint commands are
        remembered by block id; when the matching tool result arrives without
        an error flag, the command is marked successful in the lint cache.

        Args:
            block: Content block, dispatched on its class name.
            attempt: Current attempt number, forwarded to the event sink.
            runtime: Built agent runtime providing ``lint_cache``.
            pending_lint_commands: Mutable map of in-flight lint commands,
                updated in place.
        """
        block_type = type(block).__name__
        if block_type == "TextBlock":
            text = getattr(block, "text", "")
            if self.event_sink is not None:
                self.event_sink.on_fixer_text(attempt, text)
        elif block_type == "ToolUseBlock":
            name = getattr(block, "name", "")
            block_input = getattr(block, "input", {})
            if self.event_sink is not None:
                self.event_sink.on_fixer_tool_use(attempt, name, block_input)
            if name.lower() == "bash":
                cmd = block_input.get("command", "")
                lint_type = runtime.lint_cache.detect_lint_command(cmd)
                if lint_type:
                    block_id = getattr(block, "id", "")
                    pending_lint_commands[block_id] = (lint_type, cmd)
        elif block_type == "ToolResultBlock":
            tool_use_id = getattr(block, "tool_use_id", None)
            if tool_use_id in pending_lint_commands:
                lint_type, cmd = pending_lint_commands.pop(tool_use_id)
                if not getattr(block, "is_error", False):
                    runtime.lint_cache.mark_success(lint_type, cmd)

    def cleanup_fixer_locks(self) -> None:
        """Clean up any remaining fixer agent locks."""
        for agent_id in self._active_fixer_ids:
            cleanup_agent_locks(agent_id)
        self._active_fixer_ids.clear()