Coverage for little_loops / cli / issues / search.py: 0%

216 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""ll-issues search: Search issues with filters and sorting.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import re 

7from datetime import date 

8from typing import TYPE_CHECKING 

9 

10from little_loops.cli.output import PRIORITY_COLOR, TYPE_COLOR, colorize, print_json 

11 

12if TYPE_CHECKING: 

13 from little_loops.config import BRConfig 

14 from little_loops.issue_parser import IssueInfo 

15 

16 

17def _parse_discovered_date(content: str) -> date | None: 

18 """Extract discovered_date from YAML frontmatter.""" 

19 match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL) 

20 if not match: 

21 return None 

22 date_match = re.search(r"discovered_date:\s*(\S+)", match.group(1)) 

23 if not date_match: 

24 return None 

25 date_str = date_match.group(1).strip("\"'") 

26 try: 

27 # Handle ISO datetime strings like 2026-03-13T21:11:34Z 

28 return date.fromisoformat(date_str[:10]) 

29 except ValueError: 

30 return None 

31 

32 

33def _parse_labels_from_content(content: str) -> list[str]: 

34 """Extract labels from ## Labels section (backtick-wrapped items).""" 

35 match = re.search(r"## Labels\s*\n(.*?)(?:\n##|\Z)", content, re.DOTALL) 

36 if not match: 

37 return [] 

38 return [m.lower() for m in re.findall(r"`([^`]+)`", match.group(1))] 

39 

40 

41def _parse_priority_filter(priority_values: list[str]) -> set[str]: 

42 """Parse priority filter values, supporting ranges like P0-P2.""" 

43 priorities: set[str] = set() 

44 for val in priority_values: 

45 range_match = re.match(r"^(P\d)-(P\d)$", val) 

46 if range_match: 

47 start = int(range_match.group(1)[1:]) 

48 end = int(range_match.group(2)[1:]) 

49 for i in range(start, end + 1): 

50 priorities.add(f"P{i}") 

51 elif re.match(r"^P\d$", val): 

52 priorities.add(val) 

53 return priorities 

54 

55 

56def _load_issues_with_status( 

57 config: BRConfig, 

58 include_active: bool, 

59 include_completed: bool, 

60 include_deferred: bool, 

61) -> list[tuple[IssueInfo, str]]: 

62 """Load issues from the relevant directories, tagged with their status. 

63 

64 Returns: 

65 List of (IssueInfo, status) where status is 'active', 'completed', or 'deferred'. 

66 """ 

67 from little_loops.issue_parser import IssueParser 

68 

69 parser = IssueParser(config) 

70 results: list[tuple[IssueInfo, str]] = [] 

71 

72 if include_active: 

73 for category in config.issue_categories: 

74 issue_dir = config.get_issue_dir(category) 

75 if issue_dir.exists(): 

76 for f in sorted(issue_dir.glob("*.md")): 

77 try: 

78 results.append((parser.parse_file(f), "active")) 

79 except Exception: 

80 continue 

81 

82 if include_completed: 

83 completed_dir = config.get_completed_dir() 

84 if completed_dir.exists(): 

85 for f in sorted(completed_dir.glob("*.md")): 

86 try: 

87 results.append((parser.parse_file(f), "completed")) 

88 except Exception: 

89 continue 

90 

91 if include_deferred: 

92 deferred_dir = config.get_deferred_dir() 

93 if deferred_dir.exists(): 

94 for f in sorted(deferred_dir.glob("*.md")): 

95 try: 

96 results.append((parser.parse_file(f), "deferred")) 

97 except Exception: 

98 continue 

99 

100 return results 

101 

102 

103def _sort_issues( 

104 items: list[tuple[IssueInfo, str, date | None, date | None]], 

105 sort_field: str, 

106 descending: bool, 

107) -> list[tuple[IssueInfo, str, date | None, date | None]]: 

108 """Sort issues by the requested field.""" 

109 sentinel = date(9999, 12, 31) 

110 

111 def key(item: tuple) -> tuple: 

112 issue, _status, disc_date, comp_date = item 

113 if sort_field == "priority": 

114 return (issue.priority_int, issue.issue_id) 

115 if sort_field == "id": 

116 m = re.search(r"-(\d+)$", issue.issue_id) 

117 num = int(m.group(1)) if m else 0 

118 return (issue.issue_id.split("-", 1)[0], num) 

119 if sort_field in ("date", "created"): 

120 return (disc_date or sentinel,) 

121 if sort_field == "completed": 

122 return (comp_date or sentinel,) 

123 if sort_field == "type": 

124 return (issue.issue_id.split("-", 1)[0], issue.priority_int, issue.issue_id) 

125 if sort_field == "title": 

126 return (issue.title.lower(),) 

127 if sort_field == "confidence": 

128 score = issue.confidence_score if issue.confidence_score is not None else 9999 

129 return (score,) 

130 if sort_field == "outcome": 

131 score = issue.outcome_confidence if issue.outcome_confidence is not None else 9999 

132 return (score,) 

133 if sort_field == "refinement": 

134 refinement_commands = { 

135 "/ll:verify-issues", 

136 "/ll:refine-issue", 

137 "/ll:tradeoff-review-issues", 

138 "/ll:map-dependencies", 

139 "/ll:ready-issue", 

140 } 

141 counts: dict[str, int] = getattr(issue, "session_command_counts", {}) or {} 

142 total = sum(counts.get(cmd, 0) for cmd in refinement_commands) 

143 return (total,) 

144 return (issue.priority_int, issue.issue_id) 

145 

146 return sorted(items, key=key, reverse=descending) 

147 

148 

149def cmd_search(config: BRConfig, args: argparse.Namespace) -> int: 

150 """Search issues with optional text query, filters, and sorting. 

151 

152 Args: 

153 config: Project configuration 

154 args: Parsed arguments 

155 

156 Returns: 

157 Exit code (0 = success) 

158 """ 

159 # Resolve status flags 

160 status = getattr(args, "status", "active") 

161 if getattr(args, "include_completed", False): 

162 status = "all" 

163 

164 include_active = status in ("active", "all") 

165 include_completed = status in ("completed", "all") 

166 include_deferred = status in ("deferred", "all") 

167 

168 # Load issues 

169 raw = _load_issues_with_status(config, include_active, include_completed, include_deferred) 

170 

171 # Parse additional metadata (dates, labels) only when needed 

172 query: str | None = getattr(args, "query", None) 

173 since_date: date | None = None 

174 until_date: date | None = None 

175 raw_since = getattr(args, "since", None) 

176 raw_until = getattr(args, "until", None) 

177 if raw_since: 

178 try: 

179 since_date = date.fromisoformat(raw_since) 

180 except ValueError: 

181 print(f"Invalid --since date: {raw_since!r}. Use YYYY-MM-DD.") 

182 return 1 

183 if raw_until: 

184 try: 

185 until_date = date.fromisoformat(raw_until) 

186 except ValueError: 

187 print(f"Invalid --until date: {raw_until!r}. Use YYYY-MM-DD.") 

188 return 1 

189 

190 sort_field = getattr(args, "sort", "priority") or "priority" 

191 need_content = bool( 

192 query 

193 or since_date 

194 or until_date 

195 or getattr(args, "label", None) 

196 or sort_field in {"date", "created", "completed"} 

197 ) 

198 

199 # Build enriched list: (IssueInfo, status, discovered_date, completed_date) 

200 enriched: list[tuple[IssueInfo, str, date | None, date | None]] = [] 

201 for issue, stat in raw: 

202 if need_content: 

203 try: 

204 content = issue.path.read_text(encoding="utf-8") 

205 except Exception: 

206 content = "" 

207 disc_date = _parse_discovered_date(content) 

208 labels = _parse_labels_from_content(content) 

209 else: 

210 content = "" 

211 disc_date = None 

212 labels = [] 

213 

214 # --- Filter: text query --- 

215 if query: 

216 haystack = (issue.title + "\n" + content).lower() 

217 if query.lower() not in haystack: 

218 continue 

219 

220 # --- Filter: type --- 

221 type_filters: list[str] = getattr(args, "type", None) or [] 

222 if type_filters: 

223 issue_type = issue.issue_id.split("-", 1)[0] 

224 if issue_type not in type_filters: 

225 continue 

226 

227 # --- Filter: priority --- 

228 priority_filters: list[str] = getattr(args, "priority", None) or [] 

229 if priority_filters: 

230 allowed = _parse_priority_filter(priority_filters) 

231 if issue.priority not in allowed: 

232 continue 

233 

234 # --- Filter: label --- 

235 label_filters: list[str] = getattr(args, "label", None) or [] 

236 if label_filters: 

237 if not any(lf.lower() in labels for lf in label_filters): 

238 continue 

239 

240 # --- Filter: date range --- 

241 date_field = getattr(args, "date_field", "discovered") 

242 if date_field == "discovered": 

243 ref_date = disc_date 

244 else: 

245 # "updated" not stored; fall back to discovered_date 

246 ref_date = disc_date 

247 

248 if since_date and (ref_date is None or ref_date < since_date): 

249 continue 

250 if until_date and (ref_date is None or ref_date > until_date): 

251 continue 

252 

253 comp_date: date | None = None 

254 if sort_field == "completed" and need_content: 

255 from little_loops.issue_history.parsing import _parse_completion_date 

256 

257 comp_date = _parse_completion_date(content, issue.path) 

258 enriched.append((issue, stat, disc_date, comp_date)) 

259 

260 # --- Sort --- 

261 # Default direction: desc for date/created/completed (newest first), asc for everything else 

262 if getattr(args, "desc", False): 

263 descending = True 

264 elif getattr(args, "asc", False): 

265 descending = False 

266 else: 

267 descending = sort_field in {"date", "created", "completed"} 

268 

269 enriched = _sort_issues(enriched, sort_field, descending) 

270 

271 # --- Limit --- 

272 limit = getattr(args, "limit", None) 

273 if limit and limit > 0: 

274 enriched = enriched[:limit] 

275 

276 if not enriched: 

277 print("No issues found.") 

278 return 0 

279 

280 # --- Output --- 

281 issues_out = [item[0] for item in enriched] 

282 statuses_out = [item[1] for item in enriched] 

283 dates_out = [item[2] for item in enriched] 

284 

285 if getattr(args, "json", False): 

286 print_json( 

287 [ 

288 { 

289 "id": issue.issue_id, 

290 "priority": issue.priority, 

291 "type": issue.issue_id.split("-", 1)[0], 

292 "title": issue.title, 

293 "path": str(issue.path), 

294 "status": stat, 

295 "discovered_date": str(d) if d else None, 

296 } 

297 for issue, stat, d in zip(issues_out, statuses_out, dates_out, strict=True) 

298 ] 

299 ) 

300 return 0 

301 

302 fmt = getattr(args, "format", "table") or "table" 

303 

304 if fmt == "ids": 

305 for issue in issues_out: 

306 print(issue.issue_id) 

307 return 0 

308 

309 if fmt == "list": 

310 for issue in issues_out: 

311 print(f"{issue.path.name} {issue.title}") 

312 return 0 

313 

314 # Default: table (grouped by type, similar to ll-issues list) 

315 buckets: dict[str, list[tuple[IssueInfo, str]]] = {"BUG": [], "FEAT": [], "ENH": []} 

316 for issue, stat in zip(issues_out, statuses_out, strict=True): 

317 prefix = issue.issue_id.split("-", 1)[0] 

318 if prefix in buckets: 

319 buckets[prefix].append((issue, stat)) 

320 

321 type_labels = {"BUG": "Bugs", "FEAT": "Features", "ENH": "Enhancements"} 

322 lines: list[str] = [] 

323 for prefix, label in type_labels.items(): 

324 group = buckets[prefix] 

325 if not group: 

326 continue 

327 header = colorize(f"{label} ({len(group)})", f"{TYPE_COLOR.get(prefix, '0')};1") 

328 lines.append(header) 

329 for issue, stat in group: 

330 issue_type = issue.issue_id.split("-", 1)[0] 

331 colored_id = colorize(issue.issue_id, TYPE_COLOR.get(issue_type, "0")) 

332 colored_priority = colorize(issue.priority, PRIORITY_COLOR.get(issue.priority, "0")) 

333 status_tag = f" [{stat}]" if stat != "active" else "" 

334 lines.append(f" {colored_priority} {colored_id} {issue.title}{status_tag}") 

335 lines.append("") 

336 lines.append(f"Total: {len(issues_out)} issue(s) found") 

337 print("\n".join(lines)) 

338 return 0