Coverage for src / pipeline / session_callback_factory.py: 27%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""SessionCallbackFactory: Builds SessionCallbacks for agent sessions. 

2 

3This factory encapsulates callback construction that bridges orchestrator state 

4to the pipeline runners. It receives state references and returns a SessionCallbacks 

5instance wired to the appropriate callbacks. 

6 

7Design principles: 

8- Single responsibility: only builds callbacks, doesn't run gates/reviews 

9- Protocol-based dependencies for testability 

10- All callback closures capture minimal state 

11- Late-bound lookups: dependencies are accessed via callables to support 

12 runtime patching (e.g., tests that swap event_sink after construction) 

13""" 

14 

15from __future__ import annotations 

16 

17from typing import TYPE_CHECKING, Protocol 

18 

19from src.pipeline.agent_session_runner import SessionCallbacks 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Callable 

23 from pathlib import Path 

24 

25 from src.core.protocols import GateResultProtocol, LogProvider, ReviewResultProtocol 

26 from src.domain.lifecycle import RetryState 

27 from src.domain.quality_gate import GateResult 

28 from src.domain.validation.spec import ValidationSpec 

29 from src.infra.clients.review_output_parser import ReviewResult 

30 from src.core.protocols import MalaEventSink 

31 from src.pipeline.review_runner import ReviewRunner 

32 

33 

34class GateAsyncRunner(Protocol): 

35 """Protocol for async gate check execution.""" 

36 

37 async def run_gate_async( 

38 self, 

39 issue_id: str, 

40 log_path: Path, 

41 retry_state: RetryState, 

42 ) -> tuple[GateResult | GateResultProtocol, int]: 

43 """Run quality gate check asynchronously.""" 

44 ... 

45 

46 

47class SessionCallbackFactory: 

48 """Factory for building SessionCallbacks with injected dependencies. 

49 

50 This factory creates callbacks that bridge orchestrator state to the 

51 pipeline runners without coupling the runners to orchestrator internals. 

52 

53 Usage: 

54 factory = SessionCallbackFactory( 

55 gate_async_runner=..., # Protocol for async gate checks 

56 review_runner=..., 

57 log_provider=..., 

58 event_sink=..., 

59 repo_path=..., 

60 on_session_log_path=..., # Callback for session log path 

61 on_review_log_path=..., # Callback for review log path 

62 ) 

63 callbacks = factory.build(issue_id) 

64 """ 

65 

66 def __init__( 

67 self, 

68 gate_async_runner: GateAsyncRunner, 

69 review_runner: ReviewRunner, 

70 log_provider: Callable[[], LogProvider], 

71 event_sink: Callable[[], MalaEventSink], 

72 quality_gate: Callable[[], GateChecker], 

73 repo_path: Path, 

74 on_session_log_path: Callable[[str, Path], None], 

75 on_review_log_path: Callable[[str, str], None], 

76 get_per_issue_spec: GetPerIssueSpec, 

77 is_verbose: IsVerboseCheck, 

78 ) -> None: 

79 """Initialize the factory with dependencies. 

80 

81 Args: 

82 gate_async_runner: Protocol for running async gate checks. 

83 review_runner: Runner for Cerberus code review. 

84 log_provider: Callable returning the log provider (late-bound). 

85 event_sink: Callable returning the event sink (late-bound). 

86 quality_gate: Callable returning the gate checker (late-bound). 

87 repo_path: Repository path for git operations. 

88 on_session_log_path: Callback when session log path becomes known. 

89 on_review_log_path: Callback when review log path becomes known. 

90 get_per_issue_spec: Callable to get current per-issue spec. 

91 is_verbose: Callable to check verbose mode. 

92 

93 Note: 

94 log_provider, event_sink, and quality_gate are callables to support 

95 late-bound lookups. This allows tests to patch orchestrator attributes 

96 after factory construction and have the patches take effect. 

97 """ 

98 self._gate_async_runner = gate_async_runner 

99 self._review_runner = review_runner 

100 self._get_log_provider = log_provider 

101 self._get_event_sink = event_sink 

102 self._get_quality_gate = quality_gate 

103 self._repo_path = repo_path 

104 self._on_session_log_path = on_session_log_path 

105 self._on_review_log_path = on_review_log_path 

106 self._get_per_issue_spec = get_per_issue_spec 

107 self._is_verbose = is_verbose 

108 

109 def build( 

110 self, 

111 issue_id: str, 

112 on_abort: Callable[[str], None] | None = None, 

113 ) -> SessionCallbacks: 

114 """Build SessionCallbacks for a specific issue. 

115 

116 Args: 

117 issue_id: The issue ID for tracking state. 

118 on_abort: Optional callback for fatal error signaling. 

119 

120 Returns: 

121 SessionCallbacks with gate, review, and logging callbacks. 

122 """ 

123 # Import here to avoid circular imports 

124 from src.infra.git_utils import get_git_commit_async, get_issue_commits_async 

125 from src.pipeline.review_runner import NoProgressInput, ReviewInput 

126 

127 async def on_gate_check( 

128 issue_id: str, log_path: Path, retry_state: RetryState 

129 ) -> tuple[GateResult | GateResultProtocol, int]: 

130 return await self._gate_async_runner.run_gate_async( 

131 issue_id, log_path, retry_state 

132 ) 

133 

134 async def on_review_check( 

135 issue_id: str, 

136 issue_desc: str | None, 

137 baseline: str | None, 

138 session_id: str | None, 

139 retry_state: RetryState, 

140 ) -> ReviewResult | ReviewResultProtocol: 

141 current_head = await get_git_commit_async(self._repo_path) 

142 self._review_runner.config.capture_session_log = self._is_verbose() 

143 commit_shas = await get_issue_commits_async( 

144 self._repo_path, 

145 issue_id, 

146 since_timestamp=retry_state.baseline_timestamp, 

147 ) 

148 review_input = ReviewInput( 

149 issue_id=issue_id, 

150 repo_path=self._repo_path, 

151 commit_sha=current_head, 

152 issue_description=issue_desc, 

153 baseline_commit=baseline, 

154 commit_shas=commit_shas or None, 

155 claude_session_id=session_id, 

156 ) 

157 output = await self._review_runner.run_review(review_input) 

158 if output.session_log_path: 

159 self._on_review_log_path(issue_id, output.session_log_path) 

160 return output.result 

161 

162 def on_review_no_progress( 

163 log_path: Path, 

164 log_offset: int, 

165 prev_commit: str | None, 

166 curr_commit: str | None, 

167 ) -> bool: 

168 no_progress_input = NoProgressInput( 

169 log_path=log_path, 

170 log_offset=log_offset, 

171 previous_commit_hash=prev_commit, 

172 current_commit_hash=curr_commit, 

173 spec=self._get_per_issue_spec(), 

174 ) 

175 return self._review_runner.check_no_progress(no_progress_input) 

176 

177 def get_log_path(session_id: str) -> Path: 

178 log_path = self._get_log_provider().get_log_path( 

179 self._repo_path, session_id 

180 ) 

181 self._on_session_log_path(issue_id, log_path) 

182 return log_path 

183 

184 def get_log_offset(log_path: Path, start_offset: int) -> int: 

185 return self._get_quality_gate().get_log_end_offset(log_path, start_offset) 

186 

187 def on_tool_use(agent_id: str, tool_name: str, arguments: dict | None) -> None: 

188 self._get_event_sink().on_tool_use(agent_id, tool_name, arguments=arguments) 

189 

190 def on_agent_text(agent_id: str, text: str) -> None: 

191 self._get_event_sink().on_agent_text(agent_id, text) 

192 

193 return SessionCallbacks( 

194 on_gate_check=on_gate_check, 

195 on_review_check=on_review_check, 

196 on_review_no_progress=on_review_no_progress, 

197 get_log_path=get_log_path, 

198 get_log_offset=get_log_offset, 

199 on_abort=on_abort, 

200 on_tool_use=on_tool_use, 

201 on_agent_text=on_agent_text, 

202 ) 

203 

204 

205# Protocol for getting per-issue spec 

206class GetPerIssueSpec(Protocol): 

207 """Protocol for getting the current per-issue validation spec.""" 

208 

209 def __call__(self) -> ValidationSpec | None: 

210 """Return the current per-issue spec, or None if not set.""" 

211 ... 

212 

213 

214# Protocol for checking verbose mode 

215class IsVerboseCheck(Protocol): 

216 """Protocol for checking if verbose mode is enabled.""" 

217 

218 def __call__(self) -> bool: 

219 """Return True if verbose mode is enabled.""" 

220 ... 

221 

222 

223# Protocol for gate checker (subset of GateChecker) 

224class GateChecker(Protocol): 

225 """Protocol for gate checking operations.""" 

226 

227 def get_log_end_offset(self, log_path: Path, start_offset: int) -> int: 

228 """Get the end offset of a log file.""" 

229 ...