Coverage for src / pipeline / run_coordinator.py: 26%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""RunCoordinator: Main run orchestration pipeline stage. 

2 

3Extracted from MalaOrchestrator to separate run-level coordination from 

4the main class. This module handles: 

5- Main run loop (spawning agents, waiting for completion) 

6- Run-level validation (Gate 4) 

7- Fixer agent spawning for validation failures 

8 

9The RunCoordinator receives explicit inputs and returns explicit outputs, 

10making it testable without SDK or subprocess dependencies. 

11 

12Design principles: 

13- Protocol-based dependencies for testability 

14- Explicit input/output types for clarity 

15- Pure functions where possible 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import uuid 

22from dataclasses import dataclass, field 

23from typing import TYPE_CHECKING 

24 

25from src.infra.agent_runtime import AgentRuntimeBuilder 

26from src.infra.tools.locking import cleanup_agent_locks 

27from src.domain.validation.e2e import E2EStatus 

28from src.domain.validation.spec import ( 

29 ValidationContext, 

30 ValidationScope, 

31 build_validation_spec, 

32) 

33from src.domain.validation.spec import extract_lint_tools_from_spec 

34from src.domain.validation.spec_runner import SpecValidationRunner 

35 

36if TYPE_CHECKING: 

37 from src.core.protocols import MalaEventSink, SDKClientFactoryProtocol 

38 from src.infra.io.log_output.run_metadata import ( 

39 RunMetadata, 

40 ValidationResult as MetaValidationResult, 

41 ) 

42 from src.core.protocols import ( 

43 CommandRunnerPort, 

44 EnvConfigPort, 

45 GateChecker, 

46 LockManagerPort, 

47 ) 

48 from src.domain.validation.result import ValidationResult 

49 from src.domain.validation.spec import ValidationSpec 

50 from pathlib import Path 

51 

52 

53class _FixerPromptNotSet: 

54 """Sentinel indicating fixer_prompt was not set. 

55 

56 Raises RuntimeError if used, preventing silent failures from empty prompts. 

57 """ 

58 

59 def format(self, **kwargs: object) -> str: 

60 raise RuntimeError( 

61 "fixer_prompt not configured. " 

62 "Pass fixer_prompt to RunCoordinatorConfig or use build_run_coordinator()." 

63 ) 

64 

65 

66_FIXER_PROMPT_NOT_SET = _FixerPromptNotSet() 

67 

68 

@dataclass
class RunCoordinatorConfig:
    """Settings that control how a RunCoordinator validates and repairs a run.

    Attributes:
        repo_path: Repository in which validation and the fixer agent run.
        timeout_seconds: Time limit, in seconds, for a single fixer agent session.
        max_gate_retries: Maximum gate retry attempts.
        disable_validations: Names of validations to disable; None disables none.
        coverage_threshold: Optional coverage threshold override.
        fixer_prompt: Template for the fixer agent prompt. It is rendered with
            ``attempt``, ``max_attempts`` and ``failure_output``. The default is
            a sentinel that raises RuntimeError when rendered.
    """

    # Required settings.
    repo_path: Path
    timeout_seconds: int
    # Optional settings.
    max_gate_retries: int = field(default=3)
    disable_validations: set[str] | None = field(default=None)
    coverage_threshold: float | None = field(default=None)
    fixer_prompt: str | _FixerPromptNotSet = field(default=_FIXER_PROMPT_NOT_SET)

88 

89 

@dataclass
class RunLevelValidationInput:
    """Inputs that RunCoordinator.run_validation needs for the run-level gate.

    Attributes:
        run_metadata: Tracker that receives each recorded validation outcome.
    """

    run_metadata: RunMetadata  # record_run_validation() is called on this

99 

100 

@dataclass
class RunLevelValidationOutput:
    """Result of RunCoordinator.run_validation.

    Attributes:
        passed: True when the run-level gate passed or was skipped; False when
            every validation attempt failed.
    """

    passed: bool  # overall gate verdict

110 

111 

@dataclass
class SpecResultBuilder:
    """Converts spec-runner ValidationResults into run-metadata records.

    Holds only static helpers. One reduces the E2E outcome to a tri-state
    flag; the other assembles the metadata ``ValidationResult`` stored on the
    run.
    """

    @staticmethod
    def derive_e2e_passed(result: ValidationResult) -> bool | None:
        """Collapse the E2E outcome of *result* into True / False / None.

        Args:
            result: ValidationResult whose ``e2e_result`` is inspected.

        Returns:
            None when there is no E2E result or its status is SKIPPED.
            Otherwise, the E2E result's ``passed`` value.
        """
        e2e = result.e2e_result
        # Short-circuit keeps the None check ahead of the status lookup.
        if e2e is None or e2e.status == E2EStatus.SKIPPED:
            return None
        return e2e.passed

    @staticmethod
    def build_meta_result(
        result: ValidationResult,
        passed: bool,
    ) -> MetaValidationResult:
        """Assemble the run-metadata record for one validation attempt.

        Args:
            result: The validation result produced by the spec runner.
            passed: Overall verdict. When True, no commands are listed as
                failed.

        Returns:
            MetaValidationResult listing the commands that ran, the ones that
            failed, artifacts, coverage percent (or None) and the E2E flag.
        """
        # The module-level import of this name is TYPE_CHECKING-only, so it
        # is imported here at call time (presumably to avoid an import cycle).
        from src.infra.io.log_output.run_metadata import (
            ValidationResult as MetaValidationResult,
        )

        e2e_flag = SpecResultBuilder.derive_e2e_passed(result)
        if passed:
            failed_names: list[str] = []
        else:
            failed_names = [step.name for step in result.steps if not step.ok]
        executed_names = [step.name for step in result.steps]
        coverage = result.coverage_result

        return MetaValidationResult(
            passed=passed,
            commands_run=executed_names,
            commands_failed=failed_names,
            artifacts=result.artifacts,
            coverage_percent=coverage.percent if coverage else None,
            e2e_passed=e2e_flag,
        )

171 

172 

@dataclass
class RunCoordinator:
    """Run-level coordination for MalaOrchestrator.

    Encapsulates the run-level orchestration logic that was previously inline
    in MalaOrchestrator:
    - Run-level validation (Gate 4) with fixer retries
    - Fixer agent spawning

    The orchestrator delegates to this class for run-level operations while
    retaining per-issue coordination.

    Attributes:
        config: Configuration for run behavior.
        gate_checker: GateChecker for run-level validation (not referenced by
            the methods of this class).
        command_runner: Command runner handed to the SpecValidationRunner.
        env_config: Environment configuration handed to the SpecValidationRunner.
        lock_manager: Lock manager handed to the SpecValidationRunner.
        sdk_client_factory: Factory for creating SDK clients (required).
        event_sink: Optional event sink for structured logging.
    """

    config: RunCoordinatorConfig
    gate_checker: GateChecker
    command_runner: CommandRunnerPort
    env_config: EnvConfigPort
    lock_manager: LockManagerPort
    sdk_client_factory: SDKClientFactoryProtocol
    event_sink: MalaEventSink | None = None
    # IDs of fixer agents that are currently registered. cleanup_fixer_locks()
    # releases locks for any IDs still listed here.
    _active_fixer_ids: list[str] = field(default_factory=list, init=False)

    async def run_validation(
        self, input: RunLevelValidationInput
    ) -> RunLevelValidationOutput:
        """Run Gate 4 validation after all issues complete.

        Validation runs with RUN_LEVEL scope. On failure, a fixer agent is
        spawned and validation is re-run. There are at most
        ``config.max_gate_retries`` validation attempts, and always at least
        one. Every attempt that produced a result is recorded in
        ``input.run_metadata``.

        The gate counts as passed without running when "run-level-validate"
        is in ``config.disable_validations``, or when the HEAD commit cannot
        be determined.

        Args:
            input: RunLevelValidationInput with run metadata.

        Returns:
            RunLevelValidationOutput indicating pass/fail.
        """
        from src.infra.git_utils import get_git_commit_async

        if "run-level-validate" in (self.config.disable_validations or set()):
            if self.event_sink is not None:
                self.event_sink.on_run_level_validation_disabled()
            return RunLevelValidationOutput(passed=True)

        commit_hash = await get_git_commit_async(self.config.repo_path)
        if not commit_hash:
            if self.event_sink is not None:
                self.event_sink.on_warning(
                    "Could not get HEAD commit for run-level validation"
                )
            return RunLevelValidationOutput(passed=True)

        spec = build_validation_spec(
            self.config.repo_path,
            scope=ValidationScope.RUN_LEVEL,
            disable_validations=self.config.disable_validations,
        )
        context = self._build_run_level_context(commit_hash)
        runner = SpecValidationRunner(
            self.config.repo_path,
            event_sink=self.event_sink,
            command_runner=self.command_runner,
            env_config=self.env_config,
            lock_manager=self.lock_manager,
        )

        # A non-positive retry budget would otherwise skip the loop entirely
        # and report failure without ever validating; always try at least once.
        max_attempts = max(1, self.config.max_gate_retries)

        for attempt in range(1, max_attempts + 1):
            if self.event_sink is not None:
                self.event_sink.on_gate_started(None, attempt, max_attempts)

            try:
                result = await runner.run_spec(spec, context)
            except Exception as e:
                # A crashing runner counts as a failed attempt, so the fixer
                # still gets a chance to repair the repository.
                if self.event_sink is not None:
                    self.event_sink.on_warning(f"Validation runner error: {e}")
                result = None

            if result and result.passed:
                if self.event_sink is not None:
                    self.event_sink.on_gate_passed(None)
                meta_result = SpecResultBuilder.build_meta_result(result, passed=True)
                input.run_metadata.record_run_validation(meta_result)
                return RunLevelValidationOutput(passed=True)

            failure_output = self._build_validation_failure_output(result)

            if result:
                meta_result = SpecResultBuilder.build_meta_result(result, passed=False)
                input.run_metadata.record_run_validation(meta_result)

            if attempt >= max_attempts:
                if self.event_sink is not None:
                    self.event_sink.on_gate_failed(None, attempt, max_attempts)
                    failure_reasons = (
                        result.failure_reasons
                        if result and result.failure_reasons
                        else []
                    )
                    self.event_sink.on_gate_result(
                        None, passed=False, failure_reasons=failure_reasons
                    )
                return RunLevelValidationOutput(passed=False)

            if self.event_sink is not None:
                self.event_sink.on_fixer_started(attempt, max_attempts)

            # Fixer failures are reported via on_fixer_failed inside
            # _run_fixer_agent. Validation is retried whatever the outcome.
            await self._run_fixer_agent(
                failure_output=failure_output,
                attempt=attempt,
                spec=spec,
            )

            # The fixer may have committed; validate against the new HEAD.
            new_commit = await get_git_commit_async(self.config.repo_path)
            if new_commit and new_commit != commit_hash:
                commit_hash = new_commit
                context = self._build_run_level_context(commit_hash)

        # Not reached (the final attempt always returns above); kept so every
        # path visibly returns a value.
        return RunLevelValidationOutput(passed=False)

    def _build_run_level_context(self, commit_hash: str) -> ValidationContext:
        """Build the RUN_LEVEL ValidationContext for *commit_hash*.

        Run-level validation has no issue and no changed-file list.
        """
        return ValidationContext(
            issue_id=None,
            repo_path=self.config.repo_path,
            commit_hash=commit_hash,
            changed_files=[],
            scope=ValidationScope.RUN_LEVEL,
        )

    def _build_validation_failure_output(self, result: ValidationResult | None) -> str:
        """Build failure output string for fixer agent prompt.

        Args:
            result: Validation result, or None if validation crashed.

        Returns:
            Markdown-ish text listing the failure reasons and, for each failed
            step, its exit code and the last 2000 characters of its stderr
            tail (or of its stdout tail when stderr is empty).
        """
        if result is None:
            return "Validation crashed - check logs for details."

        # Per-step output cap, so a noisy command cannot bloat the prompt.
        max_output_chars = 2000

        lines: list[str] = []
        if result.failure_reasons:
            lines.append("**Failure reasons:**")
            lines.extend(f"- {reason}" for reason in result.failure_reasons)
            lines.append("")

        failed_steps = [s for s in result.steps if not s.ok]
        if failed_steps:
            lines.append("**Failed validation steps:**")
            for step in failed_steps:
                lines.append(f"\n### {step.name} (exit {step.returncode})")
                output = step.stderr_tail or step.stdout_tail
                if output:
                    # These fields hold output *tails*; keep the end, where
                    # error summaries are usually printed, rather than
                    # truncating away the final lines.
                    lines.extend(["```", output[-max_output_chars:], "```"])

        return (
            "\n".join(lines) if lines else "Validation failed (no details available)."
        )

    async def _run_fixer_agent(
        self,
        failure_output: str,
        attempt: int,
        spec: ValidationSpec | None = None,
    ) -> bool:
        """Spawn a fixer agent to address run-level validation failures.

        Args:
            failure_output: Human-readable description of what failed.
            attempt: Current attempt number.
            spec: Optional ValidationSpec for extracting lint tool names.

        Returns:
            True if the fixer session completed, False if it timed out or
            raised (reported through ``event_sink.on_fixer_failed``).

        Raises:
            RuntimeError: If ``config.fixer_prompt`` was never configured.
                Errors from building the agent runtime or creating the SDK
                client also propagate. In either case no agent ID is left
                registered.
        """
        # Render the prompt before registering the agent. A missing prompt
        # must surface as an error, not as an ordinary fixer failure, and it
        # must not leave a stale entry in _active_fixer_ids.
        prompt = self.config.fixer_prompt.format(
            attempt=attempt,
            max_attempts=self.config.max_gate_retries,
            failure_output=failure_output,
        )

        agent_id = f"fixer-{uuid.uuid4().hex[:8]}"
        self._active_fixer_ids.append(agent_id)
        try:
            # Note: include_mala_disallowed_tools_hook=False matches original
            # fixer behavior.
            lint_tools = extract_lint_tools_from_spec(spec)
            runtime = (
                AgentRuntimeBuilder(
                    self.config.repo_path, agent_id, self.sdk_client_factory
                )
                .with_hooks(
                    deadlock_monitor=None,
                    include_stop_hook=True,
                    include_mala_disallowed_tools_hook=False,
                )
                .with_env()
                .with_mcp()
                .with_disallowed_tools()
                .with_lint_tools(lint_tools)
                .build()
            )
            client = self.sdk_client_factory.create(runtime.options)

            # tool-use id -> (lint type, command) for lint commands whose
            # tool result has not been seen yet.
            pending_lint_commands: dict[str, tuple[str, str]] = {}

            try:
                async with asyncio.timeout(self.config.timeout_seconds):
                    async with client:
                        await client.query(prompt)
                        async for message in client.receive_response():
                            # Classify by type name (duck typing) to avoid SDK imports.
                            msg_type = type(message).__name__
                            if msg_type == "ResultMessage":
                                result = getattr(message, "result", "") or ""
                                if self.event_sink is not None:
                                    self.event_sink.on_fixer_completed(result)
                            elif msg_type in ("AssistantMessage", "UserMessage"):
                                # Tool results may arrive in UserMessage content
                                # (as in the Claude Agent SDK message model), so
                                # both message kinds are scanned.
                                content = getattr(message, "content", None) or []
                                if isinstance(content, str):
                                    # Plain-string content carries no blocks.
                                    continue
                                for block in content:
                                    self._handle_fixer_block(
                                        block,
                                        from_assistant=msg_type == "AssistantMessage",
                                        attempt=attempt,
                                        runtime=runtime,
                                        pending_lint_commands=pending_lint_commands,
                                    )
                return True
            except TimeoutError:
                if self.event_sink is not None:
                    self.event_sink.on_fixer_failed("timeout")
                return False
            except Exception as e:
                if self.event_sink is not None:
                    self.event_sink.on_fixer_failed(str(e))
                return False
        finally:
            # cleanup_fixer_locks() may already have cleared the list; don't
            # let a ValueError here mask the original outcome or exception.
            if agent_id in self._active_fixer_ids:
                self._active_fixer_ids.remove(agent_id)
            cleanup_agent_locks(agent_id)

    def _handle_fixer_block(
        self,
        block: object,
        *,
        from_assistant: bool,
        attempt: int,
        runtime,
        pending_lint_commands: dict[str, tuple[str, str]],
    ) -> None:
        """Process one content block from a fixer agent message.

        Args:
            block: SDK content block, classified by its type name.
            from_assistant: True for AssistantMessage content. Text and
                tool-use blocks are only handled for assistant messages;
                tool results are handled from any message.
            attempt: Current attempt number, forwarded to the event sink.
            runtime: Built agent runtime; only its ``lint_cache`` is used.
            pending_lint_commands: Lint commands awaiting their tool result,
                keyed by tool-use id. Updated in place.
        """
        block_type = type(block).__name__

        if block_type == "ToolResultBlock":
            tool_use_id = getattr(block, "tool_use_id", None)
            if tool_use_id in pending_lint_commands:
                lint_type, cmd = pending_lint_commands.pop(tool_use_id)
                if not getattr(block, "is_error", False):
                    runtime.lint_cache.mark_success(lint_type, cmd)
            return

        if not from_assistant:
            return

        if block_type == "TextBlock":
            text = getattr(block, "text", "")
            if self.event_sink is not None:
                self.event_sink.on_fixer_text(attempt, text)
        elif block_type == "ToolUseBlock":
            name = getattr(block, "name", "")
            block_input = getattr(block, "input", {})
            if self.event_sink is not None:
                self.event_sink.on_fixer_tool_use(attempt, name, block_input)
            if name.lower() == "bash":
                cmd = block_input.get("command", "")
                lint_type = runtime.lint_cache.detect_lint_command(cmd)
                if lint_type:
                    # Hold the command until its result arrives, so that only
                    # successful lint runs are marked in the cache.
                    block_id = getattr(block, "id", "")
                    pending_lint_commands[block_id] = (lint_type, cmd)

    def cleanup_fixer_locks(self) -> None:
        """Clean up any remaining fixer agent locks."""
        for agent_id in self._active_fixer_ids:
            cleanup_agent_locks(agent_id)
        self._active_fixer_ids.clear()