Coverage for src / infra / tools / locking.py: 63%
275 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-04 04:43 +0000
1"""Centralized file locking for multi-agent coordination.
3Consolidates locking behavior from shell scripts.
4"""
6import hashlib
7import logging
8import os
9import sys
10from collections.abc import Callable
11from dataclasses import dataclass
12from pathlib import Path
14from .env import get_lock_dir
# Module-level logger used by the locking helpers in this file.
logger = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = [
    "LockManager",
    "canonicalize_path",
    "get_all_locks",
    "get_lock_dir",
    "lock_path",
    "parse_lock_file",
]
def canonicalize_path(filepath: str, repo_namespace: str | None = None) -> str:
    """Return the canonical form of ``filepath`` used for lock keys.

    Thin public entry point that forwards to ``_canonicalize_path``.

    Args:
        filepath: The file path (or literal key) to canonicalize.
        repo_namespace: Optional repo root used to anchor relative paths.

    Returns:
        A canonicalized absolute path string, or the literal key unchanged.
    """
    canonical = _canonicalize_path(filepath, repo_namespace)
    return canonical
def _get_lock_dir() -> Path:
    """Return the lock directory reported by ``get_lock_dir``.

    The directory is looked up on every call rather than cached here, so a
    change made before the call is picked up.

    NOTE(review): ``_cli_main`` sets ``MALA_LOCK_DIR`` before dispatching,
    so ``get_lock_dir`` presumably reads that variable - confirm in the env
    module.
    """
    directory = get_lock_dir()
    return directory
53def _is_literal_key(filepath: str) -> bool:
54 """Check if a filepath is a literal key (not a real path).
56 Literal keys are special identifiers like __test_mutex__ that should
57 not be normalized as file paths. They are used for global locks that
58 are not tied to specific files.
59 """
60 return filepath.startswith("__") and filepath.endswith("__")
63def _resolve_with_parents(path: Path) -> Path:
64 """Resolve a path by resolving existing parent directories.
66 For non-existent paths, walks up to find the first existing ancestor,
67 resolves its symlinks, then appends the remaining path components.
68 This ensures consistent lock keys for paths through symlinked directories.
70 Args:
71 path: The path to resolve (should be absolute).
73 Returns:
74 The resolved path with parent symlinks resolved.
75 """
76 if path.exists():
77 return path.resolve()
79 # Walk up to find an existing ancestor
80 # Collect the parts that don't exist yet
81 missing_parts: list[str] = []
82 current = path
84 max_iterations = 100
85 iterations = 0
86 while not current.exists() and iterations < max_iterations:
87 iterations += 1
88 missing_parts.append(current.name)
89 parent = current.parent
90 if parent == current:
91 # Reached root without finding existing path
92 break
93 current = parent
95 if iterations >= max_iterations:
96 import logging
98 logging.warning(
99 f"_resolve_with_parents: max iterations reached for path {path}, "
100 "using unresolved path which may cause inconsistent lock keys"
101 )
102 return path
104 # Resolve the existing ancestor (resolves symlinks)
105 resolved_base = current.resolve()
107 # Append the missing parts back
108 for part in reversed(missing_parts):
109 resolved_base = resolved_base / part
111 return resolved_base
def _canonicalize_path(filepath: str, repo_namespace: str | None = None) -> str:
    """Produce the canonical string used when building a lock key.

    Literal keys (see ``_is_literal_key``) are returned untouched. Anything
    else is turned into an absolute path with symlinks resolved and ``.`` /
    ``..`` segments normalized. Paths that do not exist yet have the
    symlinks of their existing parents resolved.

    Args:
        filepath: The file path to canonicalize.
        repo_namespace: Optional repo root path. When given and ``filepath``
            is relative, the path is anchored at this directory; otherwise
            relative paths are anchored at the current working directory.

    Returns:
        A canonicalized absolute path string, or the literal key as-is.
    """
    if _is_literal_key(filepath):
        return filepath

    target = Path(filepath)
    if repo_namespace and not target.is_absolute():
        # Anchor relative paths at the (resolved) namespace directory.
        target = Path(repo_namespace).resolve() / target

    if target.exists():
        # Existing paths can be resolved directly, symlinks included.
        return str(target.resolve())

    # Missing path: make it absolute, collapse . and .. lexically, then
    # resolve whatever part of the parent chain already exists.
    anchored = target if target.is_absolute() else Path.cwd() / target
    collapsed = Path(os.path.normpath(anchored))
    return str(_resolve_with_parents(collapsed))
def _lock_key(filepath: str, repo_namespace: str | None = None) -> str:
    """Build the canonical key string that identifies a lock.

    Args:
        filepath: The file path to lock.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.
            An empty string is treated the same as ``None``.

    Returns:
        ``"<namespace>:<canonical path>"`` when a namespace is given,
        otherwise just the canonical path.
    """
    namespace = repo_namespace or None
    canonical = _canonicalize_path(filepath, namespace)
    if namespace is None:
        return canonical
    # The namespace is embedded verbatim (not normalized).
    return f"{namespace}:{canonical}"
def lock_path(filepath: str, repo_namespace: str | None = None) -> Path:
    """Map a file path to the location of its lock file.

    The lock file name is the first 16 hex characters of the SHA-256 digest
    of the canonical lock key, followed by ``.lock``. Hashing avoids the
    collisions a simple character replacement would cause (e.g. ``a/b`` vs
    ``a_b``).

    Args:
        filepath: The file path to lock.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.

    Returns:
        Path to the lock file inside the lock directory.
    """
    digest = hashlib.sha256(_lock_key(filepath, repo_namespace).encode()).hexdigest()
    file_name = f"{digest[:16]}.lock"
    return _get_lock_dir() / file_name
def release_all_locks() -> None:
    """Delete every ``*.lock`` file in the lock directory, plus its ``.meta``."""
    directory = _get_lock_dir()
    if not directory.exists():
        return
    for lock_file in directory.glob("*.lock"):
        # The companion .meta file goes first, then the lock itself.
        lock_file.with_suffix(".meta").unlink(missing_ok=True)
        lock_file.unlink(missing_ok=True)
def release_run_locks(agent_ids: list[str]) -> int:
    """Release every lock whose recorded holder is one of ``agent_ids``.

    Locks held by agents that are not listed are left untouched.

    Args:
        agent_ids: List of agent IDs whose locks should be released.

    Returns:
        Number of locks released.
    """
    directory = _get_lock_dir()
    if not (directory.exists() and agent_ids):
        return 0

    owners = set(agent_ids)
    count = 0
    for lock_file in directory.glob("*.lock"):
        try:
            if not lock_file.is_file():
                continue
            if lock_file.read_text().strip() not in owners:
                continue
            # Drop the companion .meta file before the lock itself.
            lock_file.with_suffix(".meta").unlink(missing_ok=True)
            lock_file.unlink()
        except OSError:
            # Best effort: a lock that cannot be read or removed is skipped.
            continue
        count += 1
    return count
def try_lock(filepath: str, agent_id: str, repo_namespace: str | None = None) -> bool:
    """Try to acquire a lock on a file without blocking.

    The lock file is created by writing the agent ID to a temporary file in
    the lock directory and hard-linking it to the final lock path; the link
    fails if the lock already exists. On success a companion ``.meta`` file
    holding the canonical path is written on a best-effort basis.

    Args:
        filepath: The file path to lock.
        agent_id: The agent ID to record in the lock.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.

    Returns:
        True if lock was acquired, False if already locked or if the lock
        file could not be created.
    """
    import tempfile

    lp = lock_path(filepath, repo_namespace)
    lock_dir = _get_lock_dir()
    lock_dir.mkdir(parents=True, exist_ok=True)

    # Fast path: somebody already holds the lock.
    if lp.exists():
        try:
            holder = lp.read_text().strip()
            logger.debug(
                "Lock contention: path=%s holder=%s requester=%s",
                filepath,
                holder,
                agent_id,
            )
        except OSError:
            pass  # Holder is only needed for the log line.
        return False

    # Computed before any file descriptor or temp file exists, so a failure
    # here cannot leak either.
    canonical = _canonicalize_path(filepath, repo_namespace)

    try:
        fd, tmp_path = tempfile.mkstemp(
            prefix=f".locktmp.{agent_id}.", dir=lock_dir, text=True
        )
    except OSError:
        return False

    try:
        try:
            # Lock file contains only the agent ID.
            os.write(fd, f"{agent_id}\n".encode())
        finally:
            # Always close the descriptor, even if the write failed.
            os.close(fd)
        # Hard link fails if the lock path already exists.
        os.link(tmp_path, lp)
    except OSError:
        return False
    finally:
        # The temp file is removed on every path. A failure to remove it
        # must not turn a successful acquisition into a reported failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass

    # Lock acquired; the meta file is optional diagnostics.
    try:
        lp.with_suffix(".meta").write_text(f"{canonical}\n")
    except OSError:
        pass
    logger.debug("Lock acquired: path=%s agent_id=%s", filepath, agent_id)
    return True
def wait_for_lock(
    filepath: str,
    agent_id: str,
    repo_namespace: str | None = None,
    timeout_seconds: float = 30.0,
    poll_interval_ms: int = 100,
) -> bool:
    """Keep trying to acquire a lock until it succeeds or time runs out.

    At least one acquisition attempt is always made. The deadline is checked
    after each failed attempt, before sleeping.

    Args:
        filepath: The file path to lock.
        agent_id: The agent ID to record in the lock.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.
        timeout_seconds: Maximum time to wait for the lock (default 30).
        poll_interval_ms: Polling interval in milliseconds (default 100).

    Returns:
        True if lock was acquired, False if timeout.
    """
    import time

    give_up_at = time.monotonic() + timeout_seconds
    pause = poll_interval_ms / 1000.0

    while not try_lock(filepath, agent_id, repo_namespace):
        if time.monotonic() >= give_up_at:
            logger.warning(
                "Lock timeout: path=%s agent_id=%s after=%.1fs",
                filepath,
                agent_id,
                timeout_seconds,
            )
            return False
        time.sleep(pause)
    return True
def is_locked(filepath: str, repo_namespace: str | None = None) -> bool:
    """Report whether a lock file currently exists for ``filepath``.

    Args:
        filepath: The file path to check.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.

    Returns:
        True if the file is locked, False otherwise.
    """
    lock_file = lock_path(filepath, repo_namespace)
    return lock_file.exists()
def release_lock(
    filepath: str, agent_id: str, repo_namespace: str | None = None
) -> bool:
    """Release the lock on ``filepath`` if ``agent_id`` is its holder.

    A lock held by a different agent (or no lock at all) is left alone.

    Args:
        filepath: Path to the file to unlock.
        agent_id: Identifier of the agent releasing the lock.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.

    Returns:
        True if lock was released, False if lock was not held by agent_id.
    """
    if get_lock_holder(filepath, repo_namespace) == agent_id:
        lock_file = lock_path(filepath, repo_namespace)
        # Companion .meta file is removed together with the lock.
        lock_file.with_suffix(".meta").unlink(missing_ok=True)
        lock_file.unlink(missing_ok=True)
        logger.debug("Lock released: path=%s agent_id=%s", filepath, agent_id)
        return True
    return False
def get_lock_holder(filepath: str, repo_namespace: str | None = None) -> str | None:
    """Look up which agent holds the lock on ``filepath``.

    Args:
        filepath: The file path to check.
        repo_namespace: Optional repo namespace for cross-repo disambiguation.

    Returns:
        The stripped contents of the lock file (the agent ID), or None if
        there is no lock file or it cannot be read.
    """
    lock_file = lock_path(filepath, repo_namespace)
    if not lock_file.exists():
        return None
    try:
        owner = lock_file.read_text().strip()
    except OSError:
        return None
    return owner
417def parse_lock_file(lock_file: Path) -> tuple[str, str | None] | None:
418 """Parse a lock file to get agent_id and original filepath.
420 Args:
421 lock_file: Path to the lock file (.lock file).
423 Returns:
424 Tuple of (agent_id, filepath) or None if file cannot be read.
425 filepath may be None for legacy lock files without a .meta file.
426 """
427 try:
428 agent_id = lock_file.read_text().strip()
429 if not agent_id:
430 return None
431 # Read filepath from companion .meta file
432 meta_file = lock_file.with_suffix(".meta")
433 filepath = meta_file.read_text().strip() if meta_file.exists() else None
434 return (agent_id, filepath)
435 except OSError:
436 return None
def get_all_locks() -> dict[str, list[str]]:
    """Collect all active locks, grouped by the agent that holds them.

    As a side effect, ``.meta`` files whose ``.lock`` file no longer exists
    are deleted (best effort).

    Returns:
        Dictionary mapping agent_id -> list of locked filepaths. When no
        filepath is recorded for a lock, the lock file's stem (the hash) is
        used in its place.
    """
    directory = _get_lock_dir()
    if not directory.exists():
        return {}

    grouped: dict[str, list[str]] = {}
    for lock_file in directory.glob("*.lock"):
        entry = parse_lock_file(lock_file)
        if not entry:
            continue
        owner, locked_path = entry
        # Fall back to the hash stem when the path is unknown.
        grouped.setdefault(owner, []).append(locked_path or lock_file.stem)

    # Sweep orphaned .meta files (their .lock was removed externally).
    for meta_file in directory.glob("*.meta"):
        if meta_file.with_suffix(".lock").exists():
            continue
        try:
            meta_file.unlink()
        except OSError:
            pass

    return grouped
def cleanup_agent_locks(agent_id: str) -> int:
    """Delete every lock held by ``agent_id`` (crash/timeout cleanup).

    Args:
        agent_id: The agent ID whose locks should be cleaned up.

    Returns:
        Number of locks cleaned up.
    """
    directory = _get_lock_dir()
    if not directory.exists():
        return 0

    removed = 0
    for lock_file in directory.glob("*.lock"):
        try:
            owned = lock_file.is_file() and lock_file.read_text().strip() == agent_id
            if owned:
                # Companion .meta file is removed together with the lock.
                lock_file.with_suffix(".meta").unlink(missing_ok=True)
                lock_file.unlink()
                removed += 1
        except OSError:
            # Best effort: skip locks that cannot be read or removed.
            pass

    logger.info("Agent locks cleaned: agent_id=%s count=%d", agent_id, removed)
    return removed
class LockManager:
    """Object-oriented facade over the module-level locking functions.

    Every method forwards its arguments unchanged to the function of the
    same name in this module, so an instance can be injected wherever a
    lock-manager object is expected.
    """

    def lock_path(self, filepath: str, repo_namespace: str | None = None) -> Path:
        """Return the lock file path for ``filepath``."""
        location = lock_path(filepath, repo_namespace)
        return location

    def try_lock(
        self, filepath: str, agent_id: str, repo_namespace: str | None = None
    ) -> bool:
        """Attempt to take the lock once, without waiting."""
        acquired = try_lock(filepath, agent_id, repo_namespace)
        return acquired

    def wait_for_lock(
        self,
        filepath: str,
        agent_id: str,
        repo_namespace: str | None = None,
        timeout_seconds: float = 30.0,
        poll_interval_ms: int = 100,
    ) -> bool:
        """Poll for the lock until it is acquired or the timeout elapses."""
        acquired = wait_for_lock(
            filepath, agent_id, repo_namespace, timeout_seconds, poll_interval_ms
        )
        return acquired

    def release_lock(
        self, filepath: str, agent_id: str, repo_namespace: str | None = None
    ) -> bool:
        """Release the lock if ``agent_id`` holds it."""
        released = release_lock(filepath, agent_id, repo_namespace)
        return released
535# ---------------------------------------------------------------------------
536# CLI Command Dispatch
537# ---------------------------------------------------------------------------
540@dataclass(frozen=True)
541class CliContext:
542 """Parsed CLI context for command dispatch."""
544 command: str
545 lock_dir: str
546 agent_id: str
547 repo_namespace: str | None
548 filepath: str | None
549 timeout: float
550 poll_ms: int
def _cmd_try(ctx: CliContext) -> int:
    """Run the 'try' command: exit 0 if the lock was acquired, else 1."""
    acquired = try_lock(ctx.filepath, ctx.agent_id, ctx.repo_namespace)  # type: ignore[arg-type]
    return 0 if acquired else 1
def _cmd_wait(ctx: CliContext) -> int:
    """Run the 'wait' command: exit 0 once the lock is acquired, 1 on timeout."""
    acquired = wait_for_lock(
        ctx.filepath,  # type: ignore[arg-type]
        ctx.agent_id,
        ctx.repo_namespace,
        ctx.timeout,
        ctx.poll_ms,
    )
    return 0 if acquired else 1
def _cmd_check(ctx: CliContext) -> int:
    """Run the 'check' command: exit 0 if this agent holds the lock, else 1."""
    current = get_lock_holder(ctx.filepath, ctx.repo_namespace)  # type: ignore[arg-type]
    return 0 if current == ctx.agent_id else 1
def _cmd_holder(ctx: CliContext) -> int:
    """Run the 'holder' command: print the holder, if any; always exit 0."""
    current = get_lock_holder(ctx.filepath, ctx.repo_namespace)  # type: ignore[arg-type]
    if not current:
        # Nothing is printed when the file is not locked.
        return 0
    print(current)
    return 0
def _cmd_release(ctx: CliContext) -> int:
    """Handle 'release' command: release the lock if this agent holds it.

    Delegates to ``release_lock`` so the ownership check and the removal of
    the lock and its ``.meta`` file live in one place. The exit code is 0
    whether or not a lock was actually released.
    """
    release_lock(ctx.filepath, ctx.agent_id, ctx.repo_namespace)  # type: ignore[arg-type]
    return 0
def _cmd_release_all(ctx: CliContext) -> int:
    """Run the 'release-all' command: drop every lock held by this agent."""
    owner = ctx.agent_id
    cleanup_agent_locks(owner)
    return 0
# Dispatch table for the CLI.
# Each entry maps a command name to (handler, requires_filepath, requires_agent_id).
COMMANDS: dict[str, tuple[Callable[[CliContext], int], bool, bool]] = {
    "try": (_cmd_try, True, True),
    "wait": (_cmd_wait, True, True),
    "check": (_cmd_check, True, True),
    "holder": (_cmd_holder, True, False),
    "release": (_cmd_release, True, True),
    "release-all": (_cmd_release_all, False, True),
}
616def _cli_main() -> int:
617 """CLI entry point for shell script delegation.
619 Usage:
620 python -m src.infra.tools.locking try <filepath>
621 python -m src.infra.tools.locking wait <filepath> [timeout_seconds] [poll_interval_ms]
622 python -m src.infra.tools.locking check <filepath>
623 python -m src.infra.tools.locking holder <filepath>
624 python -m src.infra.tools.locking release <filepath>
625 python -m src.infra.tools.locking release-all
627 Environment variables:
628 LOCK_DIR: Directory for lock files (required for most commands)
629 AGENT_ID: Agent identifier (required for most commands)
630 REPO_NAMESPACE: Optional repo namespace for cross-repo disambiguation
632 Exit codes:
633 0: Success (lock acquired, held, released, etc.)
634 1: Failure (lock blocked, timeout, not held, etc.)
635 2: Usage error (missing env vars, invalid arguments)
636 """
637 if len(sys.argv) < 2:
638 print(
639 "Usage: python -m src.infra.tools.locking <command> [args...]",
640 file=sys.stderr,
641 )
642 print(
643 "Commands: try, wait, check, holder, release, release-all", file=sys.stderr
644 )
645 return 2
647 command = sys.argv[1]
649 # Validate command exists
650 if command not in COMMANDS:
651 print(f"Unknown command: {command}", file=sys.stderr)
652 print(
653 "Commands: try, wait, check, holder, release, release-all", file=sys.stderr
654 )
655 return 2
657 handler, requires_filepath, requires_agent_id = COMMANDS[command]
659 # Parse environment
660 lock_dir = os.environ.get("LOCK_DIR")
661 agent_id = os.environ.get("AGENT_ID")
662 repo_namespace = os.environ.get("REPO_NAMESPACE") or None
664 # Validate LOCK_DIR (required for all commands)
665 if not lock_dir:
666 print("Error: LOCK_DIR must be set", file=sys.stderr)
667 return 2
669 # Validate AGENT_ID (required for most commands)
670 if requires_agent_id and not agent_id:
671 print("Error: AGENT_ID must be set", file=sys.stderr)
672 return 2
674 # Parse filepath argument
675 filepath: str | None = None
676 if requires_filepath:
677 if len(sys.argv) < 3:
678 print(
679 f"Usage: python -m src.infra.tools.locking {command} <filepath>",
680 file=sys.stderr,
681 )
682 return 2
683 filepath = sys.argv[2]
684 # Enforce exact arg count for commands without optional arguments
685 if command != "wait" and len(sys.argv) > 3:
686 print(
687 f"Usage: python -m src.infra.tools.locking {command} <filepath>",
688 file=sys.stderr,
689 )
690 return 2
691 elif command == "release-all" and len(sys.argv) != 2:
692 print("Usage: python -m src.infra.tools.locking release-all", file=sys.stderr)
693 return 2
695 # Parse wait-specific arguments
696 timeout = 30.0
697 poll_ms = 100
698 if command == "wait":
699 timeout = float(sys.argv[3]) if len(sys.argv) > 3 else 30.0
700 poll_ms = int(sys.argv[4]) if len(sys.argv) > 4 else 100
702 # Set MALA_LOCK_DIR so our functions use the shell script's LOCK_DIR
703 os.environ["MALA_LOCK_DIR"] = lock_dir
705 # Build context and dispatch
706 ctx = CliContext(
707 command=command,
708 lock_dir=lock_dir,
709 agent_id=agent_id or "",
710 repo_namespace=repo_namespace,
711 filepath=filepath,
712 timeout=timeout,
713 poll_ms=poll_ms,
714 )
715 return handler(ctx)
if __name__ == "__main__":
    # Propagate the CLI's return value as the process exit status.
    raise SystemExit(_cli_main())