Coverage for src / infra / io / console_sink.py: 36%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Console event sink implementation for MalaOrchestrator. 

2 

3Provides ConsoleEventSink which outputs orchestrator events to the console 

4using the log helpers from log_output/console.py. 

5""" 

6 

7import re 

8from typing import Any 

9 

10from src.core.protocols import DeadlockInfoProtocol, EventRunConfig, MalaEventSink 

11from src.infra.io.base_sink import BaseEventSink 

12from src.infra.io.log_output.console import ( 

13 Colors, 

14 log, 

15 log_agent_text, 

16 log_tool, 

17 log_verbose, 

18 truncate_text, 

19) 

20 

21 

class ConsoleEventSink(BaseEventSink):
    """Event sink that outputs to the console using existing log helpers.

    Implements all MalaEventSink methods, delegating to log(), log_tool(),
    and log_agent_text() from src/infra/io/log_output/console.py. Some
    events are emitted through log_verbose() rather than log().

    Events that are not tied to a specific agent are logged with a fixed
    agent_id label: "run", "fixer" or "epic".

    Example:
        from src.orchestration.factory import create_orchestrator, OrchestratorDependencies

        sink = ConsoleEventSink()
        deps = OrchestratorDependencies(event_sink=sink)
        orchestrator = create_orchestrator(config, deps=deps)
        await orchestrator.run()  # Produces console output
    """

    # CLI argument names whose values must never be echoed to the console.
    _SENSITIVE_CLI_ARGS = frozenset({"api_key"})

    # Matches ANSI SGR (colour/style) escape sequences such as "\x1b[31m".
    # Compiled once here instead of on every on_fixer_text() call.
    _ANSI_SGR_RE = re.compile(r"\x1b\[[0-9;]*m")

    # -------------------------------------------------------------------------
    # Run lifecycle
    # -------------------------------------------------------------------------

    def on_run_started(self, config: EventRunConfig) -> None:
        """Log the run banner followed by the configuration of this run.

        The repository path is always printed; epic, issue filter and mode
        lines are printed only when the corresponding config field is truthy.
        The remaining settings are delegated to the _log_* helpers.
        """
        log("→", "[START] Run started", agent_id="run")
        log("◦", f"Repository: {config.repo_path}", agent_id="run")
        if config.epic_id:
            log("◦", f"Epic: {config.epic_id}", agent_id="run")
        if config.only_ids:
            log("◦", f"Issues: {', '.join(config.only_ids)}", agent_id="run")
        if config.prioritize_wip:
            log("◦", "Mode: prioritize WIP", agent_id="run")
        if config.orphans_only:
            log("◦", "Mode: orphans only", agent_id="run")
        log_verbose("◦", f"Parallelism: {config.max_agents}", agent_id="run")
        self._log_limits(config)
        self._log_review_config(config)
        self._log_braintrust_config(config)
        self._log_cli_args(config)

    def _log_limits(self, config: EventRunConfig) -> None:
        """Log the time and issue-count limits that are set (not None)."""
        limits_parts: list[str] = []
        if config.timeout_minutes is not None:
            limits_parts.append(f"Time: {config.timeout_minutes} min")
        if config.max_issues is not None:
            limits_parts.append(f"Issues: {config.max_issues}")
        if limits_parts:
            log_verbose("◦", f"Limits: {', '.join(limits_parts)}", agent_id="run")

    def _log_review_config(self, config: EventRunConfig) -> None:
        """Log whether review is enabled and the review/gate retry budgets."""
        review_type = "LLM review" if config.review_enabled else "disabled"
        log_verbose("◦", f"Review: {review_type}", agent_id="run")
        log_verbose(
            "◦",
            f"Review retries: {config.max_review_retries} "
            f"(gate retries: {config.max_gate_retries})",
            agent_id="run",
        )

    def _log_braintrust_config(self, config: EventRunConfig) -> None:
        """Log whether Braintrust is enabled for this run."""
        braintrust_mode = "enabled" if config.braintrust_enabled else "disabled"
        log_verbose("◦", f"Braintrust: {braintrust_mode}", agent_id="run")

    def _log_cli_args(self, config: EventRunConfig) -> None:
        """Log the CLI arguments, omitting unset and sensitive entries."""
        if not config.cli_args:
            return
        # Drop arguments that were not supplied (None) and any whose name is
        # on the sensitive list. Falsy-but-set values (0, False, "") are kept.
        safe_args = {
            k: v
            for k, v in config.cli_args.items()
            if v is not None and k not in self._SENSITIVE_CLI_ARGS
        }
        if safe_args:
            log_verbose("◦", f"CLI args: {safe_args}", agent_id="run")

    def on_run_completed(
        self,
        success_count: int,
        total_count: int,
        run_validation_passed: bool,
        abort_reason: str | None = None,
    ) -> None:
        """Log the final issue tally and the run-level validation outcome.

        Args:
            success_count: Number of issues that completed successfully.
            total_count: Number of issues that were processed.
            run_validation_passed: Outcome of run-level validation.
            abort_reason: If truthy, appended to the summary line.
        """
        # The icon reflects only the tally; an abort is shown as a suffix.
        status_icon = "✓" if success_count == total_count else "✗"
        status = f"DONE {status_icon} {success_count}/{total_count} issues completed"
        if abort_reason:
            status += f" (aborted: {abort_reason})"
        log("→", status, agent_id="run")
        if run_validation_passed:
            log("✓", "RUN VALIDATION passed", agent_id="run")
        else:
            log(
                "✗",
                f"RUN VALIDATION {Colors.RED}failed{Colors.RESET}",
                agent_id="run",
            )

    def on_ready_issues(self, issue_ids: list[str]) -> None:
        """Log how many issues are ready, followed by the list itself."""
        log("→", f"Ready issues ({len(issue_ids)}): {issue_ids}", agent_id="run")

    def on_waiting_for_agents(self, count: int) -> None:
        """Log that the run is waiting on `count` agents."""
        log_verbose("◦", f"Waiting for {count} agents to complete...", agent_id="run")

    def on_no_more_issues(self, reason: str) -> None:
        """Log that no further issues will be picked up, with the reason."""
        log("→", f"No more issues: {reason}", agent_id="run")

    # -------------------------------------------------------------------------
    # Agent lifecycle
    # -------------------------------------------------------------------------

    def on_agent_started(self, agent_id: str, issue_id: str) -> None:
        """Log that `agent_id` claimed `issue_id`."""
        log("▶", f"Claimed {issue_id}", agent_id=agent_id)

    def on_agent_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """Log an agent's completion with its duration and summary.

        `issue_id` is part of the event signature but is not printed here.
        """
        # Use verbose logging since on_issue_completed provides similar info
        status_icon = "✓" if success else "✗"
        log_verbose(
            status_icon,
            f"Agent {agent_id} completed in {duration_seconds:.1f}s: {summary}",
            agent_id=agent_id,
        )

    def on_claim_failed(self, agent_id: str, issue_id: str) -> None:
        """Log that `issue_id` is skipped because it is already claimed."""
        log("○", f"SKIP {issue_id} already claimed", agent_id=agent_id)

    # -------------------------------------------------------------------------
    # SDK message streaming
    # -------------------------------------------------------------------------

    def on_tool_use(
        self,
        agent_id: str,
        tool_name: str,
        description: str = "",
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """Forward a tool invocation to log_tool()."""
        log_tool(tool_name, description, agent_id=agent_id, arguments=arguments)

    def on_agent_text(self, agent_id: str, text: str) -> None:
        """Forward agent text output to log_agent_text()."""
        log_agent_text(text, agent_id)

    # -------------------------------------------------------------------------
    # Quality gate events
    # -------------------------------------------------------------------------

    def on_gate_started(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log the start of a gate attempt; falls back to the "run" label."""
        log(
            "→",
            f"GATE Attempt {attempt}/{max_attempts}",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    def on_gate_passed(
        self,
        agent_id: str | None,
        issue_id: str | None = None,
    ) -> None:
        """Log that the gate passed; falls back to the "run" label."""
        log("✓", "GATE passed", agent_id=agent_id or "run", issue_id=issue_id)

    def on_gate_failed(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log a failed gate attempt together with the attempt counter."""
        log(
            "✗",
            f"GATE {Colors.RED}failed{Colors.RESET} ({attempt}/{max_attempts})",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    def on_gate_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log that the gate is being retried."""
        log(
            "→",
            f"GATE Retry {attempt}/{max_attempts}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_gate_result(
        self,
        agent_id: str | None,
        passed: bool,
        failure_reasons: list[str] | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log the overall gate result and, on failure, each failure reason.

        Args:
            agent_id: Agent the result belongs to; "run" is used when falsy.
            passed: Whether all gate checks passed.
            failure_reasons: Reasons to list when the gate did not pass.
            issue_id: Optional issue the gate ran for.
        """
        label = agent_id or "run"
        if passed:
            log(
                "✓",
                "GATE all checks passed",
                agent_id=label,
                issue_id=issue_id,
            )
            return
        if not failure_reasons:
            # A failed result without reasons used to print nothing at all;
            # emit a generic failure line so the outcome is still visible.
            log(
                "✗",
                f"GATE {Colors.RED}checks failed{Colors.RESET}",
                agent_id=label,
                issue_id=issue_id,
            )
            return
        log(
            "✗",
            f"GATE {Colors.RED}{len(failure_reasons)} checks failed{Colors.RESET}",
            agent_id=label,
            issue_id=issue_id,
        )
        for reason in failure_reasons:
            log("→", f" - {reason}", agent_id=label, issue_id=issue_id)

    # -------------------------------------------------------------------------
    # Codex review events
    # -------------------------------------------------------------------------

    def on_review_started(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log the start of a review attempt."""
        log(
            "→",
            f"REVIEW Attempt {attempt}/{max_attempts}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_review_passed(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """Log that the review approved the change."""
        log("✓", "REVIEW approved", agent_id=agent_id, issue_id=issue_id)

    def on_review_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        error_count: int | None = None,
        parse_error: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log a review retry together with whatever triggered it.

        Args:
            agent_id: Agent whose review is being retried.
            attempt: Current attempt number.
            max_attempts: Maximum number of attempts.
            error_count: Number of review errors, if known.
            parse_error: Parse error text, if any.
            issue_id: Optional issue under review.
        """
        # Report every detail that was supplied. Previously a parse error was
        # dropped whenever error_count was given (including error_count=0).
        notes: list[str] = []
        if error_count is not None:
            notes.append(f"{error_count} errors")
        if parse_error:
            notes.append(f"parse error: {parse_error}")
        details = f" ({', '.join(notes)})" if notes else ""
        log(
            "→",
            f"REVIEW Retry {attempt}/{max_attempts}{details}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_review_warning(
        self,
        message: str,
        agent_id: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log a review warning in yellow; falls back to the "run" label."""
        log(
            "⚠",
            f"REVIEW {Colors.YELLOW}{message}{Colors.RESET}",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    # -------------------------------------------------------------------------
    # Fixer agent events
    # -------------------------------------------------------------------------

    def on_fixer_started(
        self,
        attempt: int,
        max_attempts: int,
    ) -> None:
        """Log the start of a fixer attempt."""
        log("→", f"FIXER Attempt {attempt}/{max_attempts}", agent_id="fixer")

    def on_fixer_completed(self, result: str) -> None:
        """Log the fixer's result text."""
        log("✓", f"FIXER {result}", agent_id="fixer")

    def on_fixer_failed(self, reason: str) -> None:
        """Log the fixer's failure reason in red."""
        log("✗", f"FIXER {Colors.RED}{reason}{Colors.RESET}", agent_id="fixer")

    # -------------------------------------------------------------------------
    # Issue lifecycle
    # -------------------------------------------------------------------------

    def on_issue_closed(self, agent_id: str, issue_id: str) -> None:
        """Log that `issue_id` was closed."""
        log("→", f"CLOSE {issue_id}", agent_id=agent_id)

    def on_issue_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """Log an issue's completion with its duration and summary."""
        status_icon = "✓" if success else "✗"
        log(
            status_icon,
            f"{issue_id} completed in {duration_seconds:.1f}s: {summary}",
            agent_id=agent_id,
        )

    def on_epic_closed(self, agent_id: str) -> None:
        """Log that the epic was closed."""
        log("→", "EPIC Closed", agent_id=agent_id)

    def on_validation_started(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """Log that validation is starting."""
        log("→", "VALIDATE Starting validation", agent_id=agent_id, issue_id=issue_id)

    def on_validation_result(
        self,
        agent_id: str,
        passed: bool,
        issue_id: str | None = None,
    ) -> None:
        """Log the validation outcome as a pass/fail icon plus "VALIDATE"."""
        status_icon = "✓" if passed else "✗"
        log(status_icon, "VALIDATE", agent_id=agent_id, issue_id=issue_id)

    def on_validation_step_running(
        self,
        step_name: str,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step is running."""
        log("▸", f" {step_name} running...", agent_id=agent_id or "run")

    def on_validation_step_skipped(
        self,
        step_name: str,
        reason: str,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step was skipped, with the reason."""
        log(
            "○",
            f" {step_name} {Colors.YELLOW}skipped: {reason}{Colors.RESET}",
            agent_id=agent_id or "run",
        )

    def on_validation_step_passed(
        self,
        step_name: str,
        duration_seconds: float,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step passed, with its duration."""
        log(
            "✓",
            f" {step_name} ({duration_seconds:.1f}s)",
            agent_id=agent_id or "run",
        )

    def on_validation_step_failed(
        self,
        step_name: str,
        exit_code: int,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step failed, with its exit code."""
        log(
            "✗",
            f" {step_name} {Colors.RED}exit {exit_code}{Colors.RESET}",
            agent_id=agent_id or "run",
        )

    # -------------------------------------------------------------------------
    # Warnings and diagnostics
    # -------------------------------------------------------------------------

    def on_warning(self, message: str, agent_id: str | None = None) -> None:
        """Log a generic warning in yellow; falls back to the "run" label."""
        log("⚠", f"{Colors.YELLOW}{message}{Colors.RESET}", agent_id=agent_id or "run")

    def on_log_timeout(self, agent_id: str, log_path: str) -> None:
        """Log a log-timeout warning that points at `log_path`."""
        log(
            "⚠",
            f"{Colors.YELLOW}Log timeout. Check: {log_path}{Colors.RESET}",
            agent_id=agent_id,
        )

    def on_locks_cleaned(self, agent_id: str, count: int) -> None:
        """Log how many stale locks were cleaned."""
        log("→", f"Cleaned {count} stale locks", agent_id=agent_id)

    def on_locks_released(self, count: int) -> None:
        """Log how many locks were released."""
        log("→", f"Released {count} locks", agent_id="run")

    def on_issues_committed(self) -> None:
        """Log that issues were committed."""
        log("→", "COMMIT Issues committed", agent_id="run")

    def on_run_metadata_saved(self, path: str) -> None:
        """Log where the run metadata was saved."""
        log("◦", f"Run metadata saved to {path}", agent_id="run")

    def on_run_level_validation_disabled(self) -> None:
        """Log that run-level validation is disabled."""
        log_verbose("◦", "Run-level validation disabled", agent_id="run")

    def on_abort_requested(self, reason: str) -> None:
        """Log an abort request in yellow, with the reason."""
        log("⚠", f"{Colors.YELLOW}ABORT {reason}{Colors.RESET}", agent_id="run")

    def on_tasks_aborting(self, count: int, reason: str) -> None:
        """Log that `count` tasks are being cancelled, with the reason."""
        log("→", f"ABORT Cancelling {count} tasks: {reason}", agent_id="run")

    # -------------------------------------------------------------------------
    # Epic verification lifecycle
    # -------------------------------------------------------------------------

    def on_epic_verification_started(self, epic_id: str) -> None:
        """Log that verification is starting for `epic_id`."""
        log("→", f"VERIFY Starting verification for {epic_id}", agent_id="epic")

    def on_epic_verification_passed(self, epic_id: str, confidence: float) -> None:
        """Log that epic verification passed.

        `confidence` is rendered with the ".0%" format, i.e. multiplied by
        100 and shown as a whole-number percentage (presumably a 0-1
        fraction; confirm against the caller).
        """
        log(
            "✓",
            f"VERIFY {epic_id} passed (confidence: {confidence:.0%})",
            agent_id="epic",
        )

    def on_epic_verification_failed(
        self,
        epic_id: str,
        unmet_count: int,
        remediation_ids: list[str],
    ) -> None:
        """Log the unmet-criteria count, then one line per remediation issue."""
        log(
            "✗",
            f"VERIFY {Colors.RED}{epic_id}: {unmet_count} criteria unmet{Colors.RESET}",
            agent_id="epic",
        )
        for issue_id in remediation_ids:
            log("→", f" → Remediation: {issue_id}", agent_id="epic")

    def on_epic_remediation_created(
        self,
        epic_id: str,
        issue_id: str,
        criterion: str,
    ) -> None:
        """Log a remediation issue created for an epic criterion.

        The criterion text is shortened via truncate_text(criterion, 80).
        """
        truncated = truncate_text(criterion, 80)
        # Separate the two IDs; they used to be printed fused together
        # ("{epic_id}{issue_id}"), which made the line ambiguous.
        log("→", f"REMEDIATE {epic_id} → {issue_id}: {truncated}", agent_id="epic")

    # -------------------------------------------------------------------------
    # Pipeline module events
    # -------------------------------------------------------------------------

    def on_lifecycle_state(self, agent_id: str, state: str) -> None:
        """Log a lifecycle state for an agent."""
        log_verbose("◦", f"LIFECYCLE {state}", agent_id=agent_id)

    def on_log_waiting(self, agent_id: str) -> None:
        """Log that the session log is being waited for."""
        log_verbose("◦", "LOG Waiting for session log...", agent_id=agent_id)

    def on_log_ready(self, agent_id: str) -> None:
        """Log that the session log is ready."""
        log_verbose("◦", "LOG Session log ready", agent_id=agent_id)

    def on_review_skipped_no_progress(self, agent_id: str) -> None:
        """Log that review was skipped because there were no code changes."""
        log(
            "⚠",
            f"REVIEW {Colors.YELLOW}Skipped (no code changes){Colors.RESET}",
            agent_id=agent_id,
        )

    def on_fixer_text(self, attempt: int, text: str) -> None:
        """Log fixer text output, prefixed with the attempt number."""
        # Strip ANSI codes for cleaner output
        clean_text = self._ANSI_SGR_RE.sub("", text)
        log("→", f"[{attempt}] {clean_text}", agent_id="fixer")

    def on_fixer_tool_use(
        self,
        attempt: int,
        tool_name: str,
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """Forward a fixer tool invocation to log_tool().

        The agent label is "fixer-<attempt>" and the description is empty.
        """
        log_tool(tool_name, "", agent_id=f"fixer-{attempt}", arguments=arguments)

    def on_deadlock_detected(self, info: DeadlockInfoProtocol) -> None:
        """Log a deadlock: the cycle, the victim and the blocker.

        Missing victim/blocker issue IDs are printed as "unknown".
        """
        cycle_str = " → ".join(info.cycle)
        victim_issue = info.victim_issue_id or "unknown"
        blocker_issue = info.blocker_issue_id or "unknown"
        # NOTE(review): unlike the other run-level events, no agent_id is
        # passed here, so log()'s default applies — confirm this is intended.
        log(
            "⚠",
            f"{Colors.YELLOW}Deadlock detected:{Colors.RESET} cycle=[{cycle_str}] "
            f"victim={info.victim_id}({victim_issue}) blocked_on={info.blocked_on} "
            f"blocker={info.blocker_id}({blocker_issue})",
        )

517 

518 

# Protocol assertion to verify implementation compliance.
# Executed once at import time: it constructs a ConsoleEventSink and checks
# it against MalaEventSink with isinstance(). As with any assert statement,
# the check is skipped when Python runs with -O.
assert isinstance(ConsoleEventSink(), MalaEventSink)