Coverage for little_loops / parallel / file_hints.py: 22%

135 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""File hint extraction for overlap detection in parallel processing. 

2 

3Extracts file paths, directories, and scopes from issue content to predict 

4potential file modifications before dispatch. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from dataclasses import dataclass, field 

11from typing import TYPE_CHECKING 

12 

13if TYPE_CHECKING: 

14 from little_loops.config import DependencyMappingConfig 

15 

# File path patterns - adapted from workflow_sequence_analyzer.py
# Match paths with common source code extensions.
# NOTE: Longer extensions are still listed before shorter ones (tsx before ts,
# jsx before js) so the alternation tries the most specific suffix first.
# The trailing negative lookahead requires the extension to END there: without
# it, "example.com" was captured as "example.c", "setup.cfg" as "setup.c" and
# "main.rst" as "main.rs". A following "." is still allowed so a path at the
# end of a sentence ("edit foo.py.") keeps matching.
FILE_PATH_PATTERN = re.compile(
    r"(?:^|[\s`\"'(\[])([a-zA-Z0-9_./\-]+\.(?:tsx|jsx|json|yaml|yml|toml|scss|html|cpp|hpp|py|ts|js|md|sh|css|go|rs|java|c|h))(?![a-zA-Z0-9_])",
    re.MULTILINE,
)

23 

# Explicit directory references: a path ending in "/" that is immediately
# followed by whitespace, a closing delimiter, or end of line. The lookahead
# keeps the "src/" prefix of "src/app.py" from being reported as a directory.
DIR_PATH_PATTERN = re.compile(
    r"(?:^|[\s`\"'(\[])([a-zA-Z0-9_./\-]+/)(?=[\s`\"')\],$]|$)",
    flags=re.MULTILINE,
)

# Component/scope declarations written as "<keyword>: <name>" in issue text,
# e.g. "scope: sidebar", "Component: auth", "module: api". The keyword match
# is case-insensitive and the name may be wrapped in a quote or backtick.
SCOPE_PATTERN = re.compile(
    r"(?:scope|component|module|directory|folder):\s*[`\"']?([a-zA-Z0-9_./\-]+)[`\"']?",
    flags=re.IGNORECASE,
)

37 

# Overlap detection thresholds (defaults used when no config is supplied).

# Minimum number of shared files that triggers an overlap on its own.
MIN_OVERLAP_FILES = 2
# Minimum ratio of shared files to the smaller of the two file sets.
OVERLAP_RATIO_THRESHOLD = 0.25
# Minimum path segments a directory needs before it counts as an overlap
# signal (e.g., src/components/ = 2).
MIN_DIRECTORY_DEPTH = 2

# Common infrastructure files excluded from overlap detection: they show up
# incidentally in many issues but are rarely the actual conflict.
COMMON_FILES_EXCLUDE = frozenset(
    (
        "__init__.py",
        "pyproject.toml",
        "setup.py",
        "setup.cfg",
        "CHANGELOG.md",
        "README.md",
        "conftest.py",
    )
)

56 

57 

@dataclass
class FileHints:
    """Extracted file hints from issue content.

    Attributes:
        files: Specific file paths mentioned
        directories: Directory paths mentioned
        scopes: Component/scope identifiers
        issue_id: Source issue ID
    """

    files: set[str] = field(default_factory=set)
    directories: set[str] = field(default_factory=set)
    scopes: set[str] = field(default_factory=set)
    issue_id: str = ""

    @property
    def all_paths(self) -> set[str]:
        """All paths (files and directories) combined."""
        return self.files | self.directories

    @property
    def is_empty(self) -> bool:
        """Check if no hints were extracted."""
        return not (self.files or self.directories or self.scopes)

    def _overlap_inputs(
        self,
        other: FileHints,
        config: DependencyMappingConfig | None,
    ) -> tuple[set[str], set[str], set[str], int]:
        """Resolve thresholds and pre-filter files for an overlap comparison.

        Shared by ``overlaps_with()`` and ``get_overlapping_paths()`` so both
        always apply identical filtering and thresholds.

        Args:
            other: FileHints to compare against
            config: Optional dependency mapping config; module-level
                constants are used when it is ``None``.

        Returns:
            ``(self_files, other_files, shared_files, min_depth)`` where the
            file sets have common infrastructure files removed, and
            ``shared_files`` is the intersection if it meets the count or
            ratio threshold (otherwise an empty set).
        """
        if config is None:
            min_files = MIN_OVERLAP_FILES
            ratio_threshold = OVERLAP_RATIO_THRESHOLD
            min_depth = MIN_DIRECTORY_DEPTH
            exclude_files = COMMON_FILES_EXCLUDE
        else:
            min_files = config.overlap_min_files
            ratio_threshold = config.overlap_min_ratio
            min_depth = config.min_directory_depth
            exclude_files = frozenset(config.exclude_common_files)

        # Filter common infrastructure files
        self_files = {f for f in self.files if not _is_common_file(f, exclude_files)}
        other_files = {f for f in other.files if not _is_common_file(f, exclude_files)}

        # Exact file matches only count when they meet the count OR ratio
        # threshold. A non-empty intersection implies both sets are non-empty,
        # so the division is safe.
        shared_files = self_files & other_files
        if shared_files:
            ratio = len(shared_files) / min(len(self_files), len(other_files))
            if len(shared_files) < min_files and ratio < ratio_threshold:
                shared_files = set()

        return self_files, other_files, shared_files, min_depth

    def overlaps_with(
        self,
        other: FileHints,
        *,
        config: DependencyMappingConfig | None = None,
    ) -> bool:
        """Check if this hint set overlaps with another via file/directory signals.

        Uses graduated thresholds rather than binary matching:
        - Common infrastructure files are excluded from file checks
        - File overlap requires minimum count or ratio threshold
        - Directory overlap requires minimum path depth
        - Scope matches are intentionally excluded — scope names are too coarse
          for sprint scheduling (all FSM/loop issues share the same scope) and
          cause every pair to serialize. Use ``contends_with()`` when scope
          matching is required for safety (e.g. worktree isolation).

        Args:
            other: FileHints to compare against
            config: Optional dependency mapping config for custom thresholds.
                Falls back to module-level constants when not provided.

        Returns:
            True if a file or directory overlap signal is found.
        """
        # Empty hints don't overlap
        if self.is_empty or other.is_empty:
            return False

        self_files, other_files, shared_files, min_depth = self._overlap_inputs(other, config)

        # Exact file matches that met the thresholds
        if shared_files:
            return True

        # Directory overlaps (depth check in _directories_overlap)
        if any(
            _directories_overlap(d1, d2, min_depth=min_depth)
            for d1 in self.directories
            for d2 in other.directories
        ):
            return True

        # File in directory, in either direction (depth check in _file_in_directory)
        if any(
            _file_in_directory(f, d, min_depth=min_depth)
            for f in self_files
            for d in other.directories
        ):
            return True
        return any(
            _file_in_directory(f, d, min_depth=min_depth)
            for f in other_files
            for d in self.directories
        )

    def contends_with(
        self,
        other: FileHints,
        *,
        config: DependencyMappingConfig | None = None,
    ) -> bool:
        """Check if this hint set contends with another, including scope signals.

        Extends ``overlaps_with()`` with scope/component name matching.
        Scope matching is intentionally kept here for paths that require
        conservative safety (e.g. worktree isolation in ``ll-parallel``) where
        a shared component name is a meaningful risk signal even without
        confirmed file overlap.

        Use ``overlaps_with()`` for sprint scheduling to avoid over-serialization
        of focused sprints where all issues share a scope name by design.

        Args:
            other: FileHints to compare against
            config: Optional dependency mapping config for custom thresholds.
                Falls back to module-level constants when not provided.

        Returns:
            True if the hints overlap by file/directory or share any scope.
        """
        if self.overlaps_with(other, config=config):
            return True

        # Scope matches — kept for worktree safety even without file overlap
        return bool(self.scopes & other.scopes)

    def get_overlapping_paths(
        self,
        other: FileHints,
        *,
        config: DependencyMappingConfig | None = None,
    ) -> set[str]:
        """Get specific paths that overlap between two hint sets.

        Unlike overlaps_with() which returns bool, this returns the
        actual file/directory paths causing the overlap. Applies the
        same filtering and thresholds as overlaps_with().

        Args:
            other: FileHints to compare against
            config: Optional dependency mapping config for custom thresholds.
                Falls back to module-level constants when not provided.

        Returns:
            Set of overlapping paths: threshold-qualifying shared files, the
            shorter directory of each overlapping directory pair, and files
            that sit inside the other side's directories.
        """
        if self.is_empty or other.is_empty:
            return set()

        self_files, other_files, shared_files, min_depth = self._overlap_inputs(other, config)

        # Exact file matches (already gated on the thresholds)
        overlapping: set[str] = set(shared_files)

        # Directory overlaps: report the shorter (parent) directory of each pair
        for d1 in self.directories:
            for d2 in other.directories:
                if _directories_overlap(d1, d2, min_depth=min_depth):
                    overlapping.add(d1 if len(d1) <= len(d2) else d2)

        # File in directory, in either direction
        overlapping.update(
            f
            for f in self_files
            for d in other.directories
            if _file_in_directory(f, d, min_depth=min_depth)
        )
        overlapping.update(
            f
            for f in other_files
            for d in self.directories
            if _file_in_directory(f, d, min_depth=min_depth)
        )

        return overlapping

236 

237 

def _is_common_file(
    path: str,
    exclude_files: frozenset[str] = COMMON_FILES_EXCLUDE,
) -> bool:
    """Report whether ``path`` names a common infrastructure file.

    Only the final path component is compared, so ``pkg/__init__.py`` and
    ``__init__.py`` are treated the same.

    Args:
        path: File path using "/" separators
        exclude_files: Basenames to treat as common files

    Returns:
        True if the basename of ``path`` is in ``exclude_files``
    """
    # rpartition yields the whole string as the tail when there is no "/".
    _, _, filename = path.rpartition("/")
    return filename in exclude_files

245 

246 

def _directories_overlap(
    dir1: str,
    dir2: str,
    *,
    min_depth: int = MIN_DIRECTORY_DEPTH,
) -> bool:
    """Report whether one directory path equals or contains the other.

    The shorter (parent) directory must have at least ``min_depth`` path
    segments, so broad directories like ``scripts/`` are not treated as a
    conflict signal.

    Args:
        dir1: First directory path (trailing "/" optional)
        dir2: Second directory path (trailing "/" optional)
        min_depth: Minimum number of segments required on the parent path

    Returns:
        True if the paths nest and the parent is deep enough
    """
    # Normalize both to exactly one trailing slash so "src/a" cannot match
    # as a prefix of "src/ab/".
    first = dir1.rstrip("/") + "/"
    second = dir2.rstrip("/") + "/"

    # Only the shorter path can be the prefix of the other (ties favor first).
    if len(first) <= len(second):
        parent, child = first, second
    else:
        parent, child = second, first
    if not child.startswith(parent):
        return False

    # With a single trailing slash, the number of "/" equals the segment count.
    return parent.count("/") >= min_depth

267 

268 

def _file_in_directory(
    file_path: str,
    dir_path: str,
    *,
    min_depth: int = MIN_DIRECTORY_DEPTH,
) -> bool:
    """Report whether ``file_path`` lies inside ``dir_path``.

    The directory must have at least ``min_depth`` path segments, so broad
    directories are not treated as a conflict signal.

    Args:
        file_path: File path to test
        dir_path: Directory path (trailing "/" optional)
        min_depth: Minimum number of segments required on the directory

    Returns:
        True if the file path starts with the directory and it is deep enough
    """
    # Exactly one trailing slash: prevents "src/a" matching "src/ab/file.py",
    # and makes the "/" count equal to the directory's segment count.
    prefix = dir_path.rstrip("/") + "/"
    return file_path.startswith(prefix) and prefix.count("/") >= min_depth

285 

286 

def _extract_write_target_files(content: str) -> set[str]:
    """Extract file paths only from write-target sections of issue content.

    Scopes extraction to "### Files to Modify" and "### Files Changed" sections
    to avoid treating reference docs (e.g., in "Related Key Documentation") as
    write targets, which would cause false overlap detection in sprint scheduling.

    A section runs from the line after its heading up to the next line that
    begins with ``##`` (any heading of level 2 or deeper) or the end of the
    content. An empty section contributes no paths.

    Args:
        content: Issue markdown content

    Returns:
        Set of file paths that are actual write targets
    """
    # After the heading title, consume only the rest of that line
    # ([ \t\r]*\n) rather than \s*\n. The broader form could eat the blank
    # line following an EMPTY section, which started the capture on the next
    # heading itself and pulled that whole section's files in as write targets.
    # The terminator is anchored with ^ (MULTILINE) so a heading sitting at
    # the very start of the captured region is recognized as well.
    section_pattern = re.compile(
        r"###\s*(?:Files to Modify|Files Changed)[ \t\r]*\n(.*?)(?=^##|\Z)",
        re.DOTALL | re.MULTILINE,
    )
    return {
        path
        for section_match in section_pattern.finditer(content)
        for path in FILE_PATH_PATTERN.findall(section_match.group(1))
        if _is_valid_path(path)
    }

311 

312 

def extract_file_hints(content: str, issue_id: str = "") -> FileHints:
    """Build a ``FileHints`` record from issue markdown.

    Args:
        content: Issue markdown content
        issue_id: Optional issue ID for tracking

    Returns:
        FileHints with extracted paths and scopes
    """
    # Files come from write-target sections only; scanning the full content
    # would pick up reference docs (e.g., "Related Key Documentation") and
    # cause false sprint serialization.
    write_targets = _extract_write_target_files(content)

    # Directories are collected from the whole content, minus false positives.
    directory_refs = {
        candidate
        for candidate in DIR_PATH_PATTERN.findall(content)
        if _is_valid_path(candidate)
    }

    # Scope names are lowercased so comparisons are case-insensitive.
    scope_names = {name.lower() for name in SCOPE_PATTERN.findall(content)}

    return FileHints(
        files=write_targets,
        directories=directory_refs,
        scopes=scope_names,
        issue_id=issue_id,
    )

341 

342 

343def _is_valid_path(path: str) -> bool: 

344 """Filter out false positive paths.""" 

345 # Skip URLs 

346 if path.startswith("http") or path.startswith("//"): 

347 return False 

348 # Skip very short paths (likely not real file paths) 

349 if len(path) < 3: 

350 return False 

351 # Skip paths that are just file extensions 

352 if path.startswith(".") and "/" not in path: 

353 return False 

354 # Skip common false positives 

355 false_positives = {"e.g.", "i.e.", "etc.", "vs.", "v1.", "v2."} 

356 if path.lower() in false_positives: 

357 return False 

358 return True