Coverage for src/pylint_sort_functions/privacy_fixer.py: 100%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-08-12 16:06 +0200

1"""Privacy fixer for automatic function renaming. 

2 

3This module provides functionality to automatically rename functions that should 

4be private (detected by W9004) by adding underscore prefixes. 

5 

6The implementation follows a conservative approach: 

71. Only rename functions where we can find ALL references safely 

82. Provide dry-run mode to preview changes 

93. Create backups by default 

104. Report all changes clearly 

11 

12Safety-first design ensures user confidence in the automated renaming. 

13 

14Refactored version using composition as described in GitHub Issue #32. 

15""" 

16 

17from collections import defaultdict 

18from pathlib import Path 

19from typing import Any, Dict, List, Optional, Set, Tuple, Union 

20 

21# Import the new modular components 

22from pylint_sort_functions.file_operations import FileOperations 

23from pylint_sort_functions.privacy_analyzer import PrivacyAnalyzer 

24from pylint_sort_functions.privacy_types import ( 

25 FunctionReference, 

26 FunctionTestReference, 

27 RenameCandidate, 

28) 

29from pylint_sort_functions.test_file_manager import TestFileManager 

30from pylint_sort_functions.test_file_updater import TestFileUpdater 

31 

32# Re-export types for backward compatibility 

33__all__ = [ 

34 "PrivacyFixer", 

35 "FunctionReference", 

36 "RenameCandidate", 

37 "FunctionTestReference", 

38] 

39 

40 

41class PrivacyFixer: 

42 """Handles automatic renaming of functions that should be private. 

43 

44 Refactored to use composition with focused components for better 

45 maintainability and separation of concerns. 

46 """ 

47 

48 def __init__(self, dry_run: bool = False, backup: bool = True): 

49 """Initialize the privacy fixer. 

50 

51 :param dry_run: If True, only analyze and report changes without applying them 

52 :param backup: If True, create .bak files before modifying originals 

53 """ 

54 self.dry_run = dry_run 

55 self.backup = backup 

56 self.rename_candidates: List[RenameCandidate] = [] 

57 

58 # Initialize component classes with composition 

59 self.analyzer = PrivacyAnalyzer() 

60 self.test_manager = TestFileManager() 

61 self.test_updater = TestFileUpdater(backup=backup) 

62 self.file_ops = FileOperations(backup=backup) 

63 

64 # Public methods 

65 

66 def analyze_module( 

67 self, 

68 files_or_module_path: Union[List[Path], Path], # For backward compatibility 

69 project_root: Path, 

70 public_patterns_or_include_test_analysis: Optional[ 

71 Union[Set[str], bool] 

72 ] = None, 

73 ) -> List[RenameCandidate]: 

74 """Analyze a module for functions that can be automatically renamed to private. 

75 

76 This method supports two signatures for backward compatibility: 

77 1. New: analyze_module(files, project_root, include_test_analysis=True) 

78 2. Old: analyze_module(module_path, project_root, public_patterns=None) 

79 

80 :param files_or_module_path: List of files (new) or single module path (old) 

81 :param project_root: Root directory of the project 

82 :param public_patterns_or_include_test_analysis: Set of public patterns (old) 

83 or include_test_analysis flag (new) 

84 :returns: List of functions that can be safely renamed 

85 """ 

86 # Handle backward compatibility with old signature 

87 if isinstance(files_or_module_path, Path): 

88 # Old signature: analyze_module(module_path, project_root, public_patterns) 

89 # Return empty list for backward compatibility (was TODO placeholder) 

90 return [] 

91 

92 # New signature: analyze_module(files, project_root, include_test_analysis) 

93 files = files_or_module_path 

94 include_test_analysis = public_patterns_or_include_test_analysis 

95 if include_test_analysis is None: 

96 include_test_analysis = True 

97 

98 # Use analyzer to detect privacy violations 

99 violations = self.analyzer.analyze_module_privacy(files, project_root) 

100 

101 # Find test references if requested 

102 if include_test_analysis: 

103 test_files = self.test_manager.find_test_files(project_root) 

104 

105 for candidate in violations: 

106 # Find test references for this function 

107 test_references = self.test_manager.find_test_references( 

108 candidate.old_name, test_files 

109 ) 

110 

111 # Update candidate with test references 

112 updated_candidate = candidate._replace(test_references=test_references) 

113 violations[violations.index(candidate)] = updated_candidate 

114 

115 # Validate safety for each candidate 

116 validated_candidates = [] 

117 for candidate in violations: 

118 is_safe, issues = self.is_safe_to_rename( 

119 candidate 

120 ) # Use our own method for inheritance 

121 validated_candidate = candidate._replace( 

122 is_safe=is_safe, safety_issues=issues 

123 ) 

124 validated_candidates.append(validated_candidate) 

125 

126 self.rename_candidates = validated_candidates 

127 return validated_candidates 

128 

129 def apply_renames( # pylint: disable=too-many-locals,too-many-branches 

130 self, candidates: List[RenameCandidate], project_root: Optional[Path] = None 

131 ) -> Dict[str, Any]: 

132 """Apply the function renames to the module files and update test files. 

133 

134 :param candidates: List of validated rename candidates 

135 :param project_root: Root directory for finding test files (optional) 

136 :returns: Report of changes made 

137 """ 

138 if not candidates: 

139 return {"renamed": 0, "skipped": 0, "reason": "No candidates provided"} 

140 

141 # Group candidates by file path 

142 candidates_by_file = self._group_candidates_by_file(candidates) 

143 

144 renamed_count = 0 

145 skipped_count = 0 

146 errors = [] 

147 test_files_updated = 0 

148 test_file_errors = [] 

149 

150 # First, apply renames to the production files 

151 for file_path, file_candidates in candidates_by_file.items(): 

152 try: 

153 result = self._apply_renames_to_file(file_path, file_candidates) 

154 renamed_count += result["renamed"] 

155 skipped_count += result["skipped"] 

156 if result.get("errors"): 

157 errors.extend(result["errors"]) 

158 except Exception as e: # pylint: disable=broad-exception-caught 

159 error_msg = f"Error processing {file_path}: {str(e)}" 

160 errors.append(error_msg) 

161 skipped_count += len(file_candidates) 

162 

163 # Second, update test files if project_root is provided and we have 

164 # successful renames 

165 if ( # pylint: disable=too-many-nested-blocks 

166 project_root and renamed_count > 0 and not self.dry_run 

167 ): 

168 # Process each successfully renamed candidate 

169 for file_path, file_candidates in candidates_by_file.items(): 

170 for candidate in file_candidates: 

171 if candidate.is_safe and candidate.test_references: 

172 # Group test references by file 

173 test_refs_by_file: Dict[Path, List[FunctionTestReference]] = {} 

174 for ref in candidate.test_references: 

175 if ref.file_path not in test_refs_by_file: 

176 test_refs_by_file[ref.file_path] = [] 

177 test_refs_by_file[ref.file_path].append(ref) 

178 

179 # Update each test file that references this function 

180 for test_file_path, refs in test_refs_by_file.items(): 

181 try: 

182 test_result = self.test_updater.update_test_file( 

183 test_file_path, 

184 candidate.old_name, 

185 candidate.new_name, 

186 refs, 

187 ) 

188 

189 if test_result["success"]: 

190 test_files_updated += 1 

191 else: 

192 error_msg = ( 

193 f"Test file {test_file_path}: " 

194 f"{test_result.get('error', 'Update failed')}" 

195 ) 

196 test_file_errors.append(error_msg) 

197 except Exception as e: # pylint: disable=broad-exception-caught 

198 error_msg = ( 

199 f"Error updating test file {test_file_path}: " 

200 f"{str(e)}" 

201 ) 

202 test_file_errors.append(error_msg) 

203 

204 # Prepare comprehensive report 

205 report = { 

206 "renamed": renamed_count, 

207 "skipped": skipped_count, 

208 "errors": errors, 

209 } 

210 

211 # Add test file information if we attempted test updates 

212 if project_root: 

213 report["test_files_updated"] = test_files_updated 

214 report["test_file_errors"] = test_file_errors 

215 

216 return report 

217 

218 def detect_privacy_violations( 

219 self, 

220 files: List[Path], 

221 project_root: Path, 

222 ) -> List[RenameCandidate]: 

223 """Detect functions that should be private across multiple files. 

224 

225 Delegates to the privacy analyzer for the actual detection logic. 

226 

227 :param files: List of Python files to analyze 

228 :param project_root: Root directory of the project for cross-module analysis 

229 :returns: List of functions that violate privacy guidelines 

230 """ 

231 return self.analyzer.analyze_module_privacy(files, project_root) 

232 

233 def find_function_references( 

234 self, 

235 function_name: str, 

236 module_ast: Any, # astroid nodes.Module 

237 ) -> List[FunctionReference]: 

238 """Find all references to a function within a module. 

239 

240 Delegates to the privacy analyzer for the actual reference finding logic. 

241 

242 :param function_name: Name of the function to find references for 

243 :param module_ast: AST of the module to search in 

244 :returns: List of all references found 

245 """ 

246 return self.analyzer.find_function_references(function_name, module_ast) 

247 

248 def find_test_files(self, project_root: Path) -> List[Path]: 

249 """Find all test files in the project. 

250 

251 Delegates to the test file manager for the actual file discovery logic. 

252 

253 :param project_root: Root directory of the project 

254 :returns: List of paths to test files 

255 """ 

256 return self.test_manager.find_test_files(project_root) 

257 

258 def find_test_references( 

259 self, function_name: str, test_files: List[Path] 

260 ) -> List[FunctionTestReference]: 

261 """Find all references to a function in test files. 

262 

263 Delegates to the test file manager for the actual reference finding logic. 

264 

265 :param function_name: Name of the function to find references for 

266 :param test_files: List of test files to scan 

267 :returns: List of test file references 

268 """ 

269 return self.test_manager.find_test_references(function_name, test_files) 

270 

271 def generate_report(self, candidates: List[RenameCandidate]) -> str: 

272 """Generate a human-readable report of rename operations. 

273 

274 :param candidates: List of rename candidates 

275 :returns: Formatted report string 

276 """ 

277 if not candidates: 

278 return "No functions found that need privacy fixes." 

279 

280 report_lines = ["Privacy Fix Analysis:", ""] 

281 

282 safe_count = sum(1 for c in candidates if c.is_safe) 

283 unsafe_count = len(candidates) - safe_count 

284 

285 if safe_count > 0: 

286 report_lines.append(f"✅ Can safely rename {safe_count} functions:") 

287 for candidate in candidates: 

288 if candidate.is_safe: 

289 ref_count = len(candidate.references) 

290 report_lines.append( 

291 f" • {candidate.old_name} → {candidate.new_name} " 

292 f"({ref_count} references)" 

293 ) 

294 report_lines.append("") 

295 

296 if unsafe_count > 0: 

297 report_lines.append(f"⚠️ Cannot safely rename {unsafe_count} functions:") 

298 for candidate in candidates: 

299 if not candidate.is_safe: 

300 issues = ", ".join(candidate.safety_issues) 

301 report_lines.append(f" • {candidate.old_name}: {issues}") 

302 report_lines.append("") 

303 

304 return "\n".join(report_lines) 

305 

306 def is_safe_to_rename(self, candidate: RenameCandidate) -> Tuple[bool, List[str]]: 

307 """Check if a function can be safely renamed. 

308 

309 Conservative safety checks: 

310 1. No dynamic references (getattr, hasattr with strings) 

311 2. No string literals containing the function name 

312 3. No name conflicts with existing private functions 

313 4. All references are in contexts we can handle 

314 

315 :param candidate: The rename candidate to validate 

316 :returns: Tuple of (is_safe, list_of_issues) 

317 """ 

318 issues = [] 

319 

320 # Check for name conflicts - call our own methods that can be overridden 

321 if self._has_name_conflict(candidate): 

322 issues.append(f"Private function '{candidate.new_name}' already exists") 

323 

324 # Check for dynamic references in the module 

325 if self._has_dynamic_references(candidate): 

326 issues.append("Contains dynamic references (getattr, hasattr, etc.)") 

327 

328 # Check for string literals containing the function name 

329 if self._has_string_references(candidate): 

330 issues.append("Function name found in string literals") 

331 

332 # Check if all references are in safe contexts 

333 unsafe_contexts = self._check_reference_contexts(candidate) 

334 if unsafe_contexts: 

335 issues.append(f"Unsafe reference contexts: {', '.join(unsafe_contexts)}") 

336 

337 return len(issues) == 0, issues 

338 

339 def update_test_file( 

340 self, 

341 test_file: Path, 

342 old_name: str, 

343 new_name: str, 

344 test_references: List[FunctionTestReference], 

345 ) -> Dict[str, Any]: 

346 """Update a test file to use the new function name with backup and rollback. 

347 

348 Delegates to the test file updater for the actual update logic. 

349 

350 :param test_file: Path to the test file to update 

351 :param old_name: Original function name 

352 :param new_name: New private function name (with underscore) 

353 :param test_references: List of test references to update 

354 :returns: Report of the update operation 

355 """ 

356 return self.test_updater.update_test_file( 

357 test_file, old_name, new_name, test_references 

358 ) 

359 

360 # Private methods 

361 

362 # Additional delegation methods for backward compatibility with tests 

363 # pylint: disable=protected-access 

364 

365 def _apply_renames_to_content( 

366 self, content: str, candidates: List[RenameCandidate] 

367 ) -> str: 

368 """Apply function name renames to file content (backward compatibility).""" 

369 return self.file_ops._apply_renames_to_content(content, candidates) 

370 

371 def _apply_renames_to_file( 

372 self, file_path: Path, candidates: List[RenameCandidate] 

373 ) -> Dict[str, Any]: 

374 """Apply renames to a specific file (backward compatibility).""" 

375 return self.file_ops.apply_renames_to_file(file_path, candidates, self.dry_run) 

376 

377 def _build_import_graph(self, project_root: Path) -> Dict[Path, Set[str]]: 

378 """Build a graph of imports across the project (backward compatibility).""" 

379 return self.analyzer._build_import_graph(project_root) 

380 

381 def _check_reference_contexts(self, candidate: RenameCandidate) -> List[str]: 

382 """Check if all references are in contexts we can safely handle.""" 

383 return self.analyzer._check_reference_contexts(candidate) 

384 

385 def _extract_function_imports(self, module: Any) -> Set[str]: 

386 """Extract function names that are imported by a module.""" 

387 return self.analyzer._extract_function_imports(module) 

388 

389 def _fallback_privacy_heuristics(self, func: Any) -> bool: 

390 """Fallback heuristics when cross-module analysis isn't available.""" 

391 return self.analyzer._fallback_privacy_heuristics(func) 

392 

393 def _find_references_in_test_file( 

394 self, function_name: str, test_file: Path, module: Any, content: str 

395 ) -> List[FunctionTestReference]: 

396 """Find function references in a test file using AST analysis.""" 

397 return self.test_manager._find_references_in_test_file( 

398 function_name, test_file, module, content 

399 ) 

400 

401 def _find_string_references_in_test_file( 

402 self, function_name: str, test_file: Path, content: str 

403 ) -> List[FunctionTestReference]: 

404 """Find function references in test file using string-based analysis.""" 

405 return self.test_manager._find_string_references_in_test_file( 

406 function_name, test_file, content 

407 ) 

408 

409 def _get_functions_from_module(self, module: Any) -> List[Any]: 

410 """Extract all function definitions from a module (backward compatibility).""" 

411 return self.analyzer._get_functions_from_module(module) 

412 

413 def _group_candidates_by_file( # pylint: disable=too-many-nested-blocks 

414 self, candidates: List[RenameCandidate] 

415 ) -> Dict[Path, List[RenameCandidate]]: 

416 """Group rename candidates by the file they belong to. 

417 

418 :param candidates: List of rename candidates 

419 :returns: Dictionary mapping file paths to candidate lists 

420 """ 

421 # For MVP, we'll extract file path from function node 

422 # In a more complete implementation, we'd track file paths explicitly 

423 candidates_by_file: Dict[Path, List[RenameCandidate]] = defaultdict(list) 

424 

425 for candidate in candidates: 

426 # Extract file path from the function node 

427 file_path = None 

428 

429 # Try to get file path from the AST node 

430 try: 

431 if hasattr(candidate.function_node, "root"): 

432 root = candidate.function_node.root() 

433 if hasattr(root, "file") and root.file and root.file != "<?>": 

434 file_path = Path(root.file) 

435 elif hasattr(root, "name") and root.name and root.name != "<?>": 

436 # For astroid modules parsed with explicit names 

437 file_path = Path(root.name) 

438 except Exception: # pylint: disable=broad-exception-caught 

439 # If we can't get file path from node, continue to fallback 

440 pass 

441 

442 # Fallback: use a unique identifier based on the node 

443 if file_path is None: 

444 # For testing scenarios, create unique file names based on function name 

445 # This ensures different functions get grouped separately when needed 

446 try: 

447 node_id = ( 

448 id(candidate.function_node.root()) 

449 if hasattr(candidate.function_node, "root") 

450 else id(candidate.function_node) 

451 ) 

452 except Exception: # pylint: disable=broad-exception-caught 

453 # If even getting the root fails, use the function node itself 

454 node_id = id(candidate.function_node) 

455 file_path = Path(f"file_{node_id}.py") 

456 

457 candidates_by_file[file_path].append(candidate) 

458 

459 return dict(candidates_by_file) 

460 

461 def _has_dynamic_references(self, candidate: RenameCandidate) -> bool: 

462 """Check for dynamic references that we can't safely rename.""" 

463 return self.analyzer._has_dynamic_references(candidate) 

464 

465 def _has_name_conflict(self, candidate: RenameCandidate) -> bool: 

466 """Check if renaming would create a name conflict.""" 

467 return self.analyzer._has_name_conflict(candidate) 

468 

469 def _has_string_references(self, candidate: RenameCandidate) -> bool: 

470 """Check for string literals containing the function name.""" 

471 return self.analyzer._has_string_references(candidate) 

472 

473 def _is_function_used_externally( 

474 self, func_name: str, file_path: Path, import_graph: Dict[Path, Set[str]] 

475 ) -> bool: 

476 """Check if a function is imported by other modules (backward compatibility).""" 

477 return self.analyzer._is_function_used_externally( 

478 func_name, file_path, import_graph 

479 ) 

480 

481 def _should_function_be_private( 

482 self, func: Any, file_path: Path, project_root: Path 

483 ) -> bool: 

484 """Determine if a function should be private based on cross-module usage.""" 

485 return self.analyzer.should_function_be_private(func, file_path, project_root) 

486 

487 def _update_import_statements( 

488 self, 

489 test_file: Path, 

490 old_name: str, 

491 new_name: str, 

492 test_references: List[FunctionTestReference], 

493 ) -> bool: 

494 """Update import statements in a test file to use the new function name.""" 

495 return self.test_updater._update_import_statements( 

496 test_file, old_name, new_name, test_references 

497 ) 

498 

499 def _update_mock_patterns( 

500 self, 

501 test_file: Path, 

502 old_name: str, 

503 new_name: str, 

504 test_references: List[FunctionTestReference], 

505 ) -> bool: 

506 """Update mock patch patterns in a test file to use the new function name.""" 

507 return self.test_updater._update_mock_patterns( 

508 test_file, old_name, new_name, test_references 

509 )