Coverage for src / pipeline / session_callback_factory.py: 27%
51 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""SessionCallbackFactory: Builds SessionCallbacks for agent sessions.
3This factory encapsulates callback construction that bridges orchestrator state
4to the pipeline runners. It receives state references and returns a SessionCallbacks
5instance wired to the appropriate callbacks.
7Design principles:
8- Single responsibility: only builds callbacks, doesn't run gates/reviews
9- Protocol-based dependencies for testability
10- All callback closures capture minimal state
11- Late-bound lookups: dependencies are accessed via callables to support
12 runtime patching (e.g., tests that swap event_sink after construction)
13"""
15from __future__ import annotations
17from typing import TYPE_CHECKING, Protocol
19from src.pipeline.agent_session_runner import SessionCallbacks
21if TYPE_CHECKING:
22 from collections.abc import Callable
23 from pathlib import Path
25 from src.core.protocols import GateResultProtocol, LogProvider, ReviewResultProtocol
26 from src.domain.lifecycle import RetryState
27 from src.domain.quality_gate import GateResult
28 from src.domain.validation.spec import ValidationSpec
29 from src.infra.clients.review_output_parser import ReviewResult
30 from src.core.protocols import MalaEventSink
31 from src.pipeline.review_runner import ReviewRunner
34class GateAsyncRunner(Protocol):
35 """Protocol for async gate check execution."""
37 async def run_gate_async(
38 self,
39 issue_id: str,
40 log_path: Path,
41 retry_state: RetryState,
42 ) -> tuple[GateResult | GateResultProtocol, int]:
43 """Run quality gate check asynchronously."""
44 ...
47class SessionCallbackFactory:
48 """Factory for building SessionCallbacks with injected dependencies.
50 This factory creates callbacks that bridge orchestrator state to the
51 pipeline runners without coupling the runners to orchestrator internals.
53 Usage:
54 factory = SessionCallbackFactory(
55 gate_async_runner=..., # Protocol for async gate checks
56 review_runner=...,
57 log_provider=...,
58 event_sink=...,
59 repo_path=...,
60 on_session_log_path=..., # Callback for session log path
61 on_review_log_path=..., # Callback for review log path
62 )
63 callbacks = factory.build(issue_id)
64 """
66 def __init__(
67 self,
68 gate_async_runner: GateAsyncRunner,
69 review_runner: ReviewRunner,
70 log_provider: Callable[[], LogProvider],
71 event_sink: Callable[[], MalaEventSink],
72 quality_gate: Callable[[], GateChecker],
73 repo_path: Path,
74 on_session_log_path: Callable[[str, Path], None],
75 on_review_log_path: Callable[[str, str], None],
76 get_per_issue_spec: GetPerIssueSpec,
77 is_verbose: IsVerboseCheck,
78 ) -> None:
79 """Initialize the factory with dependencies.
81 Args:
82 gate_async_runner: Protocol for running async gate checks.
83 review_runner: Runner for Cerberus code review.
84 log_provider: Callable returning the log provider (late-bound).
85 event_sink: Callable returning the event sink (late-bound).
86 quality_gate: Callable returning the gate checker (late-bound).
87 repo_path: Repository path for git operations.
88 on_session_log_path: Callback when session log path becomes known.
89 on_review_log_path: Callback when review log path becomes known.
90 get_per_issue_spec: Callable to get current per-issue spec.
91 is_verbose: Callable to check verbose mode.
93 Note:
94 log_provider, event_sink, and quality_gate are callables to support
95 late-bound lookups. This allows tests to patch orchestrator attributes
96 after factory construction and have the patches take effect.
97 """
98 self._gate_async_runner = gate_async_runner
99 self._review_runner = review_runner
100 self._get_log_provider = log_provider
101 self._get_event_sink = event_sink
102 self._get_quality_gate = quality_gate
103 self._repo_path = repo_path
104 self._on_session_log_path = on_session_log_path
105 self._on_review_log_path = on_review_log_path
106 self._get_per_issue_spec = get_per_issue_spec
107 self._is_verbose = is_verbose
109 def build(
110 self,
111 issue_id: str,
112 on_abort: Callable[[str], None] | None = None,
113 ) -> SessionCallbacks:
114 """Build SessionCallbacks for a specific issue.
116 Args:
117 issue_id: The issue ID for tracking state.
118 on_abort: Optional callback for fatal error signaling.
120 Returns:
121 SessionCallbacks with gate, review, and logging callbacks.
122 """
123 # Import here to avoid circular imports
124 from src.infra.git_utils import get_git_commit_async, get_issue_commits_async
125 from src.pipeline.review_runner import NoProgressInput, ReviewInput
127 async def on_gate_check(
128 issue_id: str, log_path: Path, retry_state: RetryState
129 ) -> tuple[GateResult | GateResultProtocol, int]:
130 return await self._gate_async_runner.run_gate_async(
131 issue_id, log_path, retry_state
132 )
134 async def on_review_check(
135 issue_id: str,
136 issue_desc: str | None,
137 baseline: str | None,
138 session_id: str | None,
139 retry_state: RetryState,
140 ) -> ReviewResult | ReviewResultProtocol:
141 current_head = await get_git_commit_async(self._repo_path)
142 self._review_runner.config.capture_session_log = self._is_verbose()
143 commit_shas = await get_issue_commits_async(
144 self._repo_path,
145 issue_id,
146 since_timestamp=retry_state.baseline_timestamp,
147 )
148 review_input = ReviewInput(
149 issue_id=issue_id,
150 repo_path=self._repo_path,
151 commit_sha=current_head,
152 issue_description=issue_desc,
153 baseline_commit=baseline,
154 commit_shas=commit_shas or None,
155 claude_session_id=session_id,
156 )
157 output = await self._review_runner.run_review(review_input)
158 if output.session_log_path:
159 self._on_review_log_path(issue_id, output.session_log_path)
160 return output.result
162 def on_review_no_progress(
163 log_path: Path,
164 log_offset: int,
165 prev_commit: str | None,
166 curr_commit: str | None,
167 ) -> bool:
168 no_progress_input = NoProgressInput(
169 log_path=log_path,
170 log_offset=log_offset,
171 previous_commit_hash=prev_commit,
172 current_commit_hash=curr_commit,
173 spec=self._get_per_issue_spec(),
174 )
175 return self._review_runner.check_no_progress(no_progress_input)
177 def get_log_path(session_id: str) -> Path:
178 log_path = self._get_log_provider().get_log_path(
179 self._repo_path, session_id
180 )
181 self._on_session_log_path(issue_id, log_path)
182 return log_path
184 def get_log_offset(log_path: Path, start_offset: int) -> int:
185 return self._get_quality_gate().get_log_end_offset(log_path, start_offset)
187 def on_tool_use(agent_id: str, tool_name: str, arguments: dict | None) -> None:
188 self._get_event_sink().on_tool_use(agent_id, tool_name, arguments=arguments)
190 def on_agent_text(agent_id: str, text: str) -> None:
191 self._get_event_sink().on_agent_text(agent_id, text)
193 return SessionCallbacks(
194 on_gate_check=on_gate_check,
195 on_review_check=on_review_check,
196 on_review_no_progress=on_review_no_progress,
197 get_log_path=get_log_path,
198 get_log_offset=get_log_offset,
199 on_abort=on_abort,
200 on_tool_use=on_tool_use,
201 on_agent_text=on_agent_text,
202 )
205# Protocol for getting per-issue spec
206class GetPerIssueSpec(Protocol):
207 """Protocol for getting the current per-issue validation spec."""
209 def __call__(self) -> ValidationSpec | None:
210 """Return the current per-issue spec, or None if not set."""
211 ...
214# Protocol for checking verbose mode
215class IsVerboseCheck(Protocol):
216 """Protocol for checking if verbose mode is enabled."""
218 def __call__(self) -> bool:
219 """Return True if verbose mode is enabled."""
220 ...
223# Protocol for gate checker (subset of GateChecker)
224class GateChecker(Protocol):
225 """Protocol for gate checking operations."""
227 def get_log_end_offset(self, log_path: Path, start_offset: int) -> int:
228 """Get the end offset of a log file."""
229 ...