Coverage for src / infra / clients / beads_client.py: 15%

324 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-04 04:43 +0000

1"""BeadsClient: Wrapper for bd CLI calls used by MalaOrchestrator. 

2 

3This module provides an async-only client for interacting with the beads issue 

4tracker via the bd CLI. All public methods are async to support non-blocking 

5concurrent execution in the orchestrator. 

6 

7Design note: This client intentionally provides only async implementations. 

8Sync versions were removed to eliminate duplication and ensure consistent 

9behavior across the codebase. 

10""" 

11 

12from __future__ import annotations 

13 

14import asyncio 

15import json 

16import re 

17from typing import TYPE_CHECKING 

18 

19from src.infra.issue_manager import IssueManager 

20from src.infra.tools.command_runner import CommandResult, CommandRunner 

21 

22if TYPE_CHECKING: 

23 from pathlib import Path 

24 from collections.abc import Callable 

25 

26# Default timeout for bd/git subprocess calls (seconds) 

27DEFAULT_COMMAND_TIMEOUT = 30.0 

28 

29 

30class BeadsClient: 

31 """Client for interacting with beads via the bd CLI.""" 

32 

33 def __init__( 

34 self, 

35 repo_path: Path, 

36 log_warning: Callable[[str], None] | None = None, 

37 timeout_seconds: float = DEFAULT_COMMAND_TIMEOUT, 

38 ): 

39 """Initialize BeadsClient. 

40 

41 Args: 

42 repo_path: Path to the repository with beads issues. 

43 log_warning: Optional callback for logging warnings. 

44 timeout_seconds: Timeout for bd/git subprocess calls. 

45 """ 

46 self.repo_path = repo_path 

47 self._log_warning = log_warning or (lambda msg: None) 

48 self.timeout_seconds = timeout_seconds 

49 self._runner = CommandRunner(cwd=repo_path, timeout_seconds=timeout_seconds) 

50 # Cache for parent epic lookups to avoid repeated subprocess calls 

51 self._parent_epic_cache: dict[str, str | None] = {} 

52 # Locks for in-flight lookups to prevent duplicate concurrent calls 

53 self._parent_epic_locks: dict[str, asyncio.Lock] = {} 

54 # Cache for epic blocked status lookups 

55 self._blocked_epic_cache: dict[str, bool] = {} 

56 # Locks for in-flight blocked epic lookups 

57 self._blocked_epic_locks: dict[str, asyncio.Lock] = {} 

58 

59 async def _run_subprocess_async( 

60 self, 

61 cmd: list[str], 

62 timeout: float | None = None, 

63 ) -> CommandResult: 

64 """Run subprocess asynchronously with timeout and proper termination. 

65 

66 Uses CommandRunner for consistent subprocess handling with process-group 

67 termination. When a timeout occurs, the entire process group is terminated 

68 (SIGTERM) and then killed (SIGKILL) if it doesn't exit promptly. 

69 

70 Args: 

71 cmd: Command to run. 

72 timeout: Override timeout (uses self.timeout_seconds if None). 

73 

74 Returns: 

75 CommandResult with execution details. On timeout, returns 

76 returncode=1 and stderr="timeout" for backward compatibility. 

77 """ 

78 effective_timeout = timeout if timeout is not None else self.timeout_seconds 

79 result = await self._runner.run_async(cmd, timeout=effective_timeout) 

80 

81 if result.timed_out: 

82 self._log_warning( 

83 f"Command timed out after {effective_timeout}s: {' '.join(cmd)}" 

84 ) 

85 # Return backward-compatible timeout result 

86 return CommandResult( 

87 command=cmd, 

88 returncode=1, 

89 stdout="", 

90 stderr="timeout", 

91 duration_seconds=result.duration_seconds, 

92 timed_out=True, 

93 ) 

94 

95 if not result.ok and result.stderr: 

96 self._log_warning(f"Command failed: {' '.join(cmd)}: {result.stderr}") 

97 

98 return result 

99 

100 # --- Async methods (non-blocking, use in async context) --- 

101 

102 async def get_epic_children_async(self, epic_id: str) -> set[str]: 

103 """Get IDs of all children of an epic (async version). 

104 

105 Args: 

106 epic_id: The epic ID to get children for. 

107 

108 Returns: 

109 Set of issue IDs that are children of the epic. 

110 """ 

111 result = await self._run_subprocess_async( 

112 ["bd", "dep", "tree", epic_id, "--direction=up", "--json"] 

113 ) 

114 if result.returncode != 0: 

115 self._log_warning(f"bd dep tree failed for {epic_id}: {result.stderr}") 

116 return set() 

117 try: 

118 tree = json.loads(result.stdout) 

119 return {item["id"] for item in tree if item.get("depth", 0) > 0} 

120 except json.JSONDecodeError: 

121 return set() 

122 

123 async def _sort_by_epic_groups( 

124 self, issues: list[dict[str, object]] 

125 ) -> list[dict[str, object]]: 

126 """Sort issues by epic groups for focus mode. 

127 

128 Groups issues by parent epic, then sorts: 

129 1. Groups by (min_priority, max_updated DESC) 

130 2. Within groups by (priority, updated DESC) 

131 

132 Orphan tasks (no parent epic) form a virtual group with the same rules. 

133 

134 Note: This async version fetches parent epics first, then delegates 

135 to IssueManager.sort_by_epic_groups for the actual sorting. 

136 

137 Args: 

138 issues: List of issue dicts to sort. 

139 

140 Returns: 

141 Sorted list of issue dicts. 

142 """ 

143 if not issues: 

144 return issues 

145 

146 # Get parent epics for all issues 

147 issue_ids = [str(i["id"]) for i in issues] 

148 parent_epics = await self.get_parent_epics_async(issue_ids) 

149 

150 # Enrich issues with parent_epic field 

151 enriched = [ 

152 {**issue, "parent_epic": parent_epics.get(str(issue["id"]))} 

153 for issue in issues 

154 ] 

155 

156 # Delegate to IssueManager for pure sorting logic 

157 return IssueManager.sort_by_epic_groups(enriched) 

158 

159 def _negate_timestamp(self, timestamp: object) -> str: 

160 """Negate a timestamp string for descending sort. 

161 

162 Delegates to IssueManager.negate_timestamp for actual logic. 

163 """ 

164 return IssueManager.negate_timestamp(timestamp) 

165 

166 # ─────────────────────────────────────────────────────────────────────────── 

167 # Pipeline steps for _fetch_and_filter_issues 

168 # 

169 # Design: Pipeline has two types of steps: 

170 # - I/O steps (fetch_ready_issues_async, fetch_wip_issues_async, enrich_with_epics_async): 

171 # Perform async/subprocess calls, return raw data 

172 # - Pure transformation steps (in IssueManager): 

173 # Static methods with no I/O, directly testable with fixture data 

174 # 

175 # BeadsClient provides raw I/O methods; IssueManager handles all processing. 

176 # ─────────────────────────────────────────────────────────────────────────── 

177 

178 async def fetch_ready_issues_async(self) -> tuple[list[dict[str, object]], bool]: 

179 """Fetch ready issues from bd CLI (raw I/O, no processing). 

180 

181 Returns: 

182 Tuple of (issues list, success flag). Returns ([], False) on error. 

183 """ 

184 result = await self._run_subprocess_async( 

185 ["bd", "ready", "--json", "-t", "task"] 

186 ) 

187 if result.returncode != 0: 

188 self._log_warning(f"bd ready failed: {result.stderr}") 

189 return [], False 

190 try: 

191 issues = json.loads(result.stdout) 

192 return (list(issues), True) if isinstance(issues, list) else ([], True) 

193 except json.JSONDecodeError: 

194 return [], False 

195 

196 async def fetch_wip_issues_async(self) -> list[dict[str, object]]: 

197 """Fetch in_progress issues from bd CLI (raw I/O, no processing).""" 

198 result = await self._run_subprocess_async( 

199 ["bd", "list", "--status", "in_progress", "--json", "-t", "task"] 

200 ) 

201 if result.returncode != 0: 

202 return [] 

203 try: 

204 wip = json.loads(result.stdout) 

205 if not isinstance(wip, list): 

206 return [] 

207 return list(wip) 

208 except json.JSONDecodeError: 

209 return [] 

210 

211 async def enrich_with_epics_async( 

212 self, issues: list[dict[str, object]] 

213 ) -> list[dict[str, object]]: 

214 """Add parent_epic info (I/O step). 

215 

216 Returns a new list with enriched copies; does not mutate input. 

217 """ 

218 if not issues: 

219 return issues 

220 ids = [str(i["id"]) for i in issues] 

221 epics = await self.get_parent_epics_async(ids) 

222 return [{**i, "parent_epic": epics.get(str(i["id"]))} for i in issues] 

223 

224 # ─────────────────────────────────────────────────────────────────────────── 

225 # Legacy pipeline methods (delegate to public methods or IssueManager) 

226 # ─────────────────────────────────────────────────────────────────────────── 

227 

228 async def _fetch_base_issues(self) -> tuple[list[dict[str, object]], bool]: 

229 """Fetch ready issues from bd CLI (pipeline step 1). 

230 

231 Delegates to fetch_ready_issues_async for actual I/O. 

232 """ 

233 return await self.fetch_ready_issues_async() 

234 

235 @staticmethod 

236 def _merge_wip_issues( 

237 base_issues: list[dict[str, object]], wip_issues: list[dict[str, object]] 

238 ) -> list[dict[str, object]]: 

239 """Merge WIP issues into base list (pipeline step 2, pure function). 

240 

241 Delegates to IssueManager.merge_wip_issues for actual logic. 

242 """ 

243 return IssueManager.merge_wip_issues(base_issues, wip_issues) 

244 

245 @staticmethod 

246 def _apply_filters( 

247 issues: list[dict[str, object]], 

248 exclude_ids: set[str], 

249 epic_children: set[str] | None, 

250 only_ids: set[str] | None, 

251 ) -> list[dict[str, object]]: 

252 """Apply only_ids and epic filters (pipeline step 3, pure function). 

253 

254 Delegates to IssueManager.apply_filters for actual logic. 

255 """ 

256 return IssueManager.apply_filters(issues, exclude_ids, epic_children, only_ids) 

257 

258 async def _enrich_with_epics( 

259 self, issues: list[dict[str, object]] 

260 ) -> list[dict[str, object]]: 

261 """Add parent_epic info and filter blocked epics (pipeline step 4). 

262 

263 Delegates I/O to enrich_with_epics_async and filtering to IssueManager. 

264 """ 

265 enriched = await self.enrich_with_epics_async(issues) 

266 if not enriched: 

267 return enriched 

268 epic_ids = {str(i["parent_epic"]) for i in enriched if i.get("parent_epic")} 

269 blocked = await self._get_blocked_epics_async(epic_ids) 

270 return IssueManager.filter_blocked_epics(enriched, blocked) 

271 

272 def _sort_issues( 

273 self, issues: list[dict[str, object]], focus: bool, prioritize_wip: bool 

274 ) -> list[dict[str, object]]: 

275 """Sort issues by focus mode vs priority (pipeline step 5, pure function). 

276 

277 Delegates to IssueManager.sort_issues for actual logic. 

278 """ 

279 return IssueManager.sort_issues(issues, focus, prioritize_wip) 

280 

281 def _sort_by_epic_groups_sync( 

282 self, issues: list[dict[str, object]] 

283 ) -> list[dict[str, object]]: 

284 """Sort issues by epic groups for focus mode. 

285 

286 Delegates to IssueManager.sort_by_epic_groups for actual logic. 

287 """ 

288 return IssueManager.sort_by_epic_groups(issues) 

289 

290 async def _fetch_wip_issues(self) -> list[dict[str, object]]: 

291 """Fetch in_progress issues from bd CLI.""" 

292 return await self.fetch_wip_issues_async() 

293 

294 def _warn_missing_ids( 

295 self, 

296 only_ids: set[str] | None, 

297 issues: list[dict[str, object]], 

298 suppress_ids: set[str], 

299 ) -> None: 

300 """Log warning for specified IDs not found in issues. 

301 

302 Delegates to IssueManager.find_missing_ids for actual logic. 

303 """ 

304 bad = IssueManager.find_missing_ids(only_ids, issues, suppress_ids) 

305 if bad: 

306 self._log_warning(f"Specified IDs not ready: {', '.join(sorted(bad))}") 

307 

308 async def _resolve_epic_children(self, epic_id: str | None) -> set[str] | None: 

309 """Resolve epic children, logging warning if epic has none. Returns None to abort.""" 

310 if not epic_id: 

311 return None # Sentinel: no epic filter 

312 children = await self.get_epic_children_async(epic_id) 

313 if not children: 

314 self._log_warning(f"No children found for epic {epic_id}") 

315 return children # Empty set signals abort, non-empty signals filter 

316 

317 async def _fetch_and_filter_issues( 

318 self, 

319 exclude_ids: set[str] | None = None, 

320 epic_id: str | None = None, 

321 only_ids: set[str] | None = None, 

322 suppress_warn_ids: set[str] | None = None, 

323 prioritize_wip: bool = False, 

324 focus: bool = True, 

325 orphans_only: bool = False, 

326 ) -> list[dict[str, object]]: 

327 """Fetch, filter, enrich, and sort ready issues.""" 

328 exclude_ids = exclude_ids or set() 

329 epic_children = await self._resolve_epic_children(epic_id) 

330 if epic_id and not epic_children: 

331 return [] 

332 issues, ok = await self._fetch_base_issues() 

333 if not ok and not prioritize_wip: 

334 # WIP fallback: when prioritize_wip=True, continue even if bd ready fails 

335 # so we can still return in-progress issues (intentional design) 

336 return [] 

337 if prioritize_wip: 

338 wip = await self._fetch_wip_issues() 

339 wip = IssueManager.filter_blocked_wip(wip) 

340 issues = self._merge_wip_issues(issues, wip) 

341 self._warn_missing_ids(only_ids, issues, suppress_warn_ids or set()) 

342 filtered = self._apply_filters(issues, exclude_ids, epic_children, only_ids) 

343 enriched = await self._enrich_with_epics(filtered) 

344 # Apply orphans_only filter after enrichment (needs parent_epic info) 

345 if orphans_only: 

346 enriched = IssueManager.filter_orphans_only(enriched) 

347 return self._sort_issues(enriched, focus, prioritize_wip) 

348 

349 async def get_ready_async( 

350 self, 

351 exclude_ids: set[str] | None = None, 

352 epic_id: str | None = None, 

353 only_ids: set[str] | None = None, 

354 suppress_warn_ids: set[str] | None = None, 

355 prioritize_wip: bool = False, 

356 focus: bool = True, 

357 orphans_only: bool = False, 

358 ) -> list[str]: 

359 """Get list of ready issue IDs via bd CLI, sorted by priority (async version). 

360 

361 Args: 

362 exclude_ids: Set of issue IDs to exclude from results. 

363 epic_id: Optional epic ID to filter by - only return children of this epic. 

364 only_ids: Optional set of issue IDs to include exclusively. 

365 suppress_warn_ids: Optional set of issue IDs to suppress from warnings. 

366 prioritize_wip: If True, sort in_progress issues before open issues. 

367 focus: If True, group tasks by parent epic and complete one epic at a time. 

368 When focus=True, groups are sorted by (min_priority, max_updated DESC) 

369 and within groups by (priority, updated DESC). Orphan tasks form a 

370 virtual group with the same sorting rules. 

371 orphans_only: If True, only return issues with no parent epic. 

372 

373 Returns: 

374 List of issue IDs sorted by priority (lower = higher priority). 

375 When prioritize_wip is True, in_progress issues come first. 

376 When focus is True, tasks are grouped by epic. 

377 """ 

378 filtered = await self._fetch_and_filter_issues( 

379 exclude_ids=exclude_ids, 

380 epic_id=epic_id, 

381 only_ids=only_ids, 

382 suppress_warn_ids=suppress_warn_ids, 

383 prioritize_wip=prioritize_wip, 

384 focus=focus, 

385 orphans_only=orphans_only, 

386 ) 

387 return [str(i["id"]) for i in filtered] 

388 

389 async def get_ready_issues_async( 

390 self, 

391 exclude_ids: set[str] | None = None, 

392 epic_id: str | None = None, 

393 only_ids: set[str] | None = None, 

394 suppress_warn_ids: set[str] | None = None, 

395 prioritize_wip: bool = False, 

396 focus: bool = True, 

397 orphans_only: bool = False, 

398 ) -> list[dict[str, object]]: 

399 """Get list of ready issues with full metadata, sorted by priority (async version). 

400 

401 Similar to get_ready_async but returns full issue dicts with parent epic info. 

402 Used for dry-run preview to display task details before processing. 

403 

404 Args: 

405 exclude_ids: Set of issue IDs to exclude from results. 

406 epic_id: Optional epic ID to filter by - only return children of this epic. 

407 only_ids: Optional set of issue IDs to include exclusively. 

408 suppress_warn_ids: Optional set of issue IDs to suppress from warnings. 

409 prioritize_wip: If True, sort in_progress issues before open issues. 

410 focus: If True, group tasks by parent epic and complete one epic at a time. 

411 orphans_only: If True, only return issues with no parent epic. 

412 

413 Returns: 

414 List of issue dicts with id, title, priority, status, and parent_epic fields. 

415 Sorted by priority (lower = higher priority) with optional epic grouping. 

416 """ 

417 return await self._fetch_and_filter_issues( 

418 exclude_ids=exclude_ids, 

419 epic_id=epic_id, 

420 only_ids=only_ids, 

421 suppress_warn_ids=suppress_warn_ids, 

422 prioritize_wip=prioritize_wip, 

423 focus=focus, 

424 orphans_only=orphans_only, 

425 ) 

426 

427 async def claim_async(self, issue_id: str) -> bool: 

428 """Claim an issue by setting status to in_progress (async version). 

429 

430 Args: 

431 issue_id: The issue ID to claim. 

432 

433 Returns: 

434 True if successfully claimed, False otherwise. 

435 """ 

436 result = await self._run_subprocess_async( 

437 ["bd", "update", issue_id, "--status", "in_progress"] 

438 ) 

439 return result.returncode == 0 

440 

441 async def reset_async( 

442 self, issue_id: str, log_path: Path | None = None, error: str = "" 

443 ) -> None: 

444 """Reset failed issue to ready status with failure context (async version). 

445 

446 Args: 

447 issue_id: The issue ID to reset. 

448 log_path: Optional path to the JSONL log file from the failed attempt. 

449 error: Optional error summary describing the failure. 

450 """ 

451 args = ["bd", "update", issue_id, "--status", "ready"] 

452 if log_path or error: 

453 notes_parts = [] 

454 if error: 

455 notes_parts.append(f"Failed: {error}") 

456 if log_path: 

457 notes_parts.append(f"Log: {log_path}") 

458 args.extend(["--notes", "\n".join(notes_parts)]) 

459 await self._run_subprocess_async(args) 

460 

461 async def get_issue_status_async(self, issue_id: str) -> str | None: 

462 """Get the current status of an issue (async version). 

463 

464 Args: 

465 issue_id: The issue ID to check. 

466 

467 Returns: 

468 The issue status string, or None if not found. 

469 """ 

470 result = await self._run_subprocess_async(["bd", "show", issue_id, "--json"]) 

471 if result.returncode != 0: 

472 return None 

473 try: 

474 issue_data = json.loads(result.stdout) 

475 if isinstance(issue_data, list) and issue_data: 

476 issue_data = issue_data[0] 

477 if isinstance(issue_data, dict): 

478 return issue_data.get("status") 

479 except json.JSONDecodeError: 

480 pass 

481 return None 

482 

483 async def get_issue_description_async(self, issue_id: str) -> str | None: 

484 """Get the description of an issue (async version). 

485 

486 Args: 

487 issue_id: The issue ID to get description for. 

488 

489 Returns: 

490 The issue description string, or None if not found. 

491 """ 

492 result = await self._run_subprocess_async(["bd", "show", issue_id, "--json"]) 

493 if result.returncode != 0: 

494 return None 

495 try: 

496 issue_data = json.loads(result.stdout) 

497 if isinstance(issue_data, list) and issue_data: 

498 issue_data = issue_data[0] 

499 if isinstance(issue_data, dict): 

500 # Build a comprehensive description including title and scope 

501 parts = [] 

502 if title := issue_data.get("title"): 

503 parts.append(f"Title: {title}") 

504 if desc := issue_data.get("description"): 

505 parts.append(f"\n{desc}") 

506 if acceptance := issue_data.get("acceptance"): 

507 parts.append(f"\nAcceptance Criteria:\n{acceptance}") 

508 return "\n".join(parts) if parts else None 

509 except json.JSONDecodeError: 

510 pass 

511 return None 

512 

513 async def commit_issues_async(self) -> bool: 

514 """Export and commit .beads/issues.jsonl if it has changes (async version). 

515 

516 Uses `bd sync --no-pull --no-push` to ensure the JSONL is exported from 

517 SQLite before committing. This handles cases where epic closures are 

518 persisted in SQLite but not yet exported to JSONL (e.g., when the bd 

519 daemon is slow or not running). 

520 

521 Returns: 

522 True if sync succeeded (or no changes to commit), False otherwise. 

523 """ 

524 result = await self._run_subprocess_async( 

525 [ 

526 "bd", 

527 "sync", 

528 "--no-pull", 

529 "--no-push", 

530 "-m", 

531 "beads: close completed issues", 

532 ] 

533 ) 

534 return result.returncode == 0 

535 

536 async def close_eligible_epics_async(self) -> bool: 

537 """Auto-close epics where all children are complete (async version). 

538 

539 Returns: 

540 True if any epics were closed, False otherwise. 

541 """ 

542 result = await self._run_subprocess_async(["bd", "epic", "close-eligible"]) 

543 return result.returncode == 0 and bool(result.stdout.strip()) 

544 

545 async def mark_needs_followup_async( 

546 self, issue_id: str, reason: str, log_path: Path | None = None 

547 ) -> bool: 

548 """Mark an issue as needing follow-up (async version). 

549 

550 Args: 

551 issue_id: The issue ID to mark. 

552 reason: Description of why the quality gate failed. 

553 log_path: Optional path to the JSONL log file from the attempt. 

554 

555 Returns: 

556 True if successfully marked, False otherwise. 

557 """ 

558 notes = f"Quality gate failed: {reason}" 

559 if log_path: 

560 notes += f"\nLog: {log_path}" 

561 result = await self._run_subprocess_async( 

562 [ 

563 "bd", 

564 "update", 

565 issue_id, 

566 "--add-label", 

567 "needs-followup", 

568 "--notes", 

569 notes, 

570 ] 

571 ) 

572 return result.returncode == 0 

573 

574 async def close_async(self, issue_id: str) -> bool: 

575 """Close an issue by setting status to closed (async version). 

576 

577 Args: 

578 issue_id: The issue ID to close. 

579 

580 Returns: 

581 True if successfully closed, False otherwise. 

582 """ 

583 result = await self._run_subprocess_async(["bd", "close", issue_id]) 

584 return result.returncode == 0 

585 

586 async def add_dependency_async(self, issue_id: str, depends_on_id: str) -> bool: 

587 """Add a dependency between two issues. 

588 

589 Creates a "blocks" relationship where depends_on_id blocks issue_id. 

590 Uses `bd dep add <issue_id> <depends_on_id>`. 

591 

592 Args: 

593 issue_id: The issue that depends on another. 

594 depends_on_id: The issue that blocks issue_id. 

595 

596 Returns: 

597 True if dependency added successfully, False otherwise. 

598 """ 

599 result = await self._run_subprocess_async( 

600 ["bd", "dep", "add", issue_id, depends_on_id] 

601 ) 

602 return result.returncode == 0 

603 

604 async def create_issue_async( 

605 self, 

606 title: str, 

607 description: str, 

608 priority: str, 

609 tags: list[str] | None = None, 

610 parent_id: str | None = None, 

611 ) -> str | None: 

612 """Create a new issue via bd CLI (async version). 

613 

614 Args: 

615 title: Issue title. 

616 description: Issue description (supports markdown). 

617 priority: Priority string (P1, P2, P3, etc.). 

618 tags: Optional list of tags to apply. 

619 parent_id: Optional parent epic ID to attach this issue to. 

620 

621 Returns: 

622 Created issue ID, or None on failure. 

623 """ 

624 cmd = [ 

625 "bd", 

626 "create", 

627 "--title", 

628 title, 

629 "--description", 

630 description, 

631 "--priority", 

632 priority, 

633 "--silent", 

634 ] 

635 if parent_id: 

636 cmd.extend(["--parent", parent_id]) 

637 if tags: 

638 cmd.extend(["--labels", ",".join(tags)]) 

639 

640 result = await self._run_subprocess_async(cmd) 

641 if result.returncode != 0: 

642 self._log_warning(f"bd create failed: {result.stderr}") 

643 return None 

644 

645 # Parse issue ID from output (typically "Created issue: <id>" or silent id) 

646 match = re.search(r"Created issue:\s*(\S+)", result.stdout) 

647 if match: 

648 return match.group(1) 

649 

650 # Try parsing as JSON if the CLI returns JSON 

651 try: 

652 data = json.loads(result.stdout) 

653 if isinstance(data, dict): 

654 issue_id = data.get("id") 

655 if issue_id: 

656 return str(issue_id) 

657 except json.JSONDecodeError: 

658 pass 

659 

660 # Fallback: try using stripped output as bare ID 

661 issue_id = result.stdout.strip() 

662 return issue_id if issue_id else None 

663 

664 async def find_issue_by_tag_async(self, tag: str) -> str | None: 

665 """Find an existing issue with the given tag. 

666 

667 Args: 

668 tag: The tag to search for. 

669 

670 Returns: 

671 Issue ID if found, None otherwise. 

672 """ 

673 result = await self._run_subprocess_async( 

674 ["bd", "list", "--label", tag, "--json"] 

675 ) 

676 if result.returncode != 0: 

677 return None 

678 try: 

679 issues = json.loads(result.stdout) 

680 if isinstance(issues, list) and issues: 

681 # Return first matching issue (should be only one due to dedup) 

682 return str(issues[0].get("id", "")) 

683 return None 

684 except json.JSONDecodeError: 

685 return None 

686 

687 async def update_issue_description_async( 

688 self, issue_id: str, description: str 

689 ) -> bool: 

690 """Update an issue's description. 

691 

692 Args: 

693 issue_id: The issue ID to update. 

694 description: New description content (replaces existing). 

695 

696 Returns: 

697 True if successfully updated, False otherwise. 

698 """ 

699 result = await self._run_subprocess_async( 

700 ["bd", "update", issue_id, "--description", description] 

701 ) 

702 return result.returncode == 0 

703 

704 async def update_issue_async( 

705 self, 

706 issue_id: str, 

707 *, 

708 title: str | None = None, 

709 priority: str | None = None, 

710 ) -> bool: 

711 """Update an issue's title and/or priority. 

712 

713 Args: 

714 issue_id: The issue ID to update. 

715 title: New title (optional). 

716 priority: New priority string like "P2" (optional). 

717 

718 Returns: 

719 True if successfully updated, False otherwise. 

720 """ 

721 if title is None and priority is None: 

722 return True # Nothing to update 

723 

724 cmd = ["bd", "update", issue_id] 

725 if title is not None: 

726 cmd.extend(["--title", title]) 

727 if priority is not None: 

728 cmd.extend(["--priority", priority]) 

729 

730 result = await self._run_subprocess_async(cmd) 

731 return result.returncode == 0 

732 

733 async def get_parent_epic_async(self, issue_id: str) -> str | None: 

734 """Get the parent epic ID for an issue. 

735 

736 Uses `bd dep tree <id> --direction=down` to get the ancestor chain. 

737 The parent epic is the first ancestor with issue_type == "epic". 

738 Results are cached to avoid repeated subprocess calls. 

739 

740 Args: 

741 issue_id: The issue ID to find the parent epic for. 

742 

743 Returns: 

744 The parent epic ID, or None if no parent epic exists (orphan). 

745 """ 

746 # Check cache first (no lock needed for read) 

747 if issue_id in self._parent_epic_cache: 

748 return self._parent_epic_cache[issue_id] 

749 

750 # Get or create lock for this issue to prevent duplicate concurrent calls 

751 if issue_id not in self._parent_epic_locks: 

752 self._parent_epic_locks[issue_id] = asyncio.Lock() 

753 lock = self._parent_epic_locks[issue_id] 

754 

755 async with lock: 

756 # Check cache again after acquiring lock (another coroutine may have populated it) 

757 if issue_id in self._parent_epic_cache: 

758 return self._parent_epic_cache[issue_id] 

759 

760 result = await self._run_subprocess_async( 

761 ["bd", "dep", "tree", issue_id, "--direction=down", "--json"] 

762 ) 

763 if result.returncode != 0: 

764 self._log_warning(f"bd dep tree failed for {issue_id}: {result.stderr}") 

765 self._parent_epic_cache[issue_id] = None 

766 return None 

767 try: 

768 tree = json.loads(result.stdout) 

769 # Find the first ancestor (depth > 0) with issue_type == "epic" 

770 parent_epic: str | None = None 

771 for item in tree: 

772 if item.get("depth", 0) > 0 and item.get("issue_type") == "epic": 

773 parent_epic = item["id"] 

774 break 

775 

776 # Cache for this issue only 

777 # Note: We don't cache all children of the epic here because 

778 # get_epic_children_async returns ALL descendants (not just direct 

779 # children), which would incorrectly cache nested epic children 

780 # as belonging to the top-level epic. 

781 self._parent_epic_cache[issue_id] = parent_epic 

782 

783 return parent_epic 

784 except json.JSONDecodeError: 

785 self._parent_epic_cache[issue_id] = None 

786 return None 

787 

788 async def get_parent_epics_async( 

789 self, issue_ids: list[str] 

790 ) -> dict[str, str | None]: 

791 """Get parent epic IDs for multiple issues efficiently. 

792 

793 Processes unique issues concurrently with caching: 

794 - Duplicate issue IDs in the input are deduped before processing 

795 - Previously looked-up issues return immediately from cache 

796 - Concurrent lookups for the same issue are deduplicated via locks 

797 

798 Args: 

799 issue_ids: List of issue IDs to find parent epics for. 

800 

801 Returns: 

802 Dict mapping each issue ID to its parent epic ID (or None for orphans). 

803 """ 

804 # Dedupe issue_ids to minimize subprocess calls 

805 unique_ids = list(dict.fromkeys(issue_ids)) 

806 

807 # Process unique issues concurrently (locks prevent duplicate subprocess calls) 

808 tasks = [self.get_parent_epic_async(issue_id) for issue_id in unique_ids] 

809 parent_epics = await asyncio.gather(*tasks) 

810 

811 # Build result mapping from unique lookups 

812 unique_results = dict(zip(unique_ids, parent_epics, strict=True)) 

813 

814 # Return mapping for all input issue_ids (including duplicates) 

815 return {issue_id: unique_results[issue_id] for issue_id in issue_ids} 

816 

817 async def get_epic_blockers_async(self, epic_id: str) -> set[str]: 

818 """Get the set of issue IDs that are blocking an epic. 

819 

820 Retrieves the blocked_by field from the epic and returns it as a set. 

821 These are typically remediation issues created by epic verification 

822 that must be resolved before the epic can be closed. 

823 

824 Args: 

825 epic_id: The epic ID to get blockers for. 

826 

827 Returns: 

828 Set of issue IDs that are blocking the epic. 

829 """ 

830 result = await self._run_subprocess_async(["bd", "show", epic_id, "--json"]) 

831 if result.returncode != 0: 

832 return set() 

833 

834 try: 

835 issue_data = json.loads(result.stdout) 

836 if isinstance(issue_data, list) and issue_data: 

837 issue_data = issue_data[0] 

838 if isinstance(issue_data, dict): 

839 blocked_by = issue_data.get("blocked_by") 

840 if blocked_by is None: 

841 return set() 

842 # blocked_by can be a string or a list 

843 if isinstance(blocked_by, str): 

844 return {blocked_by} if blocked_by else set() 

845 if isinstance(blocked_by, list): 

846 return {str(b) for b in blocked_by if b} 

847 except json.JSONDecodeError: 

848 pass 

849 

850 return set() 

851 

852 async def _is_epic_blocked_async(self, epic_id: str) -> bool: 

853 """Check if an epic is blocked (has blocked_by or status=blocked). 

854 

855 An epic is considered blocked if: 

856 - It has status "blocked", OR 

857 - It has a non-empty blocked_by field (unmet dependencies) 

858 

859 Results are cached to avoid repeated subprocess calls. 

860 

861 Args: 

862 epic_id: The epic ID to check. 

863 

864 Returns: 

865 True if the epic is blocked, False otherwise. 

866 """ 

867 # Check cache first 

868 if epic_id in self._blocked_epic_cache: 

869 return self._blocked_epic_cache[epic_id] 

870 

871 # Get or create lock for this epic 

872 if epic_id not in self._blocked_epic_locks: 

873 self._blocked_epic_locks[epic_id] = asyncio.Lock() 

874 lock = self._blocked_epic_locks[epic_id] 

875 

876 async with lock: 

877 # Check cache again after acquiring lock 

878 if epic_id in self._blocked_epic_cache: 

879 return self._blocked_epic_cache[epic_id] 

880 

881 result = await self._run_subprocess_async(["bd", "show", epic_id, "--json"]) 

882 if result.returncode != 0: 

883 # If we can't get epic info, assume not blocked to avoid hiding tasks 

884 self._blocked_epic_cache[epic_id] = False 

885 return False 

886 

887 try: 

888 issue_data = json.loads(result.stdout) 

889 if isinstance(issue_data, list) and issue_data: 

890 issue_data = issue_data[0] 

891 if isinstance(issue_data, dict): 

892 status = issue_data.get("status") 

893 blocked_by = issue_data.get("blocked_by") 

894 is_blocked = status == "blocked" or bool(blocked_by) 

895 self._blocked_epic_cache[epic_id] = is_blocked 

896 return is_blocked 

897 except json.JSONDecodeError: 

898 pass 

899 

900 self._blocked_epic_cache[epic_id] = False 

901 return False 

902 

903 async def _get_blocked_epics_async(self, epic_ids: set[str]) -> set[str]: 

904 """Get the set of epics that are blocked. 

905 

906 Args: 

907 epic_ids: Set of epic IDs to check. 

908 

909 Returns: 

910 Set of epic IDs that are blocked. 

911 """ 

912 if not epic_ids: 

913 return set() 

914 

915 # Convert to list once to ensure consistent iteration order 

916 epic_ids_list = list(epic_ids) 

917 tasks = [self._is_epic_blocked_async(epic_id) for epic_id in epic_ids_list] 

918 results = await asyncio.gather(*tasks) 

919 return { 

920 epic_id 

921 for epic_id, is_blocked in zip(epic_ids_list, results, strict=True) 

922 if is_blocked 

923 }