Coverage for little_loops / parallel / types.py: 86%
136 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Type definitions for parallel issue processing.
3Provides dataclasses for worker results, merge requests, orchestrator state,
4and parallel configuration. Reuses IssueInfo from issue_parser.py.
5"""
7from __future__ import annotations
9import time
10from dataclasses import dataclass, field
11from enum import Enum
12from pathlib import Path
13from typing import TYPE_CHECKING, Any
15if TYPE_CHECKING:
16 from little_loops.issue_parser import IssueInfo
19@dataclass
20class QueuedIssue:
21 """Issue in priority queue with ordering support.
23 Uses __lt__ for priority queue comparison. Lower priority number = higher priority.
24 Within same priority level, earlier timestamp wins (FIFO).
26 Attributes:
27 priority: Numeric priority (0=P0, 1=P1, etc.)
28 issue_info: The parsed issue information
29 timestamp: When the issue was queued (for FIFO ordering)
30 """
32 priority: int
33 issue_info: IssueInfo
34 timestamp: float = field(default_factory=time.time)
36 def __lt__(self, other: QueuedIssue) -> bool:
37 """Compare issues for priority queue ordering."""
38 if self.priority != other.priority:
39 return self.priority < other.priority
40 return self.timestamp < other.timestamp
42 def to_dict(self) -> dict[str, Any]:
43 """Convert to dictionary for JSON serialization."""
44 return {
45 "priority": self.priority,
46 "issue_info": self.issue_info.to_dict(),
47 "timestamp": self.timestamp,
48 }
51@dataclass
52class WorkerResult:
53 """Result from a worker processing an issue.
55 Attributes:
56 issue_id: ID of the processed issue
57 success: Whether processing succeeded
58 branch_name: Git branch created for this issue
59 worktree_path: Path to the worker's git worktree
60 changed_files: List of files modified during processing
61 leaked_files: Files incorrectly written to main repo instead of worktree
62 duration: Processing time in seconds
63 error: Error message if processing failed
64 stdout: Captured standard output
65 stderr: Captured standard error
66 was_corrected: Whether the issue file was auto-corrected
67 corrections: List of corrections made during validation (for pattern analysis)
68 should_close: Whether the issue should be closed (e.g., already fixed, invalid)
69 close_reason: Reason code for closure (e.g., "already_fixed")
70 close_status: Status text for closure (e.g., "Closed - Already Fixed")
71 interrupted: Whether the worker was interrupted during shutdown
72 """
74 issue_id: str
75 success: bool
76 branch_name: str
77 worktree_path: Path
78 changed_files: list[str] = field(default_factory=list)
79 leaked_files: list[str] = field(default_factory=list)
80 duration: float = 0.0
81 error: str | None = None
82 stdout: str = ""
83 stderr: str = ""
84 was_corrected: bool = False
85 corrections: list[str] = field(default_factory=list)
86 should_close: bool = False
87 close_reason: str | None = None
88 close_status: str | None = None
89 was_blocked: bool = False
90 interrupted: bool = False
92 def to_dict(self) -> dict[str, Any]:
93 """Convert to dictionary for JSON serialization."""
94 return {
95 "issue_id": self.issue_id,
96 "success": self.success,
97 "branch_name": self.branch_name,
98 "worktree_path": str(self.worktree_path),
99 "changed_files": self.changed_files,
100 "leaked_files": self.leaked_files,
101 "duration": self.duration,
102 "error": self.error,
103 "stdout": self.stdout,
104 "stderr": self.stderr,
105 "was_corrected": self.was_corrected,
106 "corrections": self.corrections,
107 "should_close": self.should_close,
108 "close_reason": self.close_reason,
109 "close_status": self.close_status,
110 "was_blocked": self.was_blocked,
111 "interrupted": self.interrupted,
112 }
114 @classmethod
115 def from_dict(cls, data: dict[str, Any]) -> WorkerResult:
116 """Create from dictionary (JSON deserialization)."""
117 return cls(
118 issue_id=data["issue_id"],
119 success=data["success"],
120 branch_name=data["branch_name"],
121 worktree_path=Path(data["worktree_path"]),
122 changed_files=data.get("changed_files", []),
123 leaked_files=data.get("leaked_files", []),
124 duration=data.get("duration", 0.0),
125 error=data.get("error"),
126 stdout=data.get("stdout", ""),
127 stderr=data.get("stderr", ""),
128 was_corrected=data.get("was_corrected", False),
129 corrections=data.get("corrections", []),
130 should_close=data.get("should_close", False),
131 close_reason=data.get("close_reason"),
132 close_status=data.get("close_status"),
133 was_blocked=data.get("was_blocked", False),
134 interrupted=data.get("interrupted", False),
135 )
138class MergeStatus(Enum):
139 """Status of a merge operation."""
141 PENDING = "pending"
142 IN_PROGRESS = "in_progress"
143 SUCCESS = "success"
144 CONFLICT = "conflict"
145 FAILED = "failed"
146 RETRYING = "retrying"
149class WorkerStage(Enum):
150 """Processing stage of a worker.
152 Stages progress in order:
153 - SETUP: Creating git worktree and copying .claude/ directory
154 - VALIDATING: Running ready-issue command
155 - IMPLEMENTING: Running manage-issue command
156 - VERIFYING: Checking work was done and updating branch base
157 - MERGING: Awaiting merge coordination
158 - COMPLETED: Successfully finished
159 - FAILED: Failed at some stage
160 - INTERRUPTED: Interrupted during shutdown
161 """
163 SETUP = "setup"
164 VALIDATING = "validating"
165 IMPLEMENTING = "implementing"
166 VERIFYING = "verifying"
167 MERGING = "merging"
168 COMPLETED = "completed"
169 FAILED = "failed"
170 INTERRUPTED = "interrupted"
173@dataclass
174class MergeRequest:
175 """Request to merge a completed worker's changes.
177 Attributes:
178 worker_result: The result from the worker
179 status: Current merge status
180 retry_count: Number of merge/rebase attempts
181 error: Error message if merge failed
182 queued_at: When the merge was requested
183 """
185 worker_result: WorkerResult
186 status: MergeStatus = MergeStatus.PENDING
187 retry_count: int = 0
188 error: str | None = None
189 queued_at: float = field(default_factory=time.time)
191 def to_dict(self) -> dict[str, Any]:
192 """Convert to dictionary for JSON serialization."""
193 return {
194 "worker_result": self.worker_result.to_dict(),
195 "status": self.status.value,
196 "retry_count": self.retry_count,
197 "error": self.error,
198 "queued_at": self.queued_at,
199 }
202@dataclass
203class OrchestratorState:
204 """Persistent state for the parallel orchestrator.
206 Enables resume capability after interruption.
208 Attributes:
209 in_progress_issues: Issues currently being processed by workers
210 completed_issues: Successfully completed issue IDs
211 failed_issues: Mapping of issue ID to failure reason
212 pending_merges: Issues awaiting merge
213 timing: Per-issue timing breakdown
214 corrections: Mapping of issue ID to list of corrections made (for pattern analysis)
215 started_at: When orchestration started
216 last_checkpoint: Last state save timestamp
217 """
219 in_progress_issues: list[str] = field(default_factory=list)
220 completed_issues: list[str] = field(default_factory=list)
221 failed_issues: dict[str, str] = field(default_factory=dict)
222 pending_merges: list[str] = field(default_factory=list)
223 timing: dict[str, dict[str, float]] = field(default_factory=dict)
224 corrections: dict[str, list[str]] = field(default_factory=dict)
225 started_at: str = ""
226 last_checkpoint: str = ""
228 def to_dict(self) -> dict[str, Any]:
229 """Convert state to dictionary for JSON serialization."""
230 return {
231 "in_progress_issues": self.in_progress_issues,
232 "completed_issues": self.completed_issues,
233 "failed_issues": self.failed_issues,
234 "pending_merges": self.pending_merges,
235 "timing": self.timing,
236 "corrections": self.corrections,
237 "started_at": self.started_at,
238 "last_checkpoint": self.last_checkpoint,
239 }
241 @classmethod
242 def from_dict(cls, data: dict[str, Any]) -> OrchestratorState:
243 """Create state from dictionary (JSON deserialization)."""
244 return cls(
245 in_progress_issues=data.get("in_progress_issues", []),
246 completed_issues=data.get("completed_issues", []),
247 failed_issues=data.get("failed_issues", {}),
248 pending_merges=data.get("pending_merges", []),
249 timing=data.get("timing", {}),
250 corrections=data.get("corrections", {}),
251 started_at=data.get("started_at", ""),
252 last_checkpoint=data.get("last_checkpoint", ""),
253 )
256@dataclass
257class PendingWorktreeInfo:
258 """Information about a pending worktree from a previous run.
260 Attributes:
261 worktree_path: Path to the worktree directory
262 branch_name: Git branch name (parallel/<issue-id>-<timestamp>)
263 issue_id: Extracted issue ID (e.g., BUG-045)
264 commits_ahead: Number of commits ahead of main
265 has_uncommitted_changes: Whether there are uncommitted changes
266 changed_files: List of files with uncommitted changes
267 """
269 worktree_path: Path
270 branch_name: str
271 issue_id: str
272 commits_ahead: int
273 has_uncommitted_changes: bool
274 changed_files: list[str] = field(default_factory=list)
276 @property
277 def has_pending_work(self) -> bool:
278 """Return True if this worktree has work that could be merged."""
279 return self.commits_ahead > 0 or self.has_uncommitted_changes
282@dataclass
283class ParallelConfig:
284 """Configuration for the parallel issue manager.
286 Supports configurable command templates for different project setups.
287 Commands use placeholders: {{issue_id}}, {{issue_type}}, {{action}}
289 Attributes:
290 max_workers: Number of parallel workers (default: 2)
291 p0_sequential: Process P0 issues sequentially (default: True)
292 merge_interval: Seconds between merge attempts (default: 30.0)
293 worktree_base: Base directory for git worktrees
294 state_file: Path to state persistence file
295 max_merge_retries: Maximum rebase attempts before giving up (default: 2)
296 priority_filter: Which priority levels to process
297 max_issues: Maximum issues to process (0 = unlimited)
298 dry_run: Preview mode without actual processing
299 timeout_per_issue: Timeout in seconds for each issue (default: 7200)
300 orchestrator_timeout: Timeout for waiting on workers (default: 0 = auto)
301 stream_subprocess_output: Whether to stream subprocess output
302 show_model: Make API call to verify and display model on worktree setup
303 command_prefix: Prefix for slash commands (default: "/ll:")
304 ready_command: Template for ready-issue command
305 manage_command: Template for manage-issue command
306 only_ids: If provided, only process these issue IDs
307 skip_ids: Issue IDs to skip (in addition to completed/failed)
308 merge_pending: Attempt to merge pending worktrees from previous runs
309 clean_start: Remove all worktrees without checking for pending work
310 ignore_pending: Report pending work but continue without merging
311 """
313 max_workers: int = 2
314 p0_sequential: bool = True
315 merge_interval: float = 30.0
316 worktree_base: Path = field(default_factory=lambda: Path(".worktrees"))
317 state_file: Path = field(default_factory=lambda: Path(".parallel-manage-state.json"))
318 max_merge_retries: int = 2
319 priority_filter: list[str] = field(default_factory=lambda: ["P0", "P1", "P2", "P3", "P4", "P5"])
320 max_issues: int = 0
321 dry_run: bool = False
322 timeout_per_issue: int = 3600
323 idle_timeout_per_issue: int = 0 # Kill if no output for N seconds (0 to disable)
324 orchestrator_timeout: int = 0 # 0 = use timeout_per_issue * max_workers
325 stream_subprocess_output: bool = False
326 show_model: bool = False # Make API call to verify model on worktree setup
327 # Configurable command templates
328 command_prefix: str = "/ll:"
329 ready_command: str = "ready-issue {{issue_id}}"
330 manage_command: str = "manage-issue {{issue_type}} {{action}} {{issue_id}}"
331 # Issue ID filters
332 only_ids: set[str] | None = None
333 skip_ids: set[str] | None = None
334 type_prefixes: set[str] | None = None
335 # Validation settings
336 require_code_changes: bool = True # If False, allow changes to only excluded dirs
337 # Additional files to copy from main repo to worktrees
338 # Note: .claude/ directory is always copied automatically (see worker_pool.py)
339 worktree_copy_files: list[str] = field(
340 default_factory=lambda: [".claude/settings.local.json", ".env"]
341 )
342 # Pending worktree handling flags
343 merge_pending: bool = False # Attempt to merge pending worktrees
344 clean_start: bool = False # Remove all worktrees without checking
345 ignore_pending: bool = False # Report pending work but continue
346 # Overlap detection settings (ENH-143)
347 overlap_detection: bool = False # Enable pre-flight overlap detection
348 serialize_overlapping: bool = True # If True, defer overlapping issues; if False, just warn
349 # Base branch for rebase/merge operations (auto-detected at startup)
350 base_branch: str = "main"
352 def get_ready_command(self, issue_id: str) -> str:
353 """Build the ready-issue command string.
355 Args:
356 issue_id: Issue identifier
358 Returns:
359 Complete command string
360 """
361 cmd = self.ready_command.replace("{{issue_id}}", issue_id)
362 return f"{self.command_prefix}{cmd}"
364 def get_manage_command(self, issue_type: str, action: str, issue_id: str) -> str:
365 """Build the manage-issue command string.
367 Args:
368 issue_type: Type of issue (bug, feature, enhancement)
369 action: Action to perform (fix, implement, improve)
370 issue_id: Issue identifier
372 Returns:
373 Complete command string
374 """
375 cmd = (
376 self.manage_command.replace("{{issue_type}}", issue_type)
377 .replace("{{action}}", action)
378 .replace("{{issue_id}}", issue_id)
379 )
380 return f"{self.command_prefix}{cmd}"
382 def to_dict(self) -> dict[str, Any]:
383 """Convert to dictionary for JSON serialization."""
384 return {
385 "max_workers": self.max_workers,
386 "p0_sequential": self.p0_sequential,
387 "merge_interval": self.merge_interval,
388 "worktree_base": str(self.worktree_base),
389 "state_file": str(self.state_file),
390 "max_merge_retries": self.max_merge_retries,
391 "priority_filter": self.priority_filter,
392 "max_issues": self.max_issues,
393 "dry_run": self.dry_run,
394 "timeout_per_issue": self.timeout_per_issue,
395 "idle_timeout_per_issue": self.idle_timeout_per_issue,
396 "orchestrator_timeout": self.orchestrator_timeout,
397 "stream_subprocess_output": self.stream_subprocess_output,
398 "show_model": self.show_model,
399 "command_prefix": self.command_prefix,
400 "ready_command": self.ready_command,
401 "manage_command": self.manage_command,
402 "only_ids": list(self.only_ids) if self.only_ids else None,
403 "skip_ids": list(self.skip_ids) if self.skip_ids else None,
404 "type_prefixes": list(self.type_prefixes) if self.type_prefixes else None,
405 "require_code_changes": self.require_code_changes,
406 "merge_pending": self.merge_pending,
407 "clean_start": self.clean_start,
408 "ignore_pending": self.ignore_pending,
409 "overlap_detection": self.overlap_detection,
410 "serialize_overlapping": self.serialize_overlapping,
411 "base_branch": self.base_branch,
412 }
414 @classmethod
415 def from_dict(cls, data: dict[str, Any]) -> ParallelConfig:
416 """Create from dictionary (JSON deserialization)."""
417 only_ids_data = data.get("only_ids")
418 skip_ids_data = data.get("skip_ids")
419 type_prefixes_data = data.get("type_prefixes")
420 return cls(
421 max_workers=data.get("max_workers", 2),
422 p0_sequential=data.get("p0_sequential", True),
423 merge_interval=data.get("merge_interval", 30.0),
424 worktree_base=Path(data.get("worktree_base", ".worktrees")),
425 state_file=Path(data.get("state_file", ".parallel-manage-state.json")),
426 max_merge_retries=data.get("max_merge_retries", 2),
427 priority_filter=data.get("priority_filter", ["P0", "P1", "P2", "P3", "P4", "P5"]),
428 max_issues=data.get("max_issues", 0),
429 dry_run=data.get("dry_run", False),
430 timeout_per_issue=data.get("timeout_per_issue", 7200),
431 idle_timeout_per_issue=data.get("idle_timeout_per_issue", 0),
432 orchestrator_timeout=data.get("orchestrator_timeout", 0),
433 stream_subprocess_output=data.get("stream_subprocess_output", False),
434 show_model=data.get("show_model", False),
435 command_prefix=data.get("command_prefix", "/ll:"),
436 ready_command=data.get("ready_command", "ready-issue {{issue_id}}"),
437 manage_command=data.get(
438 "manage_command", "manage-issue {{issue_type}} {{action}} {{issue_id}}"
439 ),
440 only_ids=set(only_ids_data) if only_ids_data else None,
441 skip_ids=set(skip_ids_data) if skip_ids_data else None,
442 type_prefixes=set(type_prefixes_data) if type_prefixes_data else None,
443 require_code_changes=data.get("require_code_changes", True),
444 merge_pending=data.get("merge_pending", False),
445 clean_start=data.get("clean_start", False),
446 ignore_pending=data.get("ignore_pending", False),
447 overlap_detection=data.get("overlap_detection", False),
448 serialize_overlapping=data.get("serialize_overlapping", True),
449 base_branch=data.get("base_branch", "main"),
450 )