Coverage for little_loops / parallel / priority_queue.py: 32%

103 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Thread-safe priority queue for issue processing. 

2 

3Provides a priority queue implementation that orders issues by priority level 

4(P0 > P1 > P2 > P3 > P4 > P5) with FIFO ordering within the same priority level. 

5""" 

6 

7from __future__ import annotations 

8 

9import threading 

10from queue import Empty, PriorityQueue 

11from typing import TYPE_CHECKING 

12 

13from little_loops.issue_parser import IssueInfo, find_issues 

14from little_loops.parallel.types import QueuedIssue 

15 

16if TYPE_CHECKING: 

17 from collections.abc import Iterable 

18 

19 from little_loops.config import BRConfig 

20 

21 

22class IssuePriorityQueue: 

23 """Thread-safe priority queue for issues. 

24 

25 Orders issues by priority (P0=highest) with FIFO within same priority. 

26 Tracks in-progress and completed issues to prevent duplicate processing. 

27 

28 Example: 

29 >>> config = BRConfig(Path.cwd()) 

30 >>> queue = IssuePriorityQueue() 

31 >>> issues = queue.scan_issues(config) 

32 >>> queue.add_many(issues) 

33 >>> issue = queue.get() # Returns highest priority issue 

34 >>> queue.mark_completed(issue.issue_info.issue_id) 

35 """ 

36 

37 # Default priority ordering (P0 highest) 

38 DEFAULT_PRIORITIES = ["P0", "P1", "P2", "P3", "P4", "P5"] 

39 

40 def __init__(self) -> None: 

41 """Initialize the priority queue.""" 

42 self._queue: PriorityQueue[QueuedIssue] = PriorityQueue() 

43 self._lock = threading.Lock() 

44 self._queued: set[str] = set() # Issues currently in queue 

45 self._in_progress: set[str] = set() 

46 self._completed: set[str] = set() 

47 self._failed: set[str] = set() 

48 

49 def add(self, issue: IssueInfo) -> bool: 

50 """Add an issue to the queue if not already processed. 

51 

52 Args: 

53 issue: Issue information to queue 

54 

55 Returns: 

56 True if issue was added, False if already queued/in_progress/completed 

57 """ 

58 with self._lock: 

59 if issue.issue_id in self._queued: 

60 return False 

61 if issue.issue_id in self._in_progress: 

62 return False 

63 if issue.issue_id in self._completed: 

64 return False 

65 if issue.issue_id in self._failed: 

66 return False 

67 

68 queued = QueuedIssue( 

69 priority=issue.priority_int, 

70 issue_info=issue, 

71 ) 

72 self._queue.put(queued) 

73 self._queued.add(issue.issue_id) 

74 return True 

75 

76 def add_many(self, issues: Iterable[IssueInfo]) -> int: 

77 """Add multiple issues to the queue. 

78 

79 Args: 

80 issues: Iterable of issue information 

81 

82 Returns: 

83 Number of issues successfully added 

84 """ 

85 added = 0 

86 for issue in issues: 

87 if self.add(issue): 

88 added += 1 

89 return added 

90 

91 def get(self, block: bool = True, timeout: float | None = None) -> QueuedIssue | None: 

92 """Get the highest priority issue from the queue. 

93 

94 Args: 

95 block: Whether to block waiting for an issue 

96 timeout: Maximum time to wait (None = forever) 

97 

98 Returns: 

99 The highest priority queued issue, or None if queue is empty 

100 """ 

101 try: 

102 queued = self._queue.get(block=block, timeout=timeout) 

103 with self._lock: 

104 self._queued.discard(queued.issue_info.issue_id) 

105 self._in_progress.add(queued.issue_info.issue_id) 

106 return queued 

107 except Empty: 

108 return None 

109 

110 def mark_completed(self, issue_id: str) -> None: 

111 """Mark an issue as successfully completed. 

112 

113 Args: 

114 issue_id: ID of the completed issue 

115 """ 

116 with self._lock: 

117 self._in_progress.discard(issue_id) 

118 self._completed.add(issue_id) 

119 

120 def mark_failed(self, issue_id: str) -> None: 

121 """Mark an issue as failed. 

122 

123 Args: 

124 issue_id: ID of the failed issue 

125 """ 

126 with self._lock: 

127 self._in_progress.discard(issue_id) 

128 self._failed.add(issue_id) 

129 

130 def requeue(self, issue: IssueInfo, demote_priority: bool = False) -> None: 

131 """Requeue an issue (e.g., after merge conflict). 

132 

133 Args: 

134 issue: Issue to requeue 

135 demote_priority: Whether to lower the priority by one level 

136 """ 

137 with self._lock: 

138 self._in_progress.discard(issue.issue_id) 

139 self._failed.discard(issue.issue_id) 

140 

141 priority = issue.priority_int 

142 if demote_priority and priority < 5: 

143 priority += 1 

144 

145 queued = QueuedIssue( 

146 priority=priority, 

147 issue_info=issue, 

148 ) 

149 self._queue.put(queued) 

150 self._queued.add(issue.issue_id) 

151 

152 def empty(self) -> bool: 

153 """Check if the queue is empty.""" 

154 return self._queue.empty() 

155 

156 def qsize(self) -> int: 

157 """Get approximate queue size.""" 

158 return self._queue.qsize() 

159 

160 @property 

161 def in_progress_count(self) -> int: 

162 """Number of issues currently being processed.""" 

163 with self._lock: 

164 return len(self._in_progress) 

165 

166 @property 

167 def completed_count(self) -> int: 

168 """Number of completed issues.""" 

169 with self._lock: 

170 return len(self._completed) 

171 

172 @property 

173 def failed_count(self) -> int: 

174 """Number of failed issues.""" 

175 with self._lock: 

176 return len(self._failed) 

177 

178 @property 

179 def in_progress_ids(self) -> list[str]: 

180 """List of issue IDs currently being processed.""" 

181 with self._lock: 

182 return list(self._in_progress) 

183 

184 @property 

185 def completed_ids(self) -> list[str]: 

186 """List of completed issue IDs.""" 

187 with self._lock: 

188 return list(self._completed) 

189 

190 @property 

191 def failed_ids(self) -> list[str]: 

192 """List of failed issue IDs.""" 

193 with self._lock: 

194 return list(self._failed) 

195 

196 def load_completed(self, completed: Iterable[str]) -> None: 

197 """Load previously completed issues (for resume). 

198 

199 Args: 

200 completed: Issue IDs that were already completed 

201 """ 

202 with self._lock: 

203 self._completed.update(completed) 

204 

205 def load_failed(self, failed: Iterable[str]) -> None: 

206 """Load previously failed issues (for resume). 

207 

208 Args: 

209 failed: Issue IDs that previously failed 

210 """ 

211 with self._lock: 

212 self._failed.update(failed) 

213 

214 @staticmethod 

215 def scan_issues( 

216 config: BRConfig, 

217 priority_filter: list[str] | None = None, 

218 skip_ids: set[str] | None = None, 

219 only_ids: set[str] | None = None, 

220 category: str | None = None, 

221 type_prefixes: set[str] | None = None, 

222 ) -> list[IssueInfo]: 

223 """Scan issue directories and return sorted issues. 

224 

225 Uses BRConfig to locate issue directories and determine categories. 

226 

227 Args: 

228 config: Project configuration 

229 priority_filter: Only include these priority levels (default: all) 

230 skip_ids: Issue IDs to skip 

231 only_ids: If provided, only include these issue IDs 

232 category: Optional category filter (e.g., "bugs") 

233 type_prefixes: If provided, only include issues with these type prefixes 

234 

235 Returns: 

236 List of IssueInfo sorted by priority then alphabetically 

237 """ 

238 skip_ids = skip_ids or set() 

239 priority_filter = priority_filter or IssuePriorityQueue.DEFAULT_PRIORITIES 

240 

241 # Use the existing find_issues function from issue_parser 

242 all_issues = find_issues( 

243 config, 

244 category=category, 

245 skip_ids=skip_ids, 

246 only_ids=only_ids, 

247 type_prefixes=type_prefixes, 

248 ) 

249 

250 # Apply priority filter 

251 filtered = [i for i in all_issues if i.priority in priority_filter] 

252 

253 return filtered