Coverage for src / infra / io / console_sink.py: 36%
170 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Console event sink implementation for MalaOrchestrator.
3Provides ConsoleEventSink which outputs orchestrator events to the console
4using the log helpers from log_output/console.py.
5"""
7import re
8from typing import Any
10from src.core.protocols import DeadlockInfoProtocol, EventRunConfig, MalaEventSink
11from src.infra.io.base_sink import BaseEventSink
12from src.infra.io.log_output.console import (
13 Colors,
14 log,
15 log_agent_text,
16 log_tool,
17 log_verbose,
18 truncate_text,
19)
class ConsoleEventSink(BaseEventSink):
    """Event sink that outputs to the console using existing log helpers.

    Implements all MalaEventSink methods, delegating to log(), log_verbose(),
    log_tool() and log_agent_text() from src/infra/io/log_output/console.py.
    Handlers whose ``agent_id`` is optional fall back to the ``"run"`` id when
    none is supplied.

    Example:
        from src.orchestration.factory import create_orchestrator, OrchestratorDependencies

        sink = ConsoleEventSink()
        deps = OrchestratorDependencies(event_sink=sink)
        orchestrator = create_orchestrator(config, deps=deps)
        await orchestrator.run()  # Produces console output
    """

    # Matches ANSI SGR (colour/style) escape sequences. Compiled once at class
    # creation so on_fixer_text does not go through the pattern lookup per call.
    _ANSI_SGR_RE = re.compile(r"\x1b\[[0-9;]*m")

    # CLI argument names that are never echoed by _log_cli_args.
    _SENSITIVE_CLI_ARGS = frozenset({"api_key"})

    # -------------------------------------------------------------------------
    # Run lifecycle
    # -------------------------------------------------------------------------

    def on_run_started(self, config: EventRunConfig) -> None:
        """Announce the start of a run and summarise its configuration.

        The repository and any active selection filters (epic, explicit issue
        ids, WIP prioritisation, orphans-only) are emitted with ``log``; the
        remaining details are emitted with ``log_verbose`` by the helpers below.
        """
        log("→", "[START] Run started", agent_id="run")
        log("◦", f"Repository: {config.repo_path}", agent_id="run")
        if config.epic_id:
            log("◦", f"Epic: {config.epic_id}", agent_id="run")
        if config.only_ids:
            log("◦", f"Issues: {', '.join(config.only_ids)}", agent_id="run")
        if config.prioritize_wip:
            log("◦", "Mode: prioritize WIP", agent_id="run")
        if config.orphans_only:
            log("◦", "Mode: orphans only", agent_id="run")
        log_verbose("◦", f"Parallelism: {config.max_agents}", agent_id="run")
        self._log_limits(config)
        self._log_review_config(config)
        self._log_braintrust_config(config)
        self._log_cli_args(config)

    def _log_limits(self, config: EventRunConfig) -> None:
        """Log the configured time/issue limits, if any are set."""
        limits_parts: list[str] = []
        if config.timeout_minutes is not None:
            limits_parts.append(f"Time: {config.timeout_minutes} min")
        if config.max_issues is not None:
            limits_parts.append(f"Issues: {config.max_issues}")
        if limits_parts:
            log_verbose("◦", f"Limits: {', '.join(limits_parts)}", agent_id="run")

    def _log_review_config(self, config: EventRunConfig) -> None:
        """Log whether review is enabled and the review/gate retry budgets."""
        review_type = "LLM review" if config.review_enabled else "disabled"
        log_verbose("◦", f"Review: {review_type}", agent_id="run")
        log_verbose(
            "◦",
            f"Review retries: {config.max_review_retries} "
            f"(gate retries: {config.max_gate_retries})",
            agent_id="run",
        )

    def _log_braintrust_config(self, config: EventRunConfig) -> None:
        """Log whether Braintrust is enabled for this run."""
        braintrust_mode = "enabled" if config.braintrust_enabled else "disabled"
        log_verbose("◦", f"Braintrust: {braintrust_mode}", agent_id="run")

    def _log_cli_args(self, config: EventRunConfig) -> None:
        """Log the CLI arguments, omitting unset and sensitive entries."""
        if not config.cli_args:
            return
        # Drop arguments that were not provided (None) and anything listed in
        # _SENSITIVE_CLI_ARGS so secrets never reach the console.
        safe_args = {
            key: value
            for key, value in config.cli_args.items()
            if value is not None and key not in self._SENSITIVE_CLI_ARGS
        }
        if safe_args:
            log_verbose("◦", f"CLI args: {safe_args}", agent_id="run")

    def on_run_completed(
        self,
        success_count: int,
        total_count: int,
        run_validation_passed: bool,
        abort_reason: str | None = None,
    ) -> None:
        """Log the final tally, any abort reason, and the run-validation outcome.

        The tally is marked with a check only when every issue succeeded.
        """
        status_icon = "✓" if success_count == total_count else "✗"
        status = f"DONE {status_icon} {success_count}/{total_count} issues completed"
        if abort_reason:
            status += f" (aborted: {abort_reason})"
        log("→", status, agent_id="run")
        if run_validation_passed:
            log("✓", "RUN VALIDATION passed", agent_id="run")
        else:
            log(
                "✗",
                f"RUN VALIDATION {Colors.RED}failed{Colors.RESET}",
                agent_id="run",
            )

    def on_ready_issues(self, issue_ids: list[str]) -> None:
        """Log how many issues are ready, followed by their ids."""
        log("→", f"Ready issues ({len(issue_ids)}): {issue_ids}", agent_id="run")

    def on_waiting_for_agents(self, count: int) -> None:
        """Log (verbosely) that the run is waiting on ``count`` agents."""
        log_verbose("◦", f"Waiting for {count} agents to complete...", agent_id="run")

    def on_no_more_issues(self, reason: str) -> None:
        """Log that no further issues are available, with the given reason."""
        log("→", f"No more issues: {reason}", agent_id="run")

    # -------------------------------------------------------------------------
    # Agent lifecycle
    # -------------------------------------------------------------------------

    def on_agent_started(self, agent_id: str, issue_id: str) -> None:
        """Log that ``agent_id`` claimed ``issue_id``."""
        log("▶", f"Claimed {issue_id}", agent_id=agent_id)

    def on_agent_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """Log (verbosely) an agent's completion status, duration and summary."""
        # Use verbose logging since on_issue_completed provides similar info
        status_icon = "✓" if success else "✗"
        log_verbose(
            status_icon,
            f"Agent {agent_id} completed in {duration_seconds:.1f}s: {summary}",
            agent_id=agent_id,
        )

    def on_claim_failed(self, agent_id: str, issue_id: str) -> None:
        """Log that ``issue_id`` was skipped because it is already claimed."""
        log("○", f"SKIP {issue_id} already claimed", agent_id=agent_id)

    # -------------------------------------------------------------------------
    # SDK message streaming
    # -------------------------------------------------------------------------

    def on_tool_use(
        self,
        agent_id: str,
        tool_name: str,
        description: str = "",
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """Forward a tool invocation to ``log_tool``."""
        log_tool(tool_name, description, agent_id=agent_id, arguments=arguments)

    def on_agent_text(self, agent_id: str, text: str) -> None:
        """Forward agent-produced text to ``log_agent_text``."""
        log_agent_text(text, agent_id)

    # -------------------------------------------------------------------------
    # Quality gate events
    # -------------------------------------------------------------------------

    def on_gate_started(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log the start of a gate attempt (``attempt``/``max_attempts``)."""
        log(
            "→",
            f"GATE Attempt {attempt}/{max_attempts}",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    def on_gate_passed(
        self,
        agent_id: str | None,
        issue_id: str | None = None,
    ) -> None:
        """Log that the gate passed."""
        log("✓", "GATE passed", agent_id=agent_id or "run", issue_id=issue_id)

    def on_gate_failed(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log that a gate attempt failed, with the attempt counter."""
        log(
            "✗",
            f"GATE {Colors.RED}failed{Colors.RESET} ({attempt}/{max_attempts})",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    def on_gate_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log that the gate is being retried."""
        log(
            "→",
            f"GATE Retry {attempt}/{max_attempts}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_gate_result(
        self,
        agent_id: str | None,
        passed: bool,
        failure_reasons: list[str] | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log the overall gate outcome and, on failure, each failure reason.

        Args:
            agent_id: Agent the result belongs to; ``"run"`` is used when None.
            passed: Whether all gate checks passed.
            failure_reasons: Reasons to list when the gate failed. May be None
                or empty, in which case a generic failure line is logged.
            issue_id: Optional issue id forwarded to ``log``.
        """
        target = agent_id or "run"
        if passed:
            log(
                "✓",
                "GATE all checks passed",
                agent_id=target,
                issue_id=issue_id,
            )
            return
        if not failure_reasons:
            # A failed result without reasons used to be dropped silently;
            # still surface the failure so it is visible on the console.
            log(
                "✗",
                f"GATE {Colors.RED}failed{Colors.RESET}",
                agent_id=target,
                issue_id=issue_id,
            )
            return
        log(
            "✗",
            f"GATE {Colors.RED}{len(failure_reasons)} checks failed{Colors.RESET}",
            agent_id=target,
            issue_id=issue_id,
        )
        for reason in failure_reasons:
            log("→", f" - {reason}", agent_id=target, issue_id=issue_id)

    # -------------------------------------------------------------------------
    # Codex review events
    # -------------------------------------------------------------------------

    def on_review_started(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """Log the start of a review attempt."""
        log(
            "→",
            f"REVIEW Attempt {attempt}/{max_attempts}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_review_passed(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """Log that the review was approved."""
        log("✓", "REVIEW approved", agent_id=agent_id, issue_id=issue_id)

    def on_review_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        error_count: int | None = None,
        parse_error: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log a review retry, annotated with the error count or parse error.

        ``error_count`` takes precedence: ``parse_error`` is only shown when
        ``error_count`` is None.
        """
        details = ""
        if error_count is not None:
            details = f" ({error_count} errors)"
        elif parse_error:
            details = f" (parse error: {parse_error})"
        log(
            "→",
            f"REVIEW Retry {attempt}/{max_attempts}{details}",
            agent_id=agent_id,
            issue_id=issue_id,
        )

    def on_review_warning(
        self,
        message: str,
        agent_id: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """Log a review warning in yellow."""
        log(
            "⚠",
            f"REVIEW {Colors.YELLOW}{message}{Colors.RESET}",
            agent_id=agent_id or "run",
            issue_id=issue_id,
        )

    # -------------------------------------------------------------------------
    # Fixer agent events
    # -------------------------------------------------------------------------

    def on_fixer_started(
        self,
        attempt: int,
        max_attempts: int,
    ) -> None:
        """Log the start of a fixer attempt."""
        log("→", f"FIXER Attempt {attempt}/{max_attempts}", agent_id="fixer")

    def on_fixer_completed(self, result: str) -> None:
        """Log the fixer's result."""
        log("✓", f"FIXER {result}", agent_id="fixer")

    def on_fixer_failed(self, reason: str) -> None:
        """Log the fixer's failure reason in red."""
        log("✗", f"FIXER {Colors.RED}{reason}{Colors.RESET}", agent_id="fixer")

    # -------------------------------------------------------------------------
    # Issue lifecycle
    # -------------------------------------------------------------------------

    def on_issue_closed(self, agent_id: str, issue_id: str) -> None:
        """Log that ``issue_id`` was closed."""
        log("→", f"CLOSE {issue_id}", agent_id=agent_id)

    def on_issue_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """Log an issue's completion status, duration and summary."""
        status_icon = "✓" if success else "✗"
        log(
            status_icon,
            f"{issue_id} completed in {duration_seconds:.1f}s: {summary}",
            agent_id=agent_id,
        )

    def on_epic_closed(self, agent_id: str) -> None:
        """Log that the epic was closed."""
        log("→", "EPIC Closed", agent_id=agent_id)

    def on_validation_started(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """Log that validation is starting."""
        log("→", "VALIDATE Starting validation", agent_id=agent_id, issue_id=issue_id)

    def on_validation_result(
        self,
        agent_id: str,
        passed: bool,
        issue_id: str | None = None,
    ) -> None:
        """Log the validation outcome; pass/fail is conveyed by the icon only."""
        status_icon = "✓" if passed else "✗"
        log(status_icon, "VALIDATE", agent_id=agent_id, issue_id=issue_id)

    def on_validation_step_running(
        self,
        step_name: str,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step is running."""
        log("▸", f" {step_name} running...", agent_id=agent_id or "run")

    def on_validation_step_skipped(
        self,
        step_name: str,
        reason: str,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step was skipped, with the reason in yellow."""
        log(
            "○",
            f" {step_name} {Colors.YELLOW}skipped: {reason}{Colors.RESET}",
            agent_id=agent_id or "run",
        )

    def on_validation_step_passed(
        self,
        step_name: str,
        duration_seconds: float,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step passed, with its duration."""
        log(
            "✓",
            f" {step_name} ({duration_seconds:.1f}s)",
            agent_id=agent_id or "run",
        )

    def on_validation_step_failed(
        self,
        step_name: str,
        exit_code: int,
        agent_id: str | None = None,
    ) -> None:
        """Log that a validation step failed, with its exit code in red."""
        log(
            "✗",
            f" {step_name} {Colors.RED}exit {exit_code}{Colors.RESET}",
            agent_id=agent_id or "run",
        )

    # -------------------------------------------------------------------------
    # Warnings and diagnostics
    # -------------------------------------------------------------------------

    def on_warning(self, message: str, agent_id: str | None = None) -> None:
        """Log a generic warning in yellow."""
        log("⚠", f"{Colors.YELLOW}{message}{Colors.RESET}", agent_id=agent_id or "run")

    def on_log_timeout(self, agent_id: str, log_path: str) -> None:
        """Log a log-timeout warning that points at ``log_path``."""
        log(
            "⚠",
            f"{Colors.YELLOW}Log timeout. Check: {log_path}{Colors.RESET}",
            agent_id=agent_id,
        )

    def on_locks_cleaned(self, agent_id: str, count: int) -> None:
        """Log how many stale locks were cleaned."""
        log("→", f"Cleaned {count} stale locks", agent_id=agent_id)

    def on_locks_released(self, count: int) -> None:
        """Log how many locks were released."""
        log("→", f"Released {count} locks", agent_id="run")

    def on_issues_committed(self) -> None:
        """Log that issues were committed."""
        log("→", "COMMIT Issues committed", agent_id="run")

    def on_run_metadata_saved(self, path: str) -> None:
        """Log where the run metadata was saved."""
        log("◦", f"Run metadata saved to {path}", agent_id="run")

    def on_run_level_validation_disabled(self) -> None:
        """Log (verbosely) that run-level validation is disabled."""
        log_verbose("◦", "Run-level validation disabled", agent_id="run")

    def on_abort_requested(self, reason: str) -> None:
        """Log an abort request in yellow."""
        log("⚠", f"{Colors.YELLOW}ABORT {reason}{Colors.RESET}", agent_id="run")

    def on_tasks_aborting(self, count: int, reason: str) -> None:
        """Log that ``count`` tasks are being cancelled, with the reason."""
        log("→", f"ABORT Cancelling {count} tasks: {reason}", agent_id="run")

    # -------------------------------------------------------------------------
    # Epic verification lifecycle
    # -------------------------------------------------------------------------

    def on_epic_verification_started(self, epic_id: str) -> None:
        """Log the start of verification for ``epic_id``."""
        log("→", f"VERIFY Starting verification for {epic_id}", agent_id="epic")

    def on_epic_verification_passed(self, epic_id: str, confidence: float) -> None:
        """Log that epic verification passed; confidence is shown as a percentage."""
        log(
            "✓",
            f"VERIFY {epic_id} passed (confidence: {confidence:.0%})",
            agent_id="epic",
        )

    def on_epic_verification_failed(
        self,
        epic_id: str,
        unmet_count: int,
        remediation_ids: list[str],
    ) -> None:
        """Log the unmet-criteria count, then one line per remediation issue id."""
        log(
            "✗",
            f"VERIFY {Colors.RED}{epic_id}: {unmet_count} criteria unmet{Colors.RESET}",
            agent_id="epic",
        )
        for issue_id in remediation_ids:
            log("→", f" → Remediation: {issue_id}", agent_id="epic")

    def on_epic_remediation_created(
        self,
        epic_id: str,
        issue_id: str,
        criterion: str,
    ) -> None:
        """Log a newly created remediation issue with its criterion.

        The criterion is passed through ``truncate_text(criterion, 80)`` first.
        """
        truncated = truncate_text(criterion, 80)
        log("→", f"REMEDIATE {epic_id} → {issue_id}: {truncated}", agent_id="epic")

    # -------------------------------------------------------------------------
    # Pipeline module events
    # -------------------------------------------------------------------------

    def on_lifecycle_state(self, agent_id: str, state: str) -> None:
        """Log (verbosely) a lifecycle state."""
        log_verbose("◦", f"LIFECYCLE {state}", agent_id=agent_id)

    def on_log_waiting(self, agent_id: str) -> None:
        """Log (verbosely) that the session log is being awaited."""
        log_verbose("◦", "LOG Waiting for session log...", agent_id=agent_id)

    def on_log_ready(self, agent_id: str) -> None:
        """Log (verbosely) that the session log is ready."""
        log_verbose("◦", "LOG Session log ready", agent_id=agent_id)

    def on_review_skipped_no_progress(self, agent_id: str) -> None:
        """Log that review was skipped because there were no code changes."""
        log(
            "⚠",
            f"REVIEW {Colors.YELLOW}Skipped (no code changes){Colors.RESET}",
            agent_id=agent_id,
        )

    def on_fixer_text(self, attempt: int, text: str) -> None:
        """Log fixer output prefixed with the attempt number, ANSI codes removed."""
        # Strip ANSI codes for cleaner output
        clean_text = self._ANSI_SGR_RE.sub("", text)
        log("→", f"[{attempt}] {clean_text}", agent_id="fixer")

    def on_fixer_tool_use(
        self,
        attempt: int,
        tool_name: str,
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """Forward a fixer tool invocation to ``log_tool`` as ``fixer-<attempt>``."""
        log_tool(tool_name, "", agent_id=f"fixer-{attempt}", arguments=arguments)

    def on_deadlock_detected(self, info: DeadlockInfoProtocol) -> None:
        """Log a deadlock warning with the cycle, victim and blocker details.

        Missing victim/blocker issue ids are rendered as ``"unknown"``.
        """
        cycle_str = " → ".join(info.cycle)
        victim_issue = info.victim_issue_id or "unknown"
        blocker_issue = info.blocker_issue_id or "unknown"
        # NOTE(review): unlike every other handler, no agent_id is passed here,
        # so log()'s own default applies — confirm this is intentional.
        log(
            "⚠",
            f"{Colors.YELLOW}Deadlock detected:{Colors.RESET} cycle=[{cycle_str}] "
            f"victim={info.victim_id}({victim_issue}) blocked_on={info.blocked_on} "
            f"blocker={info.blocker_id}({blocker_issue})",
        )
# Import-time protocol check: fail fast if ConsoleEventSink stops satisfying
# MalaEventSink. Being an ``assert``, it is skipped when Python runs with -O.
assert isinstance(ConsoleEventSink(), MalaEventSink), (
    "ConsoleEventSink does not satisfy the MalaEventSink protocol"
)