Coverage for little_loops / parallel / priority_queue.py: 32%
103 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Thread-safe priority queue for issue processing.
3Provides a priority queue implementation that orders issues by priority level
4(P0 > P1 > P2 > P3 > P4 > P5) with FIFO ordering within the same priority level.
5"""
7from __future__ import annotations
9import threading
10from queue import Empty, PriorityQueue
11from typing import TYPE_CHECKING
13from little_loops.issue_parser import IssueInfo, find_issues
14from little_loops.parallel.types import QueuedIssue
16if TYPE_CHECKING:
17 from collections.abc import Iterable
19 from little_loops.config import BRConfig
22class IssuePriorityQueue:
23 """Thread-safe priority queue for issues.
25 Orders issues by priority (P0=highest) with FIFO within same priority.
26 Tracks in-progress and completed issues to prevent duplicate processing.
28 Example:
29 >>> config = BRConfig(Path.cwd())
30 >>> queue = IssuePriorityQueue()
31 >>> issues = queue.scan_issues(config)
32 >>> queue.add_many(issues)
33 >>> issue = queue.get() # Returns highest priority issue
34 >>> queue.mark_completed(issue.issue_info.issue_id)
35 """
37 # Default priority ordering (P0 highest)
38 DEFAULT_PRIORITIES = ["P0", "P1", "P2", "P3", "P4", "P5"]
40 def __init__(self) -> None:
41 """Initialize the priority queue."""
42 self._queue: PriorityQueue[QueuedIssue] = PriorityQueue()
43 self._lock = threading.Lock()
44 self._queued: set[str] = set() # Issues currently in queue
45 self._in_progress: set[str] = set()
46 self._completed: set[str] = set()
47 self._failed: set[str] = set()
49 def add(self, issue: IssueInfo) -> bool:
50 """Add an issue to the queue if not already processed.
52 Args:
53 issue: Issue information to queue
55 Returns:
56 True if issue was added, False if already queued/in_progress/completed
57 """
58 with self._lock:
59 if issue.issue_id in self._queued:
60 return False
61 if issue.issue_id in self._in_progress:
62 return False
63 if issue.issue_id in self._completed:
64 return False
65 if issue.issue_id in self._failed:
66 return False
68 queued = QueuedIssue(
69 priority=issue.priority_int,
70 issue_info=issue,
71 )
72 self._queue.put(queued)
73 self._queued.add(issue.issue_id)
74 return True
76 def add_many(self, issues: Iterable[IssueInfo]) -> int:
77 """Add multiple issues to the queue.
79 Args:
80 issues: Iterable of issue information
82 Returns:
83 Number of issues successfully added
84 """
85 added = 0
86 for issue in issues:
87 if self.add(issue):
88 added += 1
89 return added
91 def get(self, block: bool = True, timeout: float | None = None) -> QueuedIssue | None:
92 """Get the highest priority issue from the queue.
94 Args:
95 block: Whether to block waiting for an issue
96 timeout: Maximum time to wait (None = forever)
98 Returns:
99 The highest priority queued issue, or None if queue is empty
100 """
101 try:
102 queued = self._queue.get(block=block, timeout=timeout)
103 with self._lock:
104 self._queued.discard(queued.issue_info.issue_id)
105 self._in_progress.add(queued.issue_info.issue_id)
106 return queued
107 except Empty:
108 return None
110 def mark_completed(self, issue_id: str) -> None:
111 """Mark an issue as successfully completed.
113 Args:
114 issue_id: ID of the completed issue
115 """
116 with self._lock:
117 self._in_progress.discard(issue_id)
118 self._completed.add(issue_id)
120 def mark_failed(self, issue_id: str) -> None:
121 """Mark an issue as failed.
123 Args:
124 issue_id: ID of the failed issue
125 """
126 with self._lock:
127 self._in_progress.discard(issue_id)
128 self._failed.add(issue_id)
130 def requeue(self, issue: IssueInfo, demote_priority: bool = False) -> None:
131 """Requeue an issue (e.g., after merge conflict).
133 Args:
134 issue: Issue to requeue
135 demote_priority: Whether to lower the priority by one level
136 """
137 with self._lock:
138 self._in_progress.discard(issue.issue_id)
139 self._failed.discard(issue.issue_id)
141 priority = issue.priority_int
142 if demote_priority and priority < 5:
143 priority += 1
145 queued = QueuedIssue(
146 priority=priority,
147 issue_info=issue,
148 )
149 self._queue.put(queued)
150 self._queued.add(issue.issue_id)
152 def empty(self) -> bool:
153 """Check if the queue is empty."""
154 return self._queue.empty()
156 def qsize(self) -> int:
157 """Get approximate queue size."""
158 return self._queue.qsize()
160 @property
161 def in_progress_count(self) -> int:
162 """Number of issues currently being processed."""
163 with self._lock:
164 return len(self._in_progress)
166 @property
167 def completed_count(self) -> int:
168 """Number of completed issues."""
169 with self._lock:
170 return len(self._completed)
172 @property
173 def failed_count(self) -> int:
174 """Number of failed issues."""
175 with self._lock:
176 return len(self._failed)
178 @property
179 def in_progress_ids(self) -> list[str]:
180 """List of issue IDs currently being processed."""
181 with self._lock:
182 return list(self._in_progress)
184 @property
185 def completed_ids(self) -> list[str]:
186 """List of completed issue IDs."""
187 with self._lock:
188 return list(self._completed)
190 @property
191 def failed_ids(self) -> list[str]:
192 """List of failed issue IDs."""
193 with self._lock:
194 return list(self._failed)
196 def load_completed(self, completed: Iterable[str]) -> None:
197 """Load previously completed issues (for resume).
199 Args:
200 completed: Issue IDs that were already completed
201 """
202 with self._lock:
203 self._completed.update(completed)
205 def load_failed(self, failed: Iterable[str]) -> None:
206 """Load previously failed issues (for resume).
208 Args:
209 failed: Issue IDs that previously failed
210 """
211 with self._lock:
212 self._failed.update(failed)
214 @staticmethod
215 def scan_issues(
216 config: BRConfig,
217 priority_filter: list[str] | None = None,
218 skip_ids: set[str] | None = None,
219 only_ids: set[str] | None = None,
220 category: str | None = None,
221 type_prefixes: set[str] | None = None,
222 ) -> list[IssueInfo]:
223 """Scan issue directories and return sorted issues.
225 Uses BRConfig to locate issue directories and determine categories.
227 Args:
228 config: Project configuration
229 priority_filter: Only include these priority levels (default: all)
230 skip_ids: Issue IDs to skip
231 only_ids: If provided, only include these issue IDs
232 category: Optional category filter (e.g., "bugs")
233 type_prefixes: If provided, only include issues with these type prefixes
235 Returns:
236 List of IssueInfo sorted by priority then alphabetically
237 """
238 skip_ids = skip_ids or set()
239 priority_filter = priority_filter or IssuePriorityQueue.DEFAULT_PRIORITIES
241 # Use the existing find_issues function from issue_parser
242 all_issues = find_issues(
243 config,
244 category=category,
245 skip_ids=skip_ids,
246 only_ids=only_ids,
247 type_prefixes=type_prefixes,
248 )
250 # Apply priority filter
251 filtered = [i for i in all_issues if i.priority in priority_filter]
253 return filtered