Coverage for little_loops / issue_discovery / search.py: 0%

166 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Issue file search and main discovery functions.""" 

2 

3from __future__ import annotations 

4 

5import re 

6import subprocess 

7from datetime import datetime 

8from pathlib import Path 

9from typing import TYPE_CHECKING 

10 

11from little_loops.issue_discovery.extraction import ( 

12 _build_reopen_section, 

13 detect_regression_or_duplicate, 

14) 

15from little_loops.issue_discovery.matching import ( 

16 FindingMatch, 

17 MatchClassification, 

18 RegressionEvidence, 

19 _calculate_word_overlap, 

20 _extract_words, 

21 _matches_issue_type, 

22) 

23 

24if TYPE_CHECKING: 

25 from little_loops.config import BRConfig 

26 from little_loops.logger import Logger 

27 

28 

29# ============================================================================= 

30# Issue Search Functions 

31# ============================================================================= 

32 

33 

def _get_all_issue_files(
    config: BRConfig,
    include_completed: bool = True,
    include_deferred: bool = False,
) -> list[tuple[Path, bool]]:
    """Collect every issue markdown file together with a non-active flag.

    Args:
        config: Project configuration
        include_completed: Whether to scan the completed directory
        include_deferred: Whether to scan the deferred directory

    Returns:
        List of (path, is_completed) tuples: active issues first (flag False),
        then completed, then deferred. Deferred issues are reported with
        is_completed=True because they are not active either.
    """
    collected: list[tuple[Path, bool]] = []

    def _scan(directory: Path, non_active: bool) -> None:
        # Directories that have not been created yet contribute nothing.
        if directory.exists():
            collected.extend((entry, non_active) for entry in directory.glob("*.md"))

    # Active issues, one directory per configured category
    for category in config.issue_categories:
        _scan(config.get_issue_dir(category), False)

    # Completed issues
    if include_completed:
        _scan(config.get_completed_dir(), True)

    # Deferred issues (flagged like completed ones: non-active)
    if include_deferred:
        _scan(config.get_deferred_dir(), True)

    return collected

74 

75 

def search_issues_by_content(
    config: BRConfig,
    search_terms: list[str],
    include_completed: bool = True,
) -> list[tuple[Path, float, bool]]:
    """Rank issue files by how strongly their text overlaps the search terms.

    Args:
        config: Project configuration
        search_terms: Terms to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (path, score, is_completed), highest score first. Only files
        scoring above 0.1 are reported.
    """
    # Pool the words of every term into a single query vocabulary.
    query_words: set[str] = set()
    for term in search_terms:
        query_words.update(_extract_words(term))

    if not query_words:
        return []

    hits: list[tuple[Path, float, bool]] = []
    for candidate, done in _get_all_issue_files(config, include_completed):
        try:
            text = candidate.read_text(encoding="utf-8")
            relevance = _calculate_word_overlap(query_words, _extract_words(text))
            if relevance > 0.1:  # Minimum threshold
                hits.append((candidate, relevance, done))
        except Exception:
            # Best effort: a file that cannot be read or scored is skipped.
            continue

    return sorted(hits, key=lambda hit: hit[1], reverse=True)

111 

112 

def search_issues_by_file_path(
    config: BRConfig,
    file_path: str,
    include_completed: bool = True,
) -> list[tuple[Path, bool]]:
    """Search for issues mentioning a specific file path.

    Matching is a case-insensitive substring test against each issue's text.
    Both the full path and its final component are tried, so "module.py"
    also finds issues that mention "src/module.py".

    Args:
        config: Project configuration
        file_path: File path to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (issue_path, is_completed) tuples. Empty when file_path is
        blank, because an empty needle would otherwise match every issue.
    """
    normalized_path = file_path.strip().lower()
    if not normalized_path:
        # "" is a substring of any text; treat a blank path as "no query".
        return []

    # Also match partial paths (e.g., "module.py" matches "src/module.py").
    # A trailing slash leaves an empty final component, which is dropped so
    # it cannot match everything.
    filename = normalized_path.rsplit("/", 1)[-1]
    needles = [needle for needle in (normalized_path, filename) if needle]

    results: list[tuple[Path, bool]] = []
    for issue_path, is_completed in _get_all_issue_files(config, include_completed):
        try:
            content = issue_path.read_text(encoding="utf-8").lower()
        except (OSError, UnicodeError):
            # Unreadable or non-UTF-8 issue files are skipped, not fatal.
            continue
        if any(needle in content for needle in needles):
            results.append((issue_path, is_completed))

    return results

145 

146 

147# ============================================================================= 

148# Main Discovery Functions 

149# ============================================================================= 

150 

151 

def find_existing_issue(
    config: BRConfig,
    finding_type: str,
    file_path: str | None,
    finding_title: str,
    finding_content: str,
) -> FindingMatch:
    """Look for an already-filed issue that corresponds to this finding.

    Three passes are tried in order of confidence:
    1. File path mention plus matching issue type (returns immediately, 0.85)
    2. Title word overlap (only overlaps above 0.7 count)
    3. Content overlap, scaled by 0.8 (only when nothing scored >= 0.5 so far)

    A match against a completed issue is run through regression analysis to
    decide whether it is a regression (fix broke) or an invalid fix (never
    worked); a match against an active issue is classified as a duplicate.

    Args:
        config: Project configuration
        finding_type: Issue type ("BUG", "ENH", "FEAT")
        file_path: File path from finding (if any)
        finding_title: Title of the finding
        finding_content: Full content/description of finding

    Returns:
        FindingMatch with best match details, including classification and
        regression evidence for completed issue matches
    """

    def _classify(path: Path, done: bool):
        # Active issues are plain duplicates; completed ones need analysis.
        if not done:
            return MatchClassification.DUPLICATE, None
        return detect_regression_or_duplicate(config, path)

    # Starting point: "no match" (classification stays at its default).
    result = FindingMatch(
        issue_path=None,
        match_type="none",
        match_score=0.0,
    )

    # Pass 1: Exact file path match
    if file_path:
        for candidate, done in search_issues_by_file_path(config, file_path):
            try:
                # Only accept issues of the same type (uses configured categories)
                if not _matches_issue_type(finding_type, candidate, config, done):
                    continue
                classification, evidence = _classify(candidate, done)
                # High confidence if same file + same type
                return FindingMatch(
                    issue_path=candidate,
                    match_type="exact",
                    match_score=0.85,
                    is_completed=done,
                    matched_terms=[file_path],
                    classification=classification,
                    regression_evidence=evidence,
                )
            except Exception:
                # A candidate that cannot be evaluated is skipped.
                continue

    # Pass 2: Title similarity
    title_words = _extract_words(finding_title)
    if title_words:
        title_hit: tuple[Path, bool, float, list[str]] | None = None
        title_hit_score = result.match_score
        for candidate, done in _get_all_issue_files(config):
            try:
                text = candidate.read_text(encoding="utf-8")
                # Title is the first heading shaped like "# <ID>: <title>"
                heading = re.search(r"^#\s+[\w-]+:\s*(.+)$", text, re.MULTILINE)
                if heading is None:
                    continue
                issue_words = _extract_words(heading.group(1))
                overlap = _calculate_word_overlap(title_words, issue_words)
                if overlap > 0.7 and overlap > title_hit_score:
                    title_hit_score = overlap
                    title_hit = (
                        candidate,
                        done,
                        overlap,
                        list(title_words & issue_words),
                    )
            except Exception:
                continue

        # Classify once, for the single best title match only
        if title_hit is not None:
            candidate, done, overlap, shared_words = title_hit
            classification, evidence = _classify(candidate, done)
            result = FindingMatch(
                issue_path=candidate,
                match_type="similar",
                match_score=overlap,
                is_completed=done,
                matched_terms=shared_words,
                classification=classification,
                regression_evidence=evidence,
            )

    # Pass 3: Content analysis
    if result.match_score < 0.5:
        ranked = search_issues_by_content(
            config,
            [finding_title, finding_content],
        )
        # Top 5 only; content matches are less precise, hence the 0.8 factor.
        shortlist = [(path, done, score * 0.8) for path, score, done in ranked[:5]]
        content_hit = max(shortlist, key=lambda hit: hit[2], default=None)

        # Classify once, for the single best content match only
        if content_hit is not None and content_hit[2] > result.match_score:
            candidate, done, scaled_score = content_hit
            classification, evidence = _classify(candidate, done)
            result = FindingMatch(
                issue_path=candidate,
                match_type="content",
                match_score=scaled_score,
                is_completed=done,
                classification=classification,
                regression_evidence=evidence,
            )

    # If no match found, classification is NEW_ISSUE (the default)
    return result

294 

295 

296# ============================================================================= 

297# Issue Reopening and Updating 

298# ============================================================================= 

299 

300 

def _get_category_from_issue_path(issue_path: Path, config: BRConfig) -> str:
    """Determine the category for an issue based on its filename.

    The filename is compared case-insensitively against each configured
    category prefix. A prefix that appears as a stand-alone token (for
    example the "ENH" in "P1-ENH-042-fix-debug-output.md") wins over a
    prefix that merely occurs inside another word (the "BUG" in "DEBUG").
    If no prefix appears as a token, the plain substring match is used as
    a fallback.

    Args:
        issue_path: Path to issue file
        config: Project configuration

    Returns:
        Category name (e.g., "bugs", "enhancements", "features");
        "bugs" when no configured prefix occurs in the filename.
    """
    filename = issue_path.name.upper()
    prefixes = [
        (category_name, category_config.prefix.upper())
        for category_name, category_config in config.issues.categories.items()
    ]

    # Pass 1: prefix as a delimited token - not glued to a preceding
    # letter/digit and not continued by another letter.
    for category_name, prefix in prefixes:
        if prefix and re.search(
            rf"(?<![A-Z0-9]){re.escape(prefix)}(?![A-Z])", filename
        ):
            return category_name

    # Pass 2: substring fallback for filenames that embed the prefix
    # without delimiters.
    for category_name, prefix in prefixes:
        if prefix in filename:
            return category_name

    return "bugs"  # Default

316 

317 

def reopen_issue(
    config: BRConfig,
    completed_issue_path: Path,
    reopen_reason: str,
    new_context: str,
    source_command: str,
    logger: Logger,
    classification: MatchClassification | None = None,
    regression_evidence: RegressionEvidence | None = None,
) -> Path | None:
    """Move issue from completed back to active with Reopened section.

    The file is moved with ``git mv`` to preserve history. If that does not
    succeed - git exits non-zero, or the git executable cannot be started
    at all - the move is done manually (write the new file, delete the old).

    Args:
        config: Project configuration
        completed_issue_path: Path to issue in completed/
        reopen_reason: Reason for reopening
        new_context: New context/findings to add
        source_command: Command triggering the reopen
        logger: Logger for output
        classification: How this issue was classified (regression, invalid_fix, etc.)
        regression_evidence: Evidence supporting the classification

    Returns:
        New path to reopened issue, or None if failed (source missing,
        an active issue with the same name already exists, or an I/O error)
    """
    if not completed_issue_path.exists():
        logger.error(f"Completed issue not found: {completed_issue_path}")
        return None

    # Determine target category directory
    category = _get_category_from_issue_path(completed_issue_path, config)
    target_dir = config.get_issue_dir(category)
    target_dir.mkdir(parents=True, exist_ok=True)

    target_path = target_dir / completed_issue_path.name

    # Safety check - don't overwrite existing active issue
    if target_path.exists():
        logger.warning(f"Active issue already exists: {target_path}")
        return None

    # Log with classification info if available
    if classification == MatchClassification.REGRESSION:
        logger.info(f"Reopening {completed_issue_path.name} as REGRESSION -> {category}/")
    elif classification == MatchClassification.INVALID_FIX:
        logger.info(f"Reopening {completed_issue_path.name} as INVALID_FIX -> {category}/")
    else:
        logger.info(f"Reopening {completed_issue_path.name} -> {category}/")

    try:
        # Read the original text before the file is moved, then append the
        # reopened section with classification info.
        content = completed_issue_path.read_text(encoding="utf-8")
        content += _build_reopen_section(
            reopen_reason,
            new_context,
            source_command,
            classification,
            regression_evidence,
        )

        # Try git mv first for history preservation
        try:
            result = subprocess.run(
                ["git", "mv", str(completed_issue_path), str(target_path)],
                capture_output=True,
                text=True,
                check=False,
            )
            moved_by_git = result.returncode == 0
            failure_detail = result.stderr
        except OSError as exc:
            # git is not installed or not executable: same fallback as a
            # failed git mv instead of aborting the reopen.
            moved_by_git = False
            failure_detail = str(exc)

        if not moved_by_git:
            logger.warning(f"git mv failed, using manual copy: {failure_detail}")

        # Write updated content (this creates the target in the fallback case)
        target_path.write_text(content, encoding="utf-8")

        if not moved_by_git:
            # Remove the source only after the target was written successfully.
            completed_issue_path.unlink()

        logger.success(f"Reopened: {target_path.name}")
        return target_path

    except Exception as e:
        logger.error(f"Failed to reopen issue: {e}")
        return None

403 

404 

def update_existing_issue(
    config: BRConfig,
    issue_path: Path,
    update_section_name: str,
    update_content: str,
    source_command: str,
    logger: Logger,
) -> bool:
    """Add new findings to an existing issue.

    Appends a "## <update_section_name>" section (date, source, content) to
    the issue file unless a level-2 heading with exactly that name is
    already present. Deeper headings ("### name") or longer headings that
    merely start with the name do not count as an existing section.

    Args:
        config: Project configuration (not used by this function)
        issue_path: Path to issue file
        update_section_name: Name for the update section
        update_content: Content to add
        source_command: Command triggering the update
        logger: Logger for output

    Returns:
        True if the section was added or already existed; False if the
        issue file is missing or could not be read/written
    """
    if not issue_path.exists():
        logger.error(f"Issue not found: {issue_path}")
        return False

    try:
        content = issue_path.read_text(encoding="utf-8")

        # Check if section already exists: the heading must occupy a whole
        # line, so "### Name" and "## Name extra" are not mistaken for it.
        heading = re.compile(
            rf"^##[ \t]+{re.escape(update_section_name)}[ \t]*$",
            re.MULTILINE,
        )
        if heading.search(content):
            logger.info(f"Section already exists in {issue_path.name}, skipping")
            return True

        # Build update section
        today = datetime.now().strftime("%Y-%m-%d")
        update_section = (
            "\n\n---\n\n"
            f"## {update_section_name}\n\n"
            f"- **Date**: {today}\n"
            f"- **Source**: {source_command}\n\n"
            f"{update_content}\n"
        )

        issue_path.write_text(content + update_section, encoding="utf-8")
        logger.success(f"Updated: {issue_path.name}")
        return True

    except Exception as e:
        logger.error(f"Failed to update issue: {e}")
        return False