Coverage for little_loops / parallel / types.py: 86%

136 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Type definitions for parallel issue processing. 

2 

3Provides dataclasses for worker results, merge requests, orchestrator state, 

4and parallel configuration. Reuses IssueInfo from issue_parser.py. 

5""" 

6 

7from __future__ import annotations 

8 

9import time 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from pathlib import Path 

13from typing import TYPE_CHECKING, Any 

14 

15if TYPE_CHECKING: 

16 from little_loops.issue_parser import IssueInfo 

17 

18 

@dataclass
class QueuedIssue:
    """An issue waiting in the priority queue.

    Ordering is provided by ``__lt__`` so instances can be compared directly
    by a priority queue: a smaller ``priority`` number sorts first, and
    entries with equal priority are ordered by ``timestamp`` (earliest first,
    i.e. FIFO).

    Attributes:
        priority: Numeric priority (0=P0, 1=P1, etc.)
        issue_info: The parsed issue information
        timestamp: When the issue was queued; defaults to ``time.time()`` at
            construction and breaks ties between equal priorities
    """

    priority: int
    issue_info: IssueInfo
    timestamp: float = field(default_factory=time.time)

    def __lt__(self, other: QueuedIssue) -> bool:
        """Return True when this issue should be ordered before ``other``."""
        # Lexicographic comparison: priority first, queue time as tie-breaker.
        return (self.priority, self.timestamp) < (other.priority, other.timestamp)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dictionary describing this queue entry."""
        serialized: dict[str, Any] = dict(
            priority=self.priority,
            issue_info=self.issue_info.to_dict(),
            timestamp=self.timestamp,
        )
        return serialized

49 

50 

@dataclass
class WorkerResult:
    """Outcome reported by a worker after it processed one issue.

    Attributes:
        issue_id: ID of the processed issue
        success: Whether processing succeeded
        branch_name: Git branch created for this issue
        worktree_path: Path to the worker's git worktree
        changed_files: List of files modified during processing
        leaked_files: Files incorrectly written to main repo instead of worktree
        duration: Processing time in seconds
        error: Error message if processing failed
        stdout: Captured standard output
        stderr: Captured standard error
        was_corrected: Whether the issue file was auto-corrected
        corrections: List of corrections made during validation (for pattern analysis)
        should_close: Whether the issue should be closed (e.g., already fixed, invalid)
        close_reason: Reason code for closure (e.g., "already_fixed")
        close_status: Status text for closure (e.g., "Closed - Already Fixed")
        was_blocked: NOTE(review): presumably set when the issue could not
            proceed because it was blocked -- confirm against the worker code
        interrupted: Whether the worker was interrupted during shutdown
    """

    # Required identification / outcome fields.
    issue_id: str
    success: bool
    branch_name: str
    worktree_path: Path
    # Optional details; every field below has a default.
    changed_files: list[str] = field(default_factory=list)
    leaked_files: list[str] = field(default_factory=list)
    duration: float = 0.0
    error: str | None = None
    stdout: str = ""
    stderr: str = ""
    was_corrected: bool = False
    corrections: list[str] = field(default_factory=list)
    should_close: bool = False
    close_reason: str | None = None
    close_status: str | None = None
    was_blocked: bool = False
    interrupted: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dictionary of every field.

        ``worktree_path`` is converted to ``str``; all other values are
        passed through unchanged.
        """
        payload: dict[str, Any] = dict(
            issue_id=self.issue_id,
            success=self.success,
            branch_name=self.branch_name,
            worktree_path=str(self.worktree_path),
            changed_files=self.changed_files,
            leaked_files=self.leaked_files,
            duration=self.duration,
            error=self.error,
            stdout=self.stdout,
            stderr=self.stderr,
            was_corrected=self.was_corrected,
            corrections=self.corrections,
            should_close=self.should_close,
            close_reason=self.close_reason,
            close_status=self.close_status,
            was_blocked=self.was_blocked,
            interrupted=self.interrupted,
        )
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> WorkerResult:
        """Build a ``WorkerResult`` from a dictionary (JSON deserialization).

        Args:
            data: Mapping that must contain ``issue_id``, ``success``,
                ``branch_name`` and ``worktree_path``; every other key is
                optional and falls back to the field's default value.

        Returns:
            The reconstructed result, with ``worktree_path`` as a ``Path``.

        Raises:
            KeyError: If one of the four required keys is missing.
        """
        optional = data.get
        return cls(
            issue_id=data["issue_id"],
            success=data["success"],
            branch_name=data["branch_name"],
            worktree_path=Path(data["worktree_path"]),
            changed_files=optional("changed_files", []),
            leaked_files=optional("leaked_files", []),
            duration=optional("duration", 0.0),
            error=optional("error"),
            stdout=optional("stdout", ""),
            stderr=optional("stderr", ""),
            was_corrected=optional("was_corrected", False),
            corrections=optional("corrections", []),
            should_close=optional("should_close", False),
            close_reason=optional("close_reason"),
            close_status=optional("close_status"),
            was_blocked=optional("was_blocked", False),
            interrupted=optional("interrupted", False),
        )

136 

137 

class MergeStatus(Enum):
    """States that a merge operation can be in.

    Every member's value is a lowercase string, so a status can be written
    out through ``.value`` and looked up again with ``MergeStatus(value)``.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    CONFLICT = "conflict"
    FAILED = "failed"
    RETRYING = "retrying"

147 

148 

class WorkerStage(Enum):
    """Stage a worker is currently in while processing an issue.

    Stages, listed in the order a worker moves through them:
    - SETUP: Creating git worktree and copying .claude/ directory
    - VALIDATING: Running ready-issue command
    - IMPLEMENTING: Running manage-issue command
    - VERIFYING: Checking work was done and updating branch base
    - MERGING: Awaiting merge coordination
    - COMPLETED: Successfully finished
    - FAILED: Failed at some stage
    - INTERRUPTED: Interrupted during shutdown
    """

    # Stages of normal forward progress.
    SETUP = "setup"
    VALIDATING = "validating"
    IMPLEMENTING = "implementing"
    VERIFYING = "verifying"
    MERGING = "merging"
    # End states.
    COMPLETED = "completed"
    FAILED = "failed"
    INTERRUPTED = "interrupted"

171 

172 

@dataclass
class MergeRequest:
    """A request to merge the changes produced by a finished worker.

    Attributes:
        worker_result: The result from the worker
        status: Current merge status (starts as ``MergeStatus.PENDING``)
        retry_count: Number of merge/rebase attempts
        error: Error message if merge failed
        queued_at: When the merge was requested; defaults to ``time.time()``
            at construction
    """

    worker_result: WorkerResult
    status: MergeStatus = MergeStatus.PENDING
    retry_count: int = 0
    error: str | None = None
    queued_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dictionary for this request.

        The nested worker result is serialized through its own ``to_dict``
        and the status enum is flattened to its string value.
        """
        serialized: dict[str, Any] = {}
        serialized["worker_result"] = self.worker_result.to_dict()
        serialized["status"] = self.status.value
        serialized["retry_count"] = self.retry_count
        serialized["error"] = self.error
        serialized["queued_at"] = self.queued_at
        return serialized

200 

201 

@dataclass
class OrchestratorState:
    """Persistent state for the parallel orchestrator.

    Saved and reloaded as a plain dictionary so a run can be resumed after an
    interruption.

    Attributes:
        in_progress_issues: Issues currently being processed by workers
        completed_issues: Successfully completed issue IDs
        failed_issues: Mapping of issue ID to failure reason
        pending_merges: Issues awaiting merge
        timing: Per-issue timing breakdown
        corrections: Mapping of issue ID to list of corrections made (for pattern analysis)
        started_at: When orchestration started
        last_checkpoint: Last state save timestamp
    """

    in_progress_issues: list[str] = field(default_factory=list)
    completed_issues: list[str] = field(default_factory=list)
    failed_issues: dict[str, str] = field(default_factory=dict)
    pending_merges: list[str] = field(default_factory=list)
    timing: dict[str, dict[str, float]] = field(default_factory=dict)
    corrections: dict[str, list[str]] = field(default_factory=dict)
    started_at: str = ""
    last_checkpoint: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the state as a dictionary for JSON serialization.

        Keys appear in field-declaration order and the values are the
        attributes themselves (no copies are made).
        """
        names = (
            "in_progress_issues",
            "completed_issues",
            "failed_issues",
            "pending_merges",
            "timing",
            "corrections",
            "started_at",
            "last_checkpoint",
        )
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OrchestratorState:
        """Rebuild the state from a dictionary (JSON deserialization).

        Every key is optional. A missing key falls back to a fresh empty
        list, dict or string, matching the corresponding field's default.
        """
        # Each entry maps a field name to a factory producing its empty value.
        empty_value = {
            "in_progress_issues": list,
            "completed_issues": list,
            "failed_issues": dict,
            "pending_merges": list,
            "timing": dict,
            "corrections": dict,
            "started_at": str,
            "last_checkpoint": str,
        }
        kwargs = {
            name: data[name] if name in data else make()
            for name, make in empty_value.items()
        }
        return cls(**kwargs)

254 

255 

@dataclass
class PendingWorktreeInfo:
    """Description of a worktree left behind by a previous run.

    Attributes:
        worktree_path: Path to the worktree directory
        branch_name: Git branch name (parallel/<issue-id>-<timestamp>)
        issue_id: Extracted issue ID (e.g., BUG-045)
        commits_ahead: Number of commits ahead of main
        has_uncommitted_changes: Whether there are uncommitted changes
        changed_files: List of files with uncommitted changes
    """

    worktree_path: Path
    branch_name: str
    issue_id: str
    commits_ahead: int
    has_uncommitted_changes: bool
    changed_files: list[str] = field(default_factory=list)

    @property
    def has_pending_work(self) -> bool:
        """Whether the worktree holds work that could be merged.

        True when the branch has at least one commit ahead, or when there are
        uncommitted changes.
        """
        if self.commits_ahead > 0:
            return True
        return self.has_uncommitted_changes

280 

281 

@dataclass
class ParallelConfig:
    """Configuration for the parallel issue manager.

    Supports configurable command templates for different project setups.
    Commands use placeholders: {{issue_id}}, {{issue_type}}, {{action}}

    Attributes:
        max_workers: Number of parallel workers (default: 2)
        p0_sequential: Process P0 issues sequentially (default: True)
        merge_interval: Seconds between merge attempts (default: 30.0)
        worktree_base: Base directory for git worktrees (default: ".worktrees")
        state_file: Path to state persistence file
            (default: ".parallel-manage-state.json")
        max_merge_retries: Maximum rebase attempts before giving up (default: 2)
        priority_filter: Which priority levels to process (default: P0-P5)
        max_issues: Maximum issues to process (0 = unlimited)
        dry_run: Preview mode without actual processing
        timeout_per_issue: Timeout in seconds for each issue (default: 3600)
        idle_timeout_per_issue: Kill if no output for N seconds (0 to disable)
        orchestrator_timeout: Timeout for waiting on workers
            (default: 0 = use timeout_per_issue * max_workers)
        stream_subprocess_output: Whether to stream subprocess output
        show_model: Make API call to verify and display model on worktree setup
        command_prefix: Prefix for slash commands (default: "/ll:")
        ready_command: Template for ready-issue command
        manage_command: Template for manage-issue command
        only_ids: If provided, only process these issue IDs
        skip_ids: Issue IDs to skip (in addition to completed/failed)
        type_prefixes: NOTE(review): presumably limits processing to issues
            whose ID starts with one of these type prefixes -- confirm
        require_code_changes: If False, allow changes to only excluded dirs
        worktree_copy_files: Additional files to copy from main repo to
            worktrees (the .claude/ directory is always copied automatically,
            see worker_pool.py)
        merge_pending: Attempt to merge pending worktrees from previous runs
        clean_start: Remove all worktrees without checking for pending work
        ignore_pending: Report pending work but continue without merging
        overlap_detection: Enable pre-flight overlap detection (ENH-143)
        serialize_overlapping: If True, defer overlapping issues; if False,
            just warn
        base_branch: Base branch for rebase/merge operations; auto-detected
            at startup (default: "main")
    """

    max_workers: int = 2
    p0_sequential: bool = True
    merge_interval: float = 30.0
    worktree_base: Path = field(default_factory=lambda: Path(".worktrees"))
    state_file: Path = field(default_factory=lambda: Path(".parallel-manage-state.json"))
    max_merge_retries: int = 2
    priority_filter: list[str] = field(default_factory=lambda: ["P0", "P1", "P2", "P3", "P4", "P5"])
    max_issues: int = 0
    dry_run: bool = False
    # NOTE(review): this field default is treated as the single source of
    # truth for the docstring and for from_dict(); confirm 3600 (not 7200)
    # is the intended value.
    timeout_per_issue: int = 3600
    idle_timeout_per_issue: int = 0  # Kill if no output for N seconds (0 to disable)
    orchestrator_timeout: int = 0  # 0 = use timeout_per_issue * max_workers
    stream_subprocess_output: bool = False
    show_model: bool = False  # Make API call to verify model on worktree setup
    # Configurable command templates
    command_prefix: str = "/ll:"
    ready_command: str = "ready-issue {{issue_id}}"
    manage_command: str = "manage-issue {{issue_type}} {{action}} {{issue_id}}"
    # Issue ID filters
    only_ids: set[str] | None = None
    skip_ids: set[str] | None = None
    type_prefixes: set[str] | None = None
    # Validation settings
    require_code_changes: bool = True  # If False, allow changes to only excluded dirs
    # Additional files to copy from main repo to worktrees
    # Note: .claude/ directory is always copied automatically (see worker_pool.py)
    worktree_copy_files: list[str] = field(
        default_factory=lambda: [".claude/settings.local.json", ".env"]
    )
    # Pending worktree handling flags
    merge_pending: bool = False  # Attempt to merge pending worktrees
    clean_start: bool = False  # Remove all worktrees without checking
    ignore_pending: bool = False  # Report pending work but continue
    # Overlap detection settings (ENH-143)
    overlap_detection: bool = False  # Enable pre-flight overlap detection
    serialize_overlapping: bool = True  # If True, defer overlapping issues; if False, just warn
    # Base branch for rebase/merge operations (auto-detected at startup)
    base_branch: str = "main"

    def get_ready_command(self, issue_id: str) -> str:
        """Build the ready-issue command string.

        Args:
            issue_id: Issue identifier

        Returns:
            Complete command string: ``command_prefix`` followed by
            ``ready_command`` with ``{{issue_id}}`` substituted
        """
        cmd = self.ready_command.replace("{{issue_id}}", issue_id)
        return f"{self.command_prefix}{cmd}"

    def get_manage_command(self, issue_type: str, action: str, issue_id: str) -> str:
        """Build the manage-issue command string.

        Args:
            issue_type: Type of issue (bug, feature, enhancement)
            action: Action to perform (fix, implement, improve)
            issue_id: Issue identifier

        Returns:
            Complete command string: ``command_prefix`` followed by
            ``manage_command`` with all three placeholders substituted
        """
        cmd = (
            self.manage_command.replace("{{issue_type}}", issue_type)
            .replace("{{action}}", action)
            .replace("{{issue_id}}", issue_id)
        )
        return f"{self.command_prefix}{cmd}"

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Paths become strings. The ID-set filters become sorted lists so the
        output is stable between runs, or ``None`` when unset or empty.
        """
        return {
            "max_workers": self.max_workers,
            "p0_sequential": self.p0_sequential,
            "merge_interval": self.merge_interval,
            "worktree_base": str(self.worktree_base),
            "state_file": str(self.state_file),
            "max_merge_retries": self.max_merge_retries,
            "priority_filter": self.priority_filter,
            "max_issues": self.max_issues,
            "dry_run": self.dry_run,
            "timeout_per_issue": self.timeout_per_issue,
            "idle_timeout_per_issue": self.idle_timeout_per_issue,
            "orchestrator_timeout": self.orchestrator_timeout,
            "stream_subprocess_output": self.stream_subprocess_output,
            "show_model": self.show_model,
            "command_prefix": self.command_prefix,
            "ready_command": self.ready_command,
            "manage_command": self.manage_command,
            "only_ids": sorted(self.only_ids) if self.only_ids else None,
            "skip_ids": sorted(self.skip_ids) if self.skip_ids else None,
            "type_prefixes": sorted(self.type_prefixes) if self.type_prefixes else None,
            "require_code_changes": self.require_code_changes,
            # Included so a to_dict()/from_dict() round trip keeps custom values.
            "worktree_copy_files": list(self.worktree_copy_files),
            "merge_pending": self.merge_pending,
            "clean_start": self.clean_start,
            "ignore_pending": self.ignore_pending,
            "overlap_detection": self.overlap_detection,
            "serialize_overlapping": self.serialize_overlapping,
            "base_branch": self.base_branch,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ParallelConfig:
        """Create from dictionary (JSON deserialization).

        Every key is optional. A missing key takes the value a freshly
        constructed ``ParallelConfig()`` would have, so the fallbacks can
        never drift from the field defaults.

        Args:
            data: Mapping as produced by ``to_dict`` (possibly partial)

        Returns:
            The reconstructed configuration
        """
        # A default instance is the single source of truth for fallbacks.
        defaults = cls()

        def pick(key: str) -> Any:
            return data.get(key, getattr(defaults, key))

        def as_id_set(key: str) -> set[str] | None:
            # Missing, None and empty collections all mean "no filter".
            raw = data.get(key)
            return set(raw) if raw else None

        # An explicit empty list means "copy nothing extra", so only a
        # missing/None value falls back to the default file list.
        copy_files = data.get("worktree_copy_files")
        if copy_files is None:
            copy_files = defaults.worktree_copy_files

        return cls(
            max_workers=pick("max_workers"),
            p0_sequential=pick("p0_sequential"),
            merge_interval=pick("merge_interval"),
            worktree_base=Path(pick("worktree_base")),
            state_file=Path(pick("state_file")),
            max_merge_retries=pick("max_merge_retries"),
            priority_filter=pick("priority_filter"),
            max_issues=pick("max_issues"),
            dry_run=pick("dry_run"),
            timeout_per_issue=pick("timeout_per_issue"),
            idle_timeout_per_issue=pick("idle_timeout_per_issue"),
            orchestrator_timeout=pick("orchestrator_timeout"),
            stream_subprocess_output=pick("stream_subprocess_output"),
            show_model=pick("show_model"),
            command_prefix=pick("command_prefix"),
            ready_command=pick("ready_command"),
            manage_command=pick("manage_command"),
            only_ids=as_id_set("only_ids"),
            skip_ids=as_id_set("skip_ids"),
            type_prefixes=as_id_set("type_prefixes"),
            require_code_changes=pick("require_code_changes"),
            worktree_copy_files=list(copy_files),
            merge_pending=pick("merge_pending"),
            clean_start=pick("clean_start"),
            ignore_pending=pick("ignore_pending"),
            overlap_detection=pick("overlap_detection"),
            serialize_overlapping=pick("serialize_overlapping"),
            base_branch=pick("base_branch"),
        )