Coverage for little_loops/cli/deps.py: 3%
152 statements
« prev ^ index » next | coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""ll-deps: Cross-issue dependency discovery and validation."""
3from __future__ import annotations
5import argparse
6from pathlib import Path
def _load_issues(
    issues_dir: Path,
    only_ids: set[str] | None = None,
) -> tuple[list, dict[str, str], set[str]]:
    """Load issues from directory for CLI use.

    Args:
        issues_dir: Path to the issues base directory (e.g., .issues)
        only_ids: If provided, only include issues with these IDs

    Returns:
        Tuple of (active issues, issue contents map, completed issue IDs).
        The contents map is keyed by issue ID and holds each issue file's
        UTF-8 text.  The third element collects the IDs parsed from ``*.md``
        file names in both the completed and the deferred directories.
    """
    import re as _re

    from little_loops.config import BRConfig
    from little_loops.issue_parser import find_issues

    # The project root is the parent of the issues directory.  The
    # conventional ".issues" name is resolved to an absolute path first; any
    # other directory name uses the parent of the path exactly as given.
    if issues_dir.name == ".issues":
        project_root = issues_dir.resolve().parent
    else:
        project_root = issues_dir.parent

    config = BRConfig(project_root)
    issues = find_issues(config, only_ids=only_ids)

    # Build contents map.  Read first and skip files that are not there
    # (EAFP) instead of checking exists() and then reading, which leaves a
    # window for the file to disappear between the two calls.
    issue_contents: dict[str, str] = {}
    for info in issues:
        try:
            issue_contents[info.issue_id] = info.path.read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            continue

    # Gather completed and deferred issue IDs from the file names.  The
    # pattern is compiled once because it is applied to every file.
    issue_id_pattern = _re.compile(r"(BUG|FEAT|ENH)-(\d+)")
    completed_ids: set[str] = set()
    for non_active_dir in (config.get_completed_dir(), config.get_deferred_dir()):
        if not non_active_dir.exists():
            continue
        for issue_file in non_active_dir.glob("*.md"):
            match = issue_id_pattern.search(issue_file.name)
            if match:
                # The whole match is exactly "<TYPE>-<digits>".
                completed_ids.add(match.group(0))

    return issues, issue_contents, completed_ids
54def main_deps() -> int:
55 """Entry point for ll-deps command.
57 Analyze cross-issue dependencies and validate existing references.
59 Returns:
60 Exit code (0 = success, 1 = failure)
61 """
62 import json as _json
63 import sys
65 from little_loops.dependency_mapper import (
66 analyze_dependencies,
67 fix_dependencies,
68 format_report,
69 format_text_graph,
70 gather_all_issue_ids,
71 validate_dependencies,
72 )
74 parser = argparse.ArgumentParser(
75 prog="ll-deps",
76 description="Cross-issue dependency discovery and validation",
77 formatter_class=argparse.RawDescriptionHelpFormatter,
78 epilog="""
79Examples:
80 %(prog)s analyze # Full analysis with markdown output
81 %(prog)s analyze --format json # JSON output for programmatic use
82 %(prog)s analyze --graph # Include ASCII dependency graph
83 %(prog)s analyze --sprint my-sprint # Analyze only issues in a sprint
84 %(prog)s validate # Validation only (broken refs, cycles)
85 %(prog)s validate --sprint my-sprint # Validate only sprint issue deps
86 %(prog)s fix # Auto-fix broken refs, stale refs, backlinks
87 %(prog)s fix --dry-run # Preview fixes without modifying files
88""",
89 )
91 parser.add_argument(
92 "-d",
93 "--issues-dir",
94 type=Path,
95 default=None,
96 help="Path to issues directory (default: .issues)",
97 )
99 subparsers = parser.add_subparsers(dest="command", help="Available commands")
101 # analyze subcommand
102 analyze_parser = subparsers.add_parser(
103 "analyze",
104 help="Full dependency analysis (file overlaps + validation)",
105 )
106 analyze_parser.add_argument(
107 "-f",
108 "--format",
109 type=str,
110 choices=["text", "json"],
111 default="text",
112 help="Output format (default: text/markdown)",
113 )
114 analyze_parser.add_argument(
115 "--graph",
116 action="store_true",
117 help="Include ASCII dependency graph in output",
118 )
119 analyze_parser.add_argument(
120 "--sprint",
121 type=str,
122 default=None,
123 help="Restrict analysis to issues in the named sprint",
124 )
126 # validate subcommand
127 validate_parser = subparsers.add_parser(
128 "validate",
129 help="Validate existing dependency references only",
130 )
131 validate_parser.add_argument(
132 "--sprint",
133 type=str,
134 default=None,
135 help="Restrict validation to issues in the named sprint",
136 )
138 # fix subcommand
139 fix_parser = subparsers.add_parser(
140 "fix",
141 help="Auto-fix broken refs, stale refs, and missing backlinks",
142 )
143 fix_parser.add_argument(
144 "--dry-run",
145 "-n",
146 action="store_true",
147 help="Show what would be fixed without making changes",
148 )
149 fix_parser.add_argument(
150 "--sprint",
151 type=str,
152 default=None,
153 help="Restrict fixes to issues in the named sprint",
154 )
156 args = parser.parse_args()
158 if not args.command:
159 parser.print_help()
160 return 1
162 issues_dir = args.issues_dir or Path.cwd() / ".issues"
163 if not issues_dir.exists():
164 print(f"Error: Issues directory not found: {issues_dir}", file=sys.stderr)
165 return 1
167 # Sprint-scoped filtering
168 only_ids: set[str] | None = None
169 if getattr(args, "sprint", None):
170 from little_loops.config import BRConfig as _BRConfig
171 from little_loops.sprint import Sprint
173 project_root = issues_dir.resolve().parent
174 if issues_dir.name != ".issues":
175 project_root = issues_dir.parent
176 _config = _BRConfig(project_root)
177 sprints_dir = Path(_config.sprints.sprints_dir)
178 if not sprints_dir.is_absolute():
179 sprints_dir = project_root / sprints_dir
181 sprint = Sprint.load(sprints_dir, args.sprint)
182 if sprint is None:
183 print(f"Error: Sprint not found: {args.sprint}", file=sys.stderr)
184 return 1
185 only_ids = set(sprint.issues)
186 if not only_ids:
187 print(f"Sprint '{args.sprint}' has no issues.")
188 return 0
190 try:
191 issues, issue_contents, completed_ids = _load_issues(issues_dir, only_ids=only_ids)
192 except Exception as e:
193 print(f"Error loading issues: {e}", file=sys.stderr)
194 return 1
196 if not issues:
197 print("No active issues found.")
198 return 0
200 # Gather all issue IDs on disk to avoid false "nonexistent" warnings
201 # when sprint-scoped analysis references issues outside the sprint
202 try:
203 from little_loops.config import BRConfig as _BRConfig
205 _dm_config = _BRConfig(issues_dir.resolve().parent)
206 except Exception:
207 _dm_config = None
208 all_known_ids = gather_all_issue_ids(issues_dir, config=_dm_config)
210 # Load dependency mapping config
211 dep_config = _dm_config.dependency_mapping if _dm_config else None
213 if args.command == "analyze":
214 report = analyze_dependencies(
215 issues, issue_contents, completed_ids, all_known_ids, config=dep_config
216 )
218 if args.format == "json":
219 data = {
220 "issue_count": report.issue_count,
221 "existing_dep_count": report.existing_dep_count,
222 "proposals": [
223 {
224 "source_id": p.source_id,
225 "target_id": p.target_id,
226 "reason": p.reason,
227 "confidence": p.confidence,
228 "rationale": p.rationale,
229 "overlapping_files": p.overlapping_files,
230 "conflict_score": p.conflict_score,
231 }
232 for p in report.proposals
233 ],
234 "parallel_safe": [
235 {
236 "issue_a": ps.issue_a,
237 "issue_b": ps.issue_b,
238 "shared_files": ps.shared_files,
239 "conflict_score": ps.conflict_score,
240 "reason": ps.reason,
241 }
242 for ps in report.parallel_safe
243 ],
244 "validation": {
245 "broken_refs": report.validation.broken_refs,
246 "missing_backlinks": report.validation.missing_backlinks,
247 "cycles": report.validation.cycles,
248 "stale_completed_refs": report.validation.stale_completed_refs,
249 "has_issues": report.validation.has_issues,
250 },
251 }
252 print(_json.dumps(data, indent=2))
253 else:
254 print(format_report(report, config=dep_config))
255 if args.graph:
256 print()
257 print("## Dependency Graph")
258 print()
259 print(format_text_graph(issues, report.proposals))
261 return 0
263 if args.command == "validate":
264 result = validate_dependencies(issues, completed_ids, all_known_ids)
266 if not result.has_issues:
267 print("No validation issues found.")
268 return 0
270 lines: list[str] = []
271 lines.append("# Dependency Validation Report")
272 lines.append("")
274 if result.broken_refs:
275 lines.append("## Broken References")
276 lines.append("")
277 for issue_id, ref_id in result.broken_refs:
278 lines.append(f"- {issue_id}: references nonexistent {ref_id}")
279 lines.append("")
281 if result.missing_backlinks:
282 lines.append("## Missing Backlinks")
283 lines.append("")
284 for issue_id, ref_id in result.missing_backlinks:
285 lines.append(
286 f"- {issue_id} is blocked by {ref_id}, "
287 f"but {ref_id} does not list {issue_id} in Blocks"
288 )
289 lines.append("")
291 if result.cycles:
292 lines.append("## Dependency Cycles")
293 lines.append("")
294 for cycle in result.cycles:
295 lines.append(f"- {' -> '.join(cycle)}")
296 lines.append("")
298 if result.stale_completed_refs:
299 lines.append("## Stale References (to completed issues)")
300 lines.append("")
301 for issue_id, ref_id in result.stale_completed_refs:
302 lines.append(f"- {issue_id}: blocked by {ref_id} (completed)")
303 lines.append("")
305 print("\n".join(lines))
306 return 0
308 if args.command == "fix":
309 fix_result = fix_dependencies(issues, completed_ids, all_known_ids, dry_run=args.dry_run)
311 if not fix_result.changes:
312 print("No fixable issues found.")
313 if fix_result.skipped_cycles:
314 print(f"({fix_result.skipped_cycles} cycle(s) detected — resolve manually)")
315 return 0
317 prefix = "[DRY RUN] " if args.dry_run else ""
318 print(f"# {prefix}Dependency Fix Report")
319 print()
320 for change in fix_result.changes:
321 print(f" {prefix}{change}")
322 print()
323 print(f"{prefix}{len(fix_result.changes)} fix(es) applied.")
325 if fix_result.modified_files:
326 print()
327 print("Modified files:")
328 for fpath in sorted(fix_result.modified_files):
329 print(f" {fpath}")
331 if fix_result.skipped_cycles:
332 print()
333 print(f"({fix_result.skipped_cycles} cycle(s) detected — resolve manually)")
335 return 0
337 return 1