Coverage for src / infra / io / base_sink.py: 54%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""Base event sink implementation with no-op defaults. 

2 

3Provides BaseEventSink with default no-op implementations of all protocol 

4methods, and NullEventSink for testing. 

5""" 

6 

7from typing import Any 

8 

9from src.core.protocols import DeadlockInfoProtocol, EventRunConfig, MalaEventSink 

10 

11 

class BaseEventSink:
    """Event sink whose every MalaEventSink protocol method does nothing.

    Each of the 51 protocol hooks is implemented here as a no-op that
    returns None. A concrete sink inherits from this class and overrides
    only the hooks it cares about instead of spelling out all of them.

    Example:
        class MyEventSink(BaseEventSink):
            def on_agent_started(self, agent_id: str, issue_id: str) -> None:
                print(f"Agent {agent_id} started on {issue_id}")
            # Every hook not overridden stays a no-op.
    """

    # -------------------------------------------------------------------------
    # Run lifecycle
    # -------------------------------------------------------------------------

    def on_run_started(self, config: EventRunConfig) -> None:
        """No-op default; override to handle the start of a run."""

    def on_run_completed(
        self,
        success_count: int,
        total_count: int,
        run_validation_passed: bool,
        abort_reason: str | None = None,
    ) -> None:
        """No-op default; override to handle the completion of a run."""

    def on_ready_issues(self, issue_ids: list[str]) -> None:
        """No-op default; override to handle the ready-issues event."""

    def on_waiting_for_agents(self, count: int) -> None:
        """No-op default; override to handle the waiting-for-agents event."""

    def on_no_more_issues(self, reason: str) -> None:
        """No-op default; override to handle the no-more-issues event."""

    # -------------------------------------------------------------------------
    # Agent lifecycle
    # -------------------------------------------------------------------------

    def on_agent_started(self, agent_id: str, issue_id: str) -> None:
        """No-op default; override to handle an agent starting."""

    def on_agent_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """No-op default; override to handle an agent completing."""

    def on_claim_failed(self, agent_id: str, issue_id: str) -> None:
        """No-op default; override to handle a failed claim."""

    # -------------------------------------------------------------------------
    # SDK message streaming
    # -------------------------------------------------------------------------

    def on_tool_use(
        self,
        agent_id: str,
        tool_name: str,
        description: str = "",
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """No-op default; override to handle a tool-use message."""

    def on_agent_text(self, agent_id: str, text: str) -> None:
        """No-op default; override to handle agent text output."""

    # -------------------------------------------------------------------------
    # Quality gate events
    # -------------------------------------------------------------------------

    def on_gate_started(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a gate attempt starting."""

    def on_gate_passed(
        self,
        agent_id: str | None,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a gate passing."""

    def on_gate_failed(
        self,
        agent_id: str | None,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a gate failing."""

    def on_gate_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a gate retry."""

    def on_gate_result(
        self,
        agent_id: str | None,
        passed: bool,
        failure_reasons: list[str] | None = None,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a gate result."""

    # -------------------------------------------------------------------------
    # Codex review events
    # -------------------------------------------------------------------------

    def on_review_started(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a review attempt starting."""

    def on_review_passed(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a review passing."""

    def on_review_retry(
        self,
        agent_id: str,
        attempt: int,
        max_attempts: int,
        error_count: int | None = None,
        parse_error: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a review retry."""

    def on_review_warning(
        self,
        message: str,
        agent_id: str | None = None,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a review warning."""

    # -------------------------------------------------------------------------
    # Fixer agent events
    # -------------------------------------------------------------------------

    def on_fixer_started(
        self,
        attempt: int,
        max_attempts: int,
    ) -> None:
        """No-op default; override to handle a fixer attempt starting."""

    def on_fixer_completed(self, result: str) -> None:
        """No-op default; override to handle the fixer completing."""

    def on_fixer_failed(self, reason: str) -> None:
        """No-op default; override to handle the fixer failing."""

    # -------------------------------------------------------------------------
    # Issue lifecycle
    # -------------------------------------------------------------------------

    def on_issue_closed(self, agent_id: str, issue_id: str) -> None:
        """No-op default; override to handle an issue being closed."""

    def on_issue_completed(
        self,
        agent_id: str,
        issue_id: str,
        success: bool,
        duration_seconds: float,
        summary: str,
    ) -> None:
        """No-op default; override to handle an issue completing."""

    def on_epic_closed(self, agent_id: str) -> None:
        """No-op default; override to handle an epic being closed."""

    def on_validation_started(
        self,
        agent_id: str,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle validation starting."""

    def on_validation_result(
        self,
        agent_id: str,
        passed: bool,
        issue_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a validation result."""

    def on_validation_step_running(
        self,
        step_name: str,
        agent_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a validation step running."""

    def on_validation_step_skipped(
        self,
        step_name: str,
        reason: str,
        agent_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a validation step being skipped."""

    def on_validation_step_passed(
        self,
        step_name: str,
        duration_seconds: float,
        agent_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a validation step passing."""

    def on_validation_step_failed(
        self,
        step_name: str,
        exit_code: int,
        agent_id: str | None = None,
    ) -> None:
        """No-op default; override to handle a validation step failing."""

    # -------------------------------------------------------------------------
    # Warnings and diagnostics
    # -------------------------------------------------------------------------

    def on_warning(self, message: str, agent_id: str | None = None) -> None:
        """No-op default; override to handle a warning message."""

    def on_log_timeout(self, agent_id: str, log_path: str) -> None:
        """No-op default; override to handle a log timeout."""

    def on_locks_cleaned(self, agent_id: str, count: int) -> None:
        """No-op default; override to handle the locks-cleaned event."""

    def on_locks_released(self, count: int) -> None:
        """No-op default; override to handle the locks-released event."""

    def on_issues_committed(self) -> None:
        """No-op default; override to handle the issues-committed event."""

    def on_run_metadata_saved(self, path: str) -> None:
        """No-op default; override to handle run metadata being saved."""

    def on_run_level_validation_disabled(self) -> None:
        """No-op default; override to handle run-level validation being disabled."""

    def on_abort_requested(self, reason: str) -> None:
        """No-op default; override to handle an abort request."""

    def on_tasks_aborting(self, count: int, reason: str) -> None:
        """No-op default; override to handle the tasks-aborting event."""

    # -------------------------------------------------------------------------
    # Epic verification lifecycle
    # -------------------------------------------------------------------------

    def on_epic_verification_started(self, epic_id: str) -> None:
        """No-op default; override to handle epic verification starting."""

    def on_epic_verification_passed(self, epic_id: str, confidence: float) -> None:
        """No-op default; override to handle epic verification passing."""

    def on_epic_verification_failed(
        self,
        epic_id: str,
        unmet_count: int,
        remediation_ids: list[str],
    ) -> None:
        """No-op default; override to handle epic verification failing."""

    def on_epic_remediation_created(
        self,
        epic_id: str,
        issue_id: str,
        criterion: str,
    ) -> None:
        """No-op default; override to handle an epic remediation being created."""

    # -------------------------------------------------------------------------
    # Pipeline module events
    # -------------------------------------------------------------------------

    def on_lifecycle_state(self, agent_id: str, state: str) -> None:
        """No-op default; override to handle a lifecycle state event."""

    def on_log_waiting(self, agent_id: str) -> None:
        """No-op default; override to handle the log-waiting event."""

    def on_log_ready(self, agent_id: str) -> None:
        """No-op default; override to handle the log-ready event."""

    def on_review_skipped_no_progress(self, agent_id: str) -> None:
        """No-op default; override to handle a review skipped for no progress."""

    def on_fixer_text(self, attempt: int, text: str) -> None:
        """No-op default; override to handle fixer text output."""

    def on_fixer_tool_use(
        self,
        attempt: int,
        tool_name: str,
        arguments: dict[str, Any] | None = None,
    ) -> None:
        """No-op default; override to handle a fixer tool-use message."""

    def on_deadlock_detected(self, info: DeadlockInfoProtocol) -> None:
        """No-op default; override to handle a detected deadlock."""

345 

346 

class NullEventSink(BaseEventSink):
    """Event sink that produces no side effects; intended for tests.

    Defines nothing of its own: every handler is the no-op inherited from
    BaseEventSink. The separate name is kept for backward compatibility and
    to make the intent explicit wherever a silent sink is wanted.

    Example:
        from src.orchestration.factory import create_orchestrator, OrchestratorDependencies

        sink = NullEventSink()
        deps = OrchestratorDependencies(event_sink=sink)
        orchestrator = create_orchestrator(config, deps=deps)
        await orchestrator.run()  # No console output
    """

364 

365 

# Import-time compliance check: an instance of each sink class must satisfy
# the MalaEventSink protocol. Classes are checked in definition order and the
# check stops at the first failure.
assert all(
    isinstance(sink_cls(), MalaEventSink)
    for sink_cls in (BaseEventSink, NullEventSink)
)

368assert isinstance(NullEventSink(), MalaEventSink)