Coverage for little_loops / cli / issues / search.py: 0%
216 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""ll-issues search: Search issues with filters and sorting."""
3from __future__ import annotations
5import argparse
6import re
7from datetime import date
8from typing import TYPE_CHECKING
10from little_loops.cli.output import PRIORITY_COLOR, TYPE_COLOR, colorize, print_json
12if TYPE_CHECKING:
13 from little_loops.config import BRConfig
14 from little_loops.issue_parser import IssueInfo
17def _parse_discovered_date(content: str) -> date | None:
18 """Extract discovered_date from YAML frontmatter."""
19 match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
20 if not match:
21 return None
22 date_match = re.search(r"discovered_date:\s*(\S+)", match.group(1))
23 if not date_match:
24 return None
25 date_str = date_match.group(1).strip("\"'")
26 try:
27 # Handle ISO datetime strings like 2026-03-13T21:11:34Z
28 return date.fromisoformat(date_str[:10])
29 except ValueError:
30 return None
33def _parse_labels_from_content(content: str) -> list[str]:
34 """Extract labels from ## Labels section (backtick-wrapped items)."""
35 match = re.search(r"## Labels\s*\n(.*?)(?:\n##|\Z)", content, re.DOTALL)
36 if not match:
37 return []
38 return [m.lower() for m in re.findall(r"`([^`]+)`", match.group(1))]
41def _parse_priority_filter(priority_values: list[str]) -> set[str]:
42 """Parse priority filter values, supporting ranges like P0-P2."""
43 priorities: set[str] = set()
44 for val in priority_values:
45 range_match = re.match(r"^(P\d)-(P\d)$", val)
46 if range_match:
47 start = int(range_match.group(1)[1:])
48 end = int(range_match.group(2)[1:])
49 for i in range(start, end + 1):
50 priorities.add(f"P{i}")
51 elif re.match(r"^P\d$", val):
52 priorities.add(val)
53 return priorities
56def _load_issues_with_status(
57 config: BRConfig,
58 include_active: bool,
59 include_completed: bool,
60 include_deferred: bool,
61) -> list[tuple[IssueInfo, str]]:
62 """Load issues from the relevant directories, tagged with their status.
64 Returns:
65 List of (IssueInfo, status) where status is 'active', 'completed', or 'deferred'.
66 """
67 from little_loops.issue_parser import IssueParser
69 parser = IssueParser(config)
70 results: list[tuple[IssueInfo, str]] = []
72 if include_active:
73 for category in config.issue_categories:
74 issue_dir = config.get_issue_dir(category)
75 if issue_dir.exists():
76 for f in sorted(issue_dir.glob("*.md")):
77 try:
78 results.append((parser.parse_file(f), "active"))
79 except Exception:
80 continue
82 if include_completed:
83 completed_dir = config.get_completed_dir()
84 if completed_dir.exists():
85 for f in sorted(completed_dir.glob("*.md")):
86 try:
87 results.append((parser.parse_file(f), "completed"))
88 except Exception:
89 continue
91 if include_deferred:
92 deferred_dir = config.get_deferred_dir()
93 if deferred_dir.exists():
94 for f in sorted(deferred_dir.glob("*.md")):
95 try:
96 results.append((parser.parse_file(f), "deferred"))
97 except Exception:
98 continue
100 return results
103def _sort_issues(
104 items: list[tuple[IssueInfo, str, date | None, date | None]],
105 sort_field: str,
106 descending: bool,
107) -> list[tuple[IssueInfo, str, date | None, date | None]]:
108 """Sort issues by the requested field."""
109 sentinel = date(9999, 12, 31)
111 def key(item: tuple) -> tuple:
112 issue, _status, disc_date, comp_date = item
113 if sort_field == "priority":
114 return (issue.priority_int, issue.issue_id)
115 if sort_field == "id":
116 m = re.search(r"-(\d+)$", issue.issue_id)
117 num = int(m.group(1)) if m else 0
118 return (issue.issue_id.split("-", 1)[0], num)
119 if sort_field in ("date", "created"):
120 return (disc_date or sentinel,)
121 if sort_field == "completed":
122 return (comp_date or sentinel,)
123 if sort_field == "type":
124 return (issue.issue_id.split("-", 1)[0], issue.priority_int, issue.issue_id)
125 if sort_field == "title":
126 return (issue.title.lower(),)
127 if sort_field == "confidence":
128 score = issue.confidence_score if issue.confidence_score is not None else 9999
129 return (score,)
130 if sort_field == "outcome":
131 score = issue.outcome_confidence if issue.outcome_confidence is not None else 9999
132 return (score,)
133 if sort_field == "refinement":
134 refinement_commands = {
135 "/ll:verify-issues",
136 "/ll:refine-issue",
137 "/ll:tradeoff-review-issues",
138 "/ll:map-dependencies",
139 "/ll:ready-issue",
140 }
141 counts: dict[str, int] = getattr(issue, "session_command_counts", {}) or {}
142 total = sum(counts.get(cmd, 0) for cmd in refinement_commands)
143 return (total,)
144 return (issue.priority_int, issue.issue_id)
146 return sorted(items, key=key, reverse=descending)
149def cmd_search(config: BRConfig, args: argparse.Namespace) -> int:
150 """Search issues with optional text query, filters, and sorting.
152 Args:
153 config: Project configuration
154 args: Parsed arguments
156 Returns:
157 Exit code (0 = success)
158 """
159 # Resolve status flags
160 status = getattr(args, "status", "active")
161 if getattr(args, "include_completed", False):
162 status = "all"
164 include_active = status in ("active", "all")
165 include_completed = status in ("completed", "all")
166 include_deferred = status in ("deferred", "all")
168 # Load issues
169 raw = _load_issues_with_status(config, include_active, include_completed, include_deferred)
171 # Parse additional metadata (dates, labels) only when needed
172 query: str | None = getattr(args, "query", None)
173 since_date: date | None = None
174 until_date: date | None = None
175 raw_since = getattr(args, "since", None)
176 raw_until = getattr(args, "until", None)
177 if raw_since:
178 try:
179 since_date = date.fromisoformat(raw_since)
180 except ValueError:
181 print(f"Invalid --since date: {raw_since!r}. Use YYYY-MM-DD.")
182 return 1
183 if raw_until:
184 try:
185 until_date = date.fromisoformat(raw_until)
186 except ValueError:
187 print(f"Invalid --until date: {raw_until!r}. Use YYYY-MM-DD.")
188 return 1
190 sort_field = getattr(args, "sort", "priority") or "priority"
191 need_content = bool(
192 query
193 or since_date
194 or until_date
195 or getattr(args, "label", None)
196 or sort_field in {"date", "created", "completed"}
197 )
199 # Build enriched list: (IssueInfo, status, discovered_date, completed_date)
200 enriched: list[tuple[IssueInfo, str, date | None, date | None]] = []
201 for issue, stat in raw:
202 if need_content:
203 try:
204 content = issue.path.read_text(encoding="utf-8")
205 except Exception:
206 content = ""
207 disc_date = _parse_discovered_date(content)
208 labels = _parse_labels_from_content(content)
209 else:
210 content = ""
211 disc_date = None
212 labels = []
214 # --- Filter: text query ---
215 if query:
216 haystack = (issue.title + "\n" + content).lower()
217 if query.lower() not in haystack:
218 continue
220 # --- Filter: type ---
221 type_filters: list[str] = getattr(args, "type", None) or []
222 if type_filters:
223 issue_type = issue.issue_id.split("-", 1)[0]
224 if issue_type not in type_filters:
225 continue
227 # --- Filter: priority ---
228 priority_filters: list[str] = getattr(args, "priority", None) or []
229 if priority_filters:
230 allowed = _parse_priority_filter(priority_filters)
231 if issue.priority not in allowed:
232 continue
234 # --- Filter: label ---
235 label_filters: list[str] = getattr(args, "label", None) or []
236 if label_filters:
237 if not any(lf.lower() in labels for lf in label_filters):
238 continue
240 # --- Filter: date range ---
241 date_field = getattr(args, "date_field", "discovered")
242 if date_field == "discovered":
243 ref_date = disc_date
244 else:
245 # "updated" not stored; fall back to discovered_date
246 ref_date = disc_date
248 if since_date and (ref_date is None or ref_date < since_date):
249 continue
250 if until_date and (ref_date is None or ref_date > until_date):
251 continue
253 comp_date: date | None = None
254 if sort_field == "completed" and need_content:
255 from little_loops.issue_history.parsing import _parse_completion_date
257 comp_date = _parse_completion_date(content, issue.path)
258 enriched.append((issue, stat, disc_date, comp_date))
260 # --- Sort ---
261 # Default direction: desc for date/created/completed (newest first), asc for everything else
262 if getattr(args, "desc", False):
263 descending = True
264 elif getattr(args, "asc", False):
265 descending = False
266 else:
267 descending = sort_field in {"date", "created", "completed"}
269 enriched = _sort_issues(enriched, sort_field, descending)
271 # --- Limit ---
272 limit = getattr(args, "limit", None)
273 if limit and limit > 0:
274 enriched = enriched[:limit]
276 if not enriched:
277 print("No issues found.")
278 return 0
280 # --- Output ---
281 issues_out = [item[0] for item in enriched]
282 statuses_out = [item[1] for item in enriched]
283 dates_out = [item[2] for item in enriched]
285 if getattr(args, "json", False):
286 print_json(
287 [
288 {
289 "id": issue.issue_id,
290 "priority": issue.priority,
291 "type": issue.issue_id.split("-", 1)[0],
292 "title": issue.title,
293 "path": str(issue.path),
294 "status": stat,
295 "discovered_date": str(d) if d else None,
296 }
297 for issue, stat, d in zip(issues_out, statuses_out, dates_out, strict=True)
298 ]
299 )
300 return 0
302 fmt = getattr(args, "format", "table") or "table"
304 if fmt == "ids":
305 for issue in issues_out:
306 print(issue.issue_id)
307 return 0
309 if fmt == "list":
310 for issue in issues_out:
311 print(f"{issue.path.name} {issue.title}")
312 return 0
314 # Default: table (grouped by type, similar to ll-issues list)
315 buckets: dict[str, list[tuple[IssueInfo, str]]] = {"BUG": [], "FEAT": [], "ENH": []}
316 for issue, stat in zip(issues_out, statuses_out, strict=True):
317 prefix = issue.issue_id.split("-", 1)[0]
318 if prefix in buckets:
319 buckets[prefix].append((issue, stat))
321 type_labels = {"BUG": "Bugs", "FEAT": "Features", "ENH": "Enhancements"}
322 lines: list[str] = []
323 for prefix, label in type_labels.items():
324 group = buckets[prefix]
325 if not group:
326 continue
327 header = colorize(f"{label} ({len(group)})", f"{TYPE_COLOR.get(prefix, '0')};1")
328 lines.append(header)
329 for issue, stat in group:
330 issue_type = issue.issue_id.split("-", 1)[0]
331 colored_id = colorize(issue.issue_id, TYPE_COLOR.get(issue_type, "0"))
332 colored_priority = colorize(issue.priority, PRIORITY_COLOR.get(issue.priority, "0"))
333 status_tag = f" [{stat}]" if stat != "active" else ""
334 lines.append(f" {colored_priority} {colored_id} {issue.title}{status_tag}")
335 lines.append("")
336 lines.append(f"Total: {len(issues_out)} issue(s) found")
337 print("\n".join(lines))
338 return 0