Coverage for little_loops / issue_history / parsing.py: 0%

152 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Issue history parsing and scanning functions. 

2 

3Provides functions to parse completed issue files, extract metadata 

4from frontmatter and content, scan directories for issues, and 

5extract file paths from issue content. 

6""" 

7 

8from __future__ import annotations 

9 

10import re 

11from datetime import date 

12from pathlib import Path 

13from typing import Any 

14 

15from little_loops.frontmatter import parse_frontmatter 

16from little_loops.issue_history.models import CompletedIssue 

17from little_loops.text_utils import extract_file_paths 

18 

19 

20def parse_completed_issue(file_path: Path) -> CompletedIssue: 

21 """Parse a completed issue file. 

22 

23 Args: 

24 file_path: Path to the issue markdown file 

25 

26 Returns: 

27 CompletedIssue with parsed metadata 

28 """ 

29 filename = file_path.name 

30 content = file_path.read_text(encoding="utf-8") 

31 

32 # Extract from filename: P[0-5]-[TYPE]-[NNN]-description.md 

33 issue_type = "UNKNOWN" 

34 priority = "P5" 

35 issue_id = "UNKNOWN" 

36 

37 # Match priority 

38 priority_match = re.match(r"^(P\d)", filename) 

39 if priority_match: 

40 priority = priority_match.group(1) 

41 

42 # Match type and ID 

43 type_match = re.search(r"(BUG|ENH|FEAT)-(\d+)", filename) 

44 if type_match: 

45 issue_type = type_match.group(1) 

46 issue_id = f"{type_match.group(1)}-{type_match.group(2)}" 

47 

48 # Parse frontmatter once for discovered_by and discovered_date 

49 fm = parse_frontmatter(content) 

50 discovered_by = _parse_discovered_by(fm) 

51 discovered_date = _parse_discovered_date(fm) 

52 

53 # Parse completion date from Resolution section or file mtime 

54 completed_date = _parse_completion_date(content, file_path) 

55 

56 return CompletedIssue( 

57 path=file_path, 

58 issue_type=issue_type, 

59 priority=priority, 

60 issue_id=issue_id, 

61 discovered_by=discovered_by, 

62 discovered_date=discovered_date, 

63 completed_date=completed_date, 

64 ) 

65 

66 

67def _parse_discovered_by(fm: dict[str, Any]) -> str | None: 

68 """Extract discovered_by from parsed frontmatter. 

69 

70 Args: 

71 fm: Parsed frontmatter dictionary 

72 

73 Returns: 

74 discovered_by value or None 

75 """ 

76 value = fm.get("discovered_by") 

77 return value if isinstance(value, str) else None 

78 

79 

80def _parse_completion_date(content: str, file_path: Path) -> date | None: 

81 """Extract completion date from Resolution section or file mtime. 

82 

83 Args: 

84 content: File content 

85 file_path: Path for mtime fallback 

86 

87 Returns: 

88 Completion date or None 

89 """ 

90 # Try Resolution section: **Completed**: YYYY-MM-DD 

91 match = re.search(r"\*\*Completed\*\*:\s*(\d{4}-\d{2}-\d{2})", content) 

92 if match: 

93 try: 

94 return date.fromisoformat(match.group(1)) 

95 except ValueError: 

96 pass 

97 

98 # Fallback to file mtime 

99 try: 

100 mtime = file_path.stat().st_mtime 

101 return date.fromtimestamp(mtime) 

102 except OSError: 

103 return None 

104 

105 

106def _parse_resolution_action(content: str) -> str: 

107 """Extract resolution action category from issue content. 

108 

109 Categorizes based on Resolution section fields: 

110 - "completed": Normal completion with **Action**: fix/implement 

111 - "rejected": Explicitly rejected (out of scope, not valid) 

112 - "invalid": Invalid reference or spec 

113 - "duplicate": Duplicate of existing issue 

114 - "deferred": Deferred to future work 

115 

116 Args: 

117 content: Issue file content 

118 

119 Returns: 

120 Resolution category string 

121 """ 

122 # Look for Status field patterns 

123 status_match = re.search(r"\*\*Status\*\*:\s*(.+?)(?:\n|$)", content) 

124 if status_match: 

125 status = status_match.group(1).strip().lower() 

126 if "closed" in status: 

127 # Check Reason field for specific category 

128 reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", content) 

129 if reason_match: 

130 reason = reason_match.group(1).strip().lower() 

131 if "duplicate" in reason: 

132 return "duplicate" 

133 if "invalid" in reason: 

134 return "invalid" 

135 if "deferred" in reason: 

136 return "deferred" 

137 if "rejected" in reason or "out of scope" in reason: 

138 return "rejected" 

139 # Generic closed without specific reason 

140 return "rejected" 

141 

142 # Check for Action field (normal completion) 

143 action_match = re.search(r"\*\*Action\*\*:\s*(.+?)(?:\n|$)", content) 

144 if action_match: 

145 return "completed" 

146 

147 # Default to completed if no resolution section 

148 return "completed" 

149 

150 

151def _detect_processing_agent(content: str, discovered_source: str | None = None) -> str: 

152 """Detect which processing agent handled an issue. 

153 

154 Detection strategy (in priority order): 

155 1. Check discovered_source field for 'll-parallel' or 'll-auto' 

156 2. Check content for '**Log Type**:' field 

157 3. Check content for '**Tool**:' field 

158 4. Default to 'manual' 

159 

160 Args: 

161 content: Issue file content 

162 discovered_source: Optional discovered_source frontmatter value 

163 

164 Returns: 

165 Agent name: 'll-auto', 'll-parallel', or 'manual' 

166 """ 

167 # Check discovered_source first 

168 if discovered_source: 

169 source_lower = discovered_source.lower() 

170 if "ll-parallel" in source_lower: 

171 return "ll-parallel" 

172 if "ll-auto" in source_lower: 

173 return "ll-auto" 

174 

175 # Check Log Type field 

176 log_type_match = re.search(r"\*\*Log Type\*\*:\s*(.+?)(?:\n|$)", content) 

177 if log_type_match: 

178 log_type = log_type_match.group(1).strip().lower() 

179 if "ll-parallel" in log_type: 

180 return "ll-parallel" 

181 if "ll-auto" in log_type: 

182 return "ll-auto" 

183 

184 # Check Tool field 

185 tool_match = re.search(r"\*\*Tool\*\*:\s*(.+?)(?:\n|$)", content) 

186 if tool_match: 

187 tool = tool_match.group(1).strip().lower() 

188 if "ll-parallel" in tool: 

189 return "ll-parallel" 

190 if "ll-auto" in tool: 

191 return "ll-auto" 

192 

193 # Default to manual 

194 return "manual" 

195 

196 

197def scan_completed_issues(completed_dir: Path) -> list[CompletedIssue]: 

198 """Scan completed directory for issue files. 

199 

200 Args: 

201 completed_dir: Path to .issues/completed/ 

202 

203 Returns: 

204 List of parsed CompletedIssue objects 

205 """ 

206 issues: list[CompletedIssue] = [] 

207 

208 if not completed_dir.exists(): 

209 return issues 

210 

211 for file_path in sorted(completed_dir.glob("*.md")): 

212 try: 

213 issue = parse_completed_issue(file_path) 

214 issues.append(issue) 

215 except Exception: 

216 # Skip unparseable files 

217 continue 

218 

219 return issues 

220 

221 

222def _parse_discovered_date(fm: dict[str, Any]) -> date | None: 

223 """Extract discovered_date from parsed frontmatter. 

224 

225 Args: 

226 fm: Parsed frontmatter dictionary 

227 

228 Returns: 

229 discovered_date value or None 

230 """ 

231 value = fm.get("discovered_date") 

232 if not isinstance(value, str): 

233 return None 

234 try: 

235 return date.fromisoformat(value) 

236 except ValueError: 

237 return None 

238 

239 

240def _extract_subsystem(content: str) -> str | None: 

241 """Extract primary subsystem/directory from issue content. 

242 

243 Args: 

244 content: Issue file content 

245 

246 Returns: 

247 Directory path (e.g., "scripts/little_loops/") or None 

248 """ 

249 # Look for file paths in Location or common patterns 

250 patterns = [ 

251 r"\*\*File\*\*:\s*`?([^`\n]+/)[^/`\n]+`?", # **File**: path/to/file.py 

252 r"`([a-zA-Z_][\w/.-]+/)[^/`]+\.py`", # `path/to/file.py` 

253 ] 

254 

255 for pattern in patterns: 

256 match = re.search(pattern, content) 

257 if match: 

258 return match.group(1) 

259 

260 return None 

261 

262 

263def _extract_paths_from_issue(content: str) -> list[str]: 

264 """Extract all file paths from issue content. 

265 

266 Delegates to :func:`~little_loops.text_utils.extract_file_paths` 

267 and returns results as a sorted list for backward compatibility. 

268 

269 Args: 

270 content: Issue file content 

271 

272 Returns: 

273 Sorted list of file paths found in content 

274 """ 

275 return sorted(extract_file_paths(content)) 

276 

277 

278def _find_test_file(source_path: str) -> str | None: 

279 """Find corresponding test file for a source file. 

280 

281 Checks common test file naming patterns: 

282 - tests/test_<name>.py 

283 - tests/<path>/test_<name>.py 

284 - <path>/test_<name>.py 

285 - <path>/<name>_test.py 

286 - <path>/tests/test_<name>.py 

287 

288 Args: 

289 source_path: Path to source file (e.g., "src/core/processor.py") 

290 

291 Returns: 

292 Path to test file if found, None otherwise 

293 """ 

294 if not source_path.endswith(".py"): 

295 return None # Only check Python files for now 

296 

297 path = Path(source_path) 

298 stem = path.stem # filename without extension 

299 parent = str(path.parent) if path.parent != Path(".") else "" 

300 

301 # Generate candidate test file paths 

302 candidates: list[str] = [ 

303 f"tests/test_{stem}.py", 

304 f"{parent}/test_{stem}.py" if parent else f"test_{stem}.py", 

305 f"{parent}/{stem}_test.py" if parent else f"{stem}_test.py", 

306 f"{parent}/tests/test_{stem}.py" if parent else f"tests/test_{stem}.py", 

307 ] 

308 

309 # Add path-aware test locations 

310 if parent: 

311 candidates.append(f"tests/{parent}/test_{stem}.py") 

312 

313 # Project-specific pattern for little-loops 

314 # e.g., scripts/little_loops/foo.py -> scripts/tests/test_foo.py 

315 if source_path.startswith("scripts/little_loops/"): 

316 candidates.append(f"scripts/tests/test_{stem}.py") 

317 

318 for candidate in candidates: 

319 if Path(candidate).exists(): 

320 return candidate 

321 

322 return None 

323 

324 

325def scan_active_issues( 

326 issues_dir: Path, 

327 category_dirs: list[str] | None = None, 

328) -> list[tuple[Path, str, str, date | None]]: 

329 """Scan active issue directories. 

330 

331 Args: 

332 issues_dir: Path to .issues/ directory 

333 category_dirs: List of category subdirectory names to scan. When 

334 omitted, defaults to ``["bugs", "features", "enhancements"]`` for 

335 backward compatibility. Pass ``config.issue_categories`` to 

336 include custom project categories. 

337 

338 Returns: 

339 List of (path, issue_type, priority, discovered_date) tuples 

340 """ 

341 results: list[tuple[Path, str, str, date | None]] = [] 

342 

343 for category_dir in category_dirs or ["bugs", "features", "enhancements"]: 

344 category_path = issues_dir / category_dir 

345 if not category_path.exists(): 

346 continue 

347 

348 for file_path in category_path.glob("*.md"): 

349 filename = file_path.name 

350 

351 # Extract priority 

352 priority = "P5" 

353 priority_match = re.match(r"^(P\d)", filename) 

354 if priority_match: 

355 priority = priority_match.group(1) 

356 

357 # Extract type 

358 issue_type = "UNKNOWN" 

359 type_match = re.search(r"(BUG|ENH|FEAT)", filename) 

360 if type_match: 

361 issue_type = type_match.group(1) 

362 

363 # Extract discovered date from content 

364 discovered_date = None 

365 try: 

366 content = file_path.read_text(encoding="utf-8") 

367 fm = parse_frontmatter(content) 

368 discovered_date = _parse_discovered_date(fm) 

369 except Exception: 

370 pass 

371 

372 results.append((file_path, issue_type, priority, discovered_date)) 

373 

374 return results