Coverage for src / infra / issue_manager.py: 32%
63 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
"""IssueManager: Pure domain logic for issue sorting and filtering.

This module provides stateless, testable functions for manipulating issue data.
Unlike BeadsClient (which handles I/O with the bd CLI), IssueManager contains
only pure transformations that can be tested without mocking subprocess calls.

Design note: All public methods are static or class methods to emphasize their
pure, functional nature. They operate on issue dicts and return new collections
without mutating inputs.
"""
12from __future__ import annotations
class IssueManager:
    """Pure domain logic for issue sorting and filtering.

    All methods are static and operate on issue data without any I/O.
    This separation allows:
    - Direct unit testing with fixture data (no subprocess mocking)
    - Clear boundary between I/O (BeadsClient) and domain logic (IssueManager)
    - Reuse of sorting/filtering logic in different contexts

    Every method returns a new list/set; input collections are never mutated
    and are never returned by reference.
    """

    @staticmethod
    def _priority_of(issue: dict[str, object]) -> int:
        """Return the issue's priority as an int, treating missing/empty as 0.

        Shared by both sort paths so that focus and non-focus ordering agree
        on how a priority is interpreted.

        Args:
            issue: Issue dict; the "priority" key is optional.

        Returns:
            Integer priority (lower sorts first).

        Raises:
            ValueError: If the priority is a non-empty string that does not
                parse as an integer.
        """
        raw = issue.get("priority")
        if not raw:
            # None, 0 and "" all mean "no explicit priority".
            return 0
        if isinstance(raw, (int, float)):
            return int(raw)
        # Anything else (typically a numeric string) goes through str -> int.
        return int(str(raw))

    @staticmethod
    def _updated_of(issue: dict[str, object]) -> str:
        """Return the issue's updated_at value as a string ("" when absent)."""
        stamp = issue.get("updated_at")
        return "" if stamp is None else str(stamp)

    @staticmethod
    def merge_wip_issues(
        base_issues: list[dict[str, object]], wip_issues: list[dict[str, object]]
    ) -> list[dict[str, object]]:
        """Merge WIP issues into base list, avoiding duplicates.

        IDs are compared after conversion to ``str``, so ``1`` and ``"1"``
        are considered the same issue.

        Args:
            base_issues: List of base issues (typically from bd ready).
            wip_issues: List of in-progress issues to merge.

        Returns:
            New list: all base issues followed by the WIP issues whose ID does
            not already appear in the base list.

        Raises:
            KeyError: If any issue lacks an "id" key.
        """
        known_ids: set[str] = {str(issue["id"]) for issue in base_issues}
        merged = list(base_issues)
        merged.extend(wip for wip in wip_issues if str(wip["id"]) not in known_ids)
        return merged

    @staticmethod
    def filter_blocked_wip(issues: list[dict[str, object]]) -> list[dict[str, object]]:
        """Filter out blocked in_progress issues.

        A WIP issue is considered blocked when it has a truthy blocked_by value.

        Args:
            issues: List of issue dicts (may include open + in_progress).

        Returns:
            New list with blocked in_progress issues removed.
        """
        return [
            issue
            for issue in issues
            if issue.get("status") != "in_progress" or not issue.get("blocked_by")
        ]

    @staticmethod
    def filter_blocked_epics(
        issues: list[dict[str, object]], blocked_epics: set[str]
    ) -> list[dict[str, object]]:
        """Filter out issues whose parent epic is blocked.

        Issues without a parent epic (missing or falsy ``parent_epic``) are
        always kept.

        Args:
            issues: List of issue dicts with parent_epic field populated.
            blocked_epics: Set of blocked epic IDs.

        Returns:
            New list with issues under blocked epics removed.
        """
        if not blocked_epics:
            # Return a copy so callers can never mutate the input through us.
            return list(issues)
        kept: list[dict[str, object]] = []
        for issue in issues:
            parent = issue.get("parent_epic")
            # Orphans cannot be under a blocked epic; avoid matching on the
            # accidental string "None".
            if not parent or str(parent) not in blocked_epics:
                kept.append(issue)
        return kept

    @staticmethod
    def apply_filters(
        issues: list[dict[str, object]],
        exclude_ids: set[str],
        epic_children: set[str] | None,
        only_ids: set[str] | None,
    ) -> list[dict[str, object]]:
        """Apply filtering rules to issues.

        Filters out:
        - Issues in exclude_ids
        - Epics (issue_type == "epic")
        - Issues not in epic_children (if specified)
        - Issues not in only_ids (if specified)

        Args:
            issues: List of issue dicts to filter.
            exclude_ids: Set of issue IDs to exclude.
            epic_children: If not None, only include issues in this set.
            only_ids: If not None, only include issues in this set.

        Returns:
            New, filtered list of issues.

        Raises:
            KeyError: If any issue lacks an "id" key.
        """
        kept: list[dict[str, object]] = []
        for issue in issues:
            issue_id = str(issue["id"])
            if issue_id in exclude_ids:
                continue
            if issue.get("issue_type") == "epic":
                continue
            if epic_children is not None and issue_id not in epic_children:
                continue
            if only_ids is not None and issue_id not in only_ids:
                continue
            kept.append(issue)
        return kept

    @staticmethod
    def negate_timestamp(timestamp: object) -> str:
        """Negate a timestamp string for descending sort.

        Converts each character to its complement relative to chr(255)
        so that lexicographic sort becomes descending. The reversal is exact
        for strings of equal length (e.g. uniformly formatted ISO timestamps).

        Args:
            timestamp: ISO timestamp string or empty string.

        Returns:
            Negated string for descending sort; "\\xff" for empty, missing
            or non-string input.

        Raises:
            ValueError: If the string contains a character whose code point
                exceeds 255 (``chr`` of a negative number).
        """
        if not timestamp or not isinstance(timestamp, str):
            # Empty/missing timestamps sort last (after all real timestamps):
            # every character of an ISO timestamp (digits and separators)
            # negates to a code point strictly below 255.
            return "\xff"
        # Negate each character: higher chars become lower.
        # This works for ISO timestamps like "2025-01-15T10:30:00Z".
        return "".join(chr(255 - ord(ch)) for ch in timestamp)

    @staticmethod
    def sort_by_epic_groups(
        issues: list[dict[str, object]],
    ) -> list[dict[str, object]]:
        """Sort issues by epic groups for focus mode.

        Groups issues by parent_epic field, then sorts:
        1. Groups by (min_priority, max_updated DESC)
        2. Within groups by (priority, updated DESC)

        Orphan tasks (no parent epic) form a virtual group with the same rules.

        Note: Issues must have parent_epic field populated before calling.

        Args:
            issues: List of issue dicts with parent_epic field.

        Returns:
            New, sorted list of issue dicts.

        Raises:
            ValueError: If a priority is a non-numeric string.
        """
        if not issues:
            return []

        priority_of = IssueManager._priority_of
        updated_of = IssueManager._updated_of
        negate = IssueManager.negate_timestamp

        # Bucket issues by parent epic; the None bucket collects orphans.
        groups: dict[str | None, list[dict[str, object]]] = {}
        for issue in issues:
            parent = issue.get("parent_epic")
            bucket_key: str | None = str(parent) if parent else None
            groups.setdefault(bucket_key, []).append(issue)

        # Order each bucket by (priority, updated DESC). The buckets are
        # fresh lists, so sorting in place does not touch the input.
        for members in groups.values():
            members.sort(key=lambda item: (priority_of(item), negate(updated_of(item))))

        def group_sort_key(epic: str | None) -> tuple[int, str]:
            """Rank a group by its best priority, then most recent update."""
            members = groups[epic]
            best_priority = min(priority_of(item) for item in members)
            latest_update = max(updated_of(item) for item in members)
            return (best_priority, negate(latest_update))

        # Only the key tuples are compared, so the None key is never compared
        # against a str key.
        ordered_epics = sorted(groups, key=group_sort_key)
        return [item for epic in ordered_epics for item in groups[epic]]

    @staticmethod
    def sort_issues(
        issues: list[dict[str, object]], focus: bool, prioritize_wip: bool
    ) -> list[dict[str, object]]:
        """Sort issues by focus mode vs priority.

        Priorities are normalised to int in both modes, so numeric strings
        sort numerically and mixed int/str priorities compare safely.

        Args:
            issues: List of issue dicts to sort.
            focus: If True, group by parent epic and sort by epic groups.
            prioritize_wip: If True, put in_progress issues first.

        Returns:
            New, sorted list of issue dicts.

        Raises:
            ValueError: If a priority is a non-numeric string.
        """
        if not issues:
            return []

        if focus:
            ordered = IssueManager.sort_by_epic_groups(issues)
        else:
            ordered = sorted(issues, key=IssueManager._priority_of)

        if prioritize_wip:
            # Stable sort: in_progress issues move to the front while the
            # ordering established above is preserved within each half.
            ordered.sort(
                key=lambda issue: 0 if issue.get("status") == "in_progress" else 1
            )
        return ordered

    @staticmethod
    def find_missing_ids(
        only_ids: set[str] | None,
        issues: list[dict[str, object]],
        suppress_ids: set[str],
    ) -> set[str]:
        """Find IDs from only_ids that are not in issues.

        Args:
            only_ids: Set of expected issue IDs (None means no filtering).
            issues: List of issue dicts to check against.
            suppress_ids: IDs to exclude from the missing set.

        Returns:
            Set of IDs that were expected but not found (empty when only_ids
            is None or empty).

        Raises:
            KeyError: If only_ids is non-empty and any issue lacks an "id" key.
        """
        if not only_ids:
            return set()
        present_ids = {str(issue["id"]) for issue in issues}
        return only_ids - present_ids - suppress_ids

    @staticmethod
    def filter_orphans_only(
        issues: list[dict[str, object]],
    ) -> list[dict[str, object]]:
        """Filter to only issues with no parent epic.

        Args:
            issues: List of issue dicts with parent_epic field populated.

        Returns:
            New list containing only issues where parent_epic is None/empty.
        """
        return [issue for issue in issues if not issue.get("parent_epic")]