Coverage for little_loops / issue_discovery / search.py: 0%
166 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Issue file search and main discovery functions."""
3from __future__ import annotations
5import re
6import subprocess
7from datetime import datetime
8from pathlib import Path
9from typing import TYPE_CHECKING
11from little_loops.issue_discovery.extraction import (
12 _build_reopen_section,
13 detect_regression_or_duplicate,
14)
15from little_loops.issue_discovery.matching import (
16 FindingMatch,
17 MatchClassification,
18 RegressionEvidence,
19 _calculate_word_overlap,
20 _extract_words,
21 _matches_issue_type,
22)
24if TYPE_CHECKING:
25 from little_loops.config import BRConfig
26 from little_loops.logger import Logger
29# =============================================================================
30# Issue Search Functions
31# =============================================================================
def _get_all_issue_files(
    config: BRConfig,
    include_completed: bool = True,
    include_deferred: bool = False,
) -> list[tuple[Path, bool]]:
    """Get all issue files with their completion status.

    Directories are visited in a fixed order: every active category (in the
    order of ``config.issue_categories``), then the completed directory, then
    the deferred directory. Within each directory the ``*.md`` files are
    returned in sorted path order, so the result does not depend on the
    filesystem's enumeration order.

    Args:
        config: Project configuration
        include_completed: Whether to include completed issues
        include_deferred: Whether to include deferred issues

    Returns:
        List of (path, is_completed) tuples.
        For deferred issues, is_completed is set to True (non-active).
    """

    def _markdown_files(directory: Path) -> list[Path]:
        # Missing directories simply contribute nothing.
        if not directory.is_dir():
            return []
        # glob() yields entries in arbitrary order; sort for determinism.
        return sorted(directory.glob("*.md"))

    files: list[tuple[Path, bool]] = []

    # Active issues
    for category in config.issue_categories:
        files.extend((path, False) for path in _markdown_files(config.get_issue_dir(category)))

    # Completed issues
    if include_completed:
        files.extend((path, True) for path in _markdown_files(config.get_completed_dir()))

    # Deferred issues are reported as non-active, i.e. with the flag set to True
    if include_deferred:
        files.extend((path, True) for path in _markdown_files(config.get_deferred_dir()))

    return files
def search_issues_by_content(
    config: BRConfig,
    search_terms: list[str],
    include_completed: bool = True,
) -> list[tuple[Path, float, bool]]:
    """Rank issue files by word overlap with the given search terms.

    The words of all search terms are pooled into one set and compared with
    the words of each issue file. Files that cannot be read or scored are
    skipped silently.

    Args:
        config: Project configuration
        search_terms: Terms to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (path, score, is_completed) sorted by score descending.
        Only files scoring above 0.1 are included.
    """
    # Pool the words of every term into a single query set.
    query_words: set = set()
    for term in search_terms:
        query_words.update(_extract_words(term))

    if not query_words:
        return []

    hits: list[tuple[Path, float, bool]] = []
    for candidate, completed in _get_all_issue_files(config, include_completed):
        try:
            text = candidate.read_text(encoding="utf-8")
            relevance = _calculate_word_overlap(query_words, _extract_words(text))
        except Exception:
            # Best effort: a file that cannot be read or scored is ignored.
            continue
        if relevance > 0.1:  # Minimum threshold
            hits.append((candidate, relevance, completed))

    # sorted() is stable, so ties keep their discovery order.
    return sorted(hits, key=lambda hit: hit[1], reverse=True)
def search_issues_by_file_path(
    config: BRConfig,
    file_path: str,
    include_completed: bool = True,
) -> list[tuple[Path, bool]]:
    """Search for issues mentioning a specific file path.

    Matching is a case-insensitive substring test against the whole issue
    text. Partial paths match too (e.g. "module.py" matches "src/module.py"),
    because only the final path component has to appear in the issue.

    Args:
        config: Project configuration
        file_path: File path to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (issue_path, is_completed) tuples. Empty when ``file_path``
        is empty or whitespace-only.
    """
    normalized_path = file_path.strip().lower()
    if not normalized_path:
        # An empty needle is a substring of every file; treat it as "no match".
        return []

    # The bare filename is itself a substring of the full path, so testing for
    # it alone also covers the exact-path case. A path ending in "/" has no
    # filename component; fall back to the whole path then.
    filename = normalized_path.rsplit("/", 1)[-1]
    needle = filename or normalized_path

    results: list[tuple[Path, bool]] = []
    for issue_path, is_completed in _get_all_issue_files(config, include_completed):
        try:
            content = issue_path.read_text(encoding="utf-8").lower()
        except (OSError, UnicodeError):
            # Unreadable or undecodable issue files are skipped.
            continue
        if needle in content:
            results.append((issue_path, is_completed))

    return results
147# =============================================================================
148# Main Discovery Functions
149# =============================================================================
def _classify_match(
    config: BRConfig,
    issue_path: Path,
    is_completed: bool,
) -> tuple[MatchClassification, RegressionEvidence | None]:
    """Classify a matched issue: regression analysis if completed, else duplicate."""
    if is_completed:
        return detect_regression_or_duplicate(config, issue_path)
    return MatchClassification.DUPLICATE, None


def find_existing_issue(
    config: BRConfig,
    finding_type: str,
    file_path: str | None,
    finding_title: str,
    finding_content: str,
) -> FindingMatch:
    """Search for an existing issue matching this finding.

    Uses a multi-pass approach:
    1. File path match: the first issue that mentions the path (or its
       filename) and has the same issue type is returned immediately
    2. Title word overlap (>70% = likely duplicate); the best title wins
    3. Content overlap analysis, only when no earlier pass reached 0.5

    For matches to completed issues, performs regression analysis to determine
    if the match is a regression (fix broke) or invalid fix (never worked).

    Args:
        config: Project configuration
        finding_type: Issue type ("BUG", "ENH", "FEAT")
        file_path: File path from finding (if any)
        finding_title: Title of the finding
        finding_content: Full content/description of finding

    Returns:
        FindingMatch with best match details, including classification and
        regression evidence for completed issue matches
    """
    best_match = FindingMatch(
        issue_path=None,
        match_type="none",
        match_score=0.0,
    )

    # Pass 1: File path match
    if file_path:
        for issue_path, is_completed in search_issues_by_file_path(config, file_path):
            try:
                # Check if same type of finding (uses configured categories)
                if not _matches_issue_type(finding_type, issue_path, config, is_completed):
                    continue
                classification, evidence = _classify_match(config, issue_path, is_completed)
                # High confidence if same file + same type
                return FindingMatch(
                    issue_path=issue_path,
                    match_type="exact",
                    match_score=0.85,
                    is_completed=is_completed,
                    matched_terms=[file_path],
                    classification=classification,
                    regression_evidence=evidence,
                )
            except Exception:
                # Best effort: a candidate that cannot be evaluated is skipped.
                continue

    # Pass 2: Title similarity
    title_words = _extract_words(finding_title)
    if title_words:
        # Issue titles look like "# BUG-123: Some title"; capture the text after the colon.
        title_pattern = re.compile(r"^#\s+[\w-]+:\s*(.+)$", re.MULTILINE)
        best_pass2: tuple[Path, bool, float, list[str]] | None = None
        best_pass2_score = best_match.match_score
        for issue_path, is_completed in _get_all_issue_files(config):
            try:
                content = issue_path.read_text(encoding="utf-8")
                title_match = title_pattern.search(content)
                if title_match is None:
                    continue
                issue_words = _extract_words(title_match.group(1))
                overlap = _calculate_word_overlap(title_words, issue_words)
                if overlap > 0.7 and overlap > best_pass2_score:
                    best_pass2_score = overlap
                    best_pass2 = (
                        issue_path,
                        is_completed,
                        overlap,
                        list(title_words & issue_words),
                    )
            except Exception:
                continue

        # Determine classification once for the single best Pass 2 match
        if best_pass2 is not None:
            issue_path, is_completed, overlap, matched_terms = best_pass2
            classification, evidence = _classify_match(config, issue_path, is_completed)
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="similar",
                match_score=overlap,
                is_completed=is_completed,
                matched_terms=matched_terms,
                classification=classification,
                regression_evidence=evidence,
            )

    # Pass 3: Content analysis
    if best_match.match_score < 0.5:
        content_matches = search_issues_by_content(
            config,
            [finding_title, finding_content],
        )
        best_pass3: tuple[Path, bool, float] | None = None
        best_pass3_score = best_match.match_score
        for issue_path, score, is_completed in content_matches[:5]:  # Top 5
            adjusted_score = score * 0.8  # Content matches are less precise
            if adjusted_score > best_pass3_score:
                best_pass3_score = adjusted_score
                best_pass3 = (issue_path, is_completed, adjusted_score)

        # Determine classification once for the single best Pass 3 match
        if best_pass3 is not None:
            issue_path, is_completed, adjusted_score = best_pass3
            classification, evidence = _classify_match(config, issue_path, is_completed)
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="content",
                match_score=adjusted_score,
                is_completed=is_completed,
                classification=classification,
                regression_evidence=evidence,
            )

    # If no match was found, best_match is still the initial "none" match
    return best_match
296# =============================================================================
297# Issue Reopening and Updating
298# =============================================================================
def _get_category_from_issue_path(issue_path: Path, config: BRConfig) -> str:
    """Determine the category for an issue based on its filename.

    The filename is upper-cased and split into alphanumeric tokens. The first
    token (reading left to right) that equals a configured category prefix
    decides the category, so a prefix that merely occurs inside another word
    ("BUG" in "DEBUG") or later in the title does not win. If no token matches,
    the first category whose prefix appears anywhere in the filename is used.

    Args:
        issue_path: Path to issue file
        config: Project configuration

    Returns:
        Category name (e.g., "bugs", "enhancements", "features");
        "bugs" when no configured prefix is found.
    """
    filename = issue_path.name.upper()

    # Prefix -> category, compared case-insensitively. Empty prefixes are
    # ignored (they would match every filename); on duplicates the first
    # configured category wins.
    prefix_to_category: dict[str, str] = {}
    for category_name, category_config in config.issues.categories.items():
        prefix = category_config.prefix
        if not prefix:
            continue
        prefix_to_category.setdefault(prefix.upper(), category_name)

    # Preferred: a whole token such as the "BUG" in "P1-BUG-001-title.md".
    for token in re.split(r"[^A-Z0-9]+", filename):
        if token in prefix_to_category:
            return prefix_to_category[token]

    # Fallback: plain substring match, e.g. "BUG123-title.md".
    for prefix, category_name in prefix_to_category.items():
        if prefix in filename:
            return category_name

    return "bugs"  # Default
def reopen_issue(
    config: BRConfig,
    completed_issue_path: Path,
    reopen_reason: str,
    new_context: str,
    source_command: str,
    logger: Logger,
    classification: MatchClassification | None = None,
    regression_evidence: RegressionEvidence | None = None,
) -> Path | None:
    """Move issue from completed back to active with Reopened section.

    The file is moved with ``git mv`` to preserve history. If that fails, or
    git cannot be executed at all, the file is copied manually and the
    original removed. In both cases the reopen section is appended to the
    file at its new location.

    Args:
        config: Project configuration
        completed_issue_path: Path to issue in completed/
        reopen_reason: Reason for reopening
        new_context: New context/findings to add
        source_command: Command triggering the reopen
        logger: Logger for output
        classification: How this issue was classified (regression, invalid_fix, etc.)
        regression_evidence: Evidence supporting the classification

    Returns:
        New path to reopened issue, or None if failed (source missing, an
        active issue with the same name already exists, or an I/O error)
    """
    if not completed_issue_path.exists():
        logger.error(f"Completed issue not found: {completed_issue_path}")
        return None

    # Determine target category directory
    category = _get_category_from_issue_path(completed_issue_path, config)
    target_dir = config.get_issue_dir(category)
    target_dir.mkdir(parents=True, exist_ok=True)

    target_path = target_dir / completed_issue_path.name

    # Safety check - don't overwrite existing active issue
    if target_path.exists():
        logger.warning(f"Active issue already exists: {target_path}")
        return None

    # Log with classification info if available
    if classification == MatchClassification.REGRESSION:
        logger.info(f"Reopening {completed_issue_path.name} as REGRESSION -> {category}/")
    elif classification == MatchClassification.INVALID_FIX:
        logger.info(f"Reopening {completed_issue_path.name} as INVALID_FIX -> {category}/")
    else:
        logger.info(f"Reopening {completed_issue_path.name} -> {category}/")

    try:
        # Read the content before moving and append the reopened section
        content = completed_issue_path.read_text(encoding="utf-8")
        content += _build_reopen_section(
            reopen_reason,
            new_context,
            source_command,
            classification,
            regression_evidence,
        )

        # Try git mv first for history preservation
        try:
            result = subprocess.run(
                ["git", "mv", str(completed_issue_path), str(target_path)],
                capture_output=True,
                text=True,
            )
            moved_by_git = result.returncode == 0
            failure_detail = result.stderr
        except OSError as exc:
            # git is not installed or not executable: same as a failed move
            moved_by_git = False
            failure_detail = str(exc)

        if not moved_by_git:
            logger.warning(f"git mv failed, using manual copy: {failure_detail}")

        # Write updated content to the new location in either case
        target_path.write_text(content, encoding="utf-8")

        if not moved_by_git:
            # Manual fallback: the original still exists and must be removed
            completed_issue_path.unlink()

        logger.success(f"Reopened: {target_path.name}")
        return target_path

    except Exception as e:
        logger.error(f"Failed to reopen issue: {e}")
        return None
def update_existing_issue(
    config: BRConfig,
    issue_path: Path,
    update_section_name: str,
    update_content: str,
    source_command: str,
    logger: Logger,
) -> bool:
    """Add new findings to an existing issue.

    Appends a ``## <update_section_name>`` section (separated by a horizontal
    rule) to the issue file. If the file already has a heading line that is
    exactly ``## <update_section_name>``, nothing is written.

    Args:
        config: Project configuration (currently unused)
        issue_path: Path to issue file
        update_section_name: Name for the update section
        update_content: Content to add
        source_command: Command triggering the update
        logger: Logger for output

    Returns:
        True if the update succeeded or the section already existed;
        False if the issue file is missing or could not be read/written
    """
    if not issue_path.exists():
        logger.error(f"Issue not found: {issue_path}")
        return False

    try:
        content = issue_path.read_text(encoding="utf-8")

        # Match the heading as a whole line so that "### Name" or
        # "## Name (extended)" are not mistaken for an existing section.
        heading_pattern = rf"^## {re.escape(update_section_name)}[ \t]*$"
        if re.search(heading_pattern, content, re.MULTILINE):
            logger.info(f"Section already exists in {issue_path.name}, skipping")
            return True

        # Build update section
        today = datetime.now().strftime("%Y-%m-%d")
        update_section = f"""

---

## {update_section_name}

- **Date**: {today}
- **Source**: {source_command}

{update_content}
"""
        issue_path.write_text(content + update_section, encoding="utf-8")
        logger.success(f"Updated: {issue_path.name}")
        return True

    except Exception as e:
        logger.error(f"Failed to update issue: {e}")
        return False