Coverage for little_loops / fsm / concurrency.py: 27%
122 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-18 16:18 -0500
1"""Scope-based concurrency control for FSM loops.
3Prevents concurrent loops from conflicting when operating on
4the same files or directories through file-based locking.
6Public exports:
7 ScopeLock: Dataclass representing a scope lock
8 LockManager: Manager for acquiring/releasing scope locks
9"""
11from __future__ import annotations
13import errno
14import fcntl
15import json
16import os
17import time
18from dataclasses import dataclass
19from datetime import UTC, datetime
20from pathlib import Path
21from typing import Any
# Name of the subdirectory, created under the loops directory, in which
# LockManager keeps its per-loop lock files.
RUNNING_DIR = ".running"
26def _process_alive(pid: int) -> bool:
27 """Check if a process is still running.
29 Returns True if alive (or alive but unreadable due to EPERM),
30 False only if process does not exist (ESRCH).
31 """
32 try:
33 os.kill(pid, 0)
34 return True
35 except OSError as e:
36 if e.errno == errno.ESRCH:
37 return False # No such process
38 return True # EPERM or other: process exists, no permission
41def _iso_now() -> str:
42 """Return current time as ISO8601 string."""
43 return datetime.now(UTC).isoformat()
@dataclass
class ScopeLock:
    """Record of the paths claimed by a running loop.

    Attributes:
        loop_name: Name of the loop that owns the lock
        scope: Paths the loop operates on
        pid: Process ID of the owning process
        started_at: ISO timestamp of when the lock was taken
    """

    loop_name: str
    scope: list[str]
    pid: int
    started_at: str

    def to_dict(self) -> dict[str, Any]:
        """Return the lock as a plain dictionary ready for JSON encoding."""
        field_names = ("loop_name", "scope", "pid", "started_at")
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ScopeLock:
        """Build a lock from a decoded JSON dictionary.

        A ``scope`` value that is a list is copied; any other value is
        wrapped into a one-element list holding its string form.
        """
        owner = str(data["loop_name"])

        raw_scope = data["scope"]
        if isinstance(raw_scope, list):
            paths = list(raw_scope)
        else:
            paths = [str(raw_scope)]

        holder_pid = int(data["pid"])
        acquired = str(data["started_at"])
        return cls(loop_name=owner, scope=paths, pid=holder_pid, started_at=acquired)
class LockManager:
    """Manage scope-based locks for concurrent loop execution.

    Lock files are stored in <loops_dir>/.running/<name>.lock (by default
    .loops/.running/<name>.lock) and contain JSON with ScopeLock data.
    A lock whose recorded process is no longer alive is considered stale and
    is deleted whenever the lock files are scanned.
    """

    # Sentinel file that acquire() flock()s to serialize check-and-create.
    # It lives next to the real lock files and also ends in ".lock", so
    # every scan of the directory must skip it by name.
    _ACQUIRE_SENTINEL = ".acquire.lock"

    def __init__(self, loops_dir: Path | None = None) -> None:
        """Initialize the lock manager.

        Args:
            loops_dir: Base directory for loops (default: .loops)
        """
        self.loops_dir = loops_dir or Path(".loops")
        self.running_dir = self.loops_dir / RUNNING_DIR

    def acquire(self, loop_name: str, scope: list[str]) -> bool:
        """Attempt to acquire lock for the given scope.

        Args:
            loop_name: Name of the loop to acquire lock for
            scope: List of paths the loop operates on; an empty list means
                the whole project (".")

        Returns:
            True if lock acquired, False if conflict exists
        """
        # Normalize scope - empty means whole project
        normalized = [self._normalize_path(p) for p in (scope or ["."])]

        # Ensure running directory exists before opening sentinel lock
        self.running_dir.mkdir(parents=True, exist_ok=True)

        # Serialize the check-and-create sequence across processes using a
        # sentinel file. This eliminates the TOCTOU window between
        # find_conflict() (read) and lock-file creation (write). The flock
        # is released when the sentinel is closed at the end of the block.
        dir_lock_path = self.running_dir / self._ACQUIRE_SENTINEL
        with open(dir_lock_path, "w") as dir_lock:
            fcntl.flock(dir_lock, fcntl.LOCK_EX)

            # Check for conflicts (atomic with the write below)
            if self.find_conflict(normalized) is not None:
                return False

            # Create lock file
            lock = ScopeLock(
                loop_name=loop_name,
                scope=normalized,
                pid=os.getpid(),
                started_at=_iso_now(),
            )
            lock_file = self.running_dir / f"{loop_name}.lock"
            with open(lock_file, "w", encoding="utf-8") as f:
                json.dump(lock.to_dict(), f)

        return True

    def release(self, loop_name: str) -> None:
        """Release lock for a loop.

        Removing a lock that does not exist is not an error.

        Args:
            loop_name: Name of the loop to release lock for
        """
        lock_file = self.running_dir / f"{loop_name}.lock"
        lock_file.unlink(missing_ok=True)

    def find_conflict(self, scope: list[str]) -> ScopeLock | None:
        """Find any running loop with overlapping scope.

        Also cleans up stale locks from dead processes. Malformed lock
        files are skipped.

        Args:
            scope: Scope to check for conflicts

        Returns:
            ScopeLock of conflicting loop, or None if no conflict
        """
        lock_files = self._lock_files()
        if not lock_files:
            return None

        # Normalize once before the comparison loop to avoid O(n*m) stat calls
        normalized_scope = [self._normalize_path(p) for p in scope]

        for lock_file in lock_files:
            lock = self._read_lock(lock_file)
            if lock is None:
                continue

            # Lock files written by acquire() are already absolute, but
            # normalize defensively in case of legacy files.
            lock_scope = [self._normalize_path(p) for p in lock.scope]
            if self._scopes_overlap(normalized_scope, lock_scope):
                return lock

        return None

    def list_locks(self) -> list[ScopeLock]:
        """List all active locks.

        Cleans up stale locks as a side effect. Malformed lock files are
        skipped.

        Returns:
            List of active ScopeLock objects
        """
        return [
            lock
            for lock_file in self._lock_files()
            if (lock := self._read_lock(lock_file)) is not None
        ]

    def wait_for_scope(self, scope: list[str], timeout: int = 300) -> bool:
        """Wait until scope is available.

        The scope is always checked at least once, so ``timeout=0`` acts as
        a non-blocking poll. While a conflict persists the check is repeated
        about once per second until the timeout has elapsed.

        Args:
            scope: Scope to wait for
            timeout: Maximum time to wait in seconds

        Returns:
            True if scope became available, False if timeout
        """
        # Monotonic clock: the deadline must not move when the wall clock is
        # adjusted while we are waiting.
        deadline = time.monotonic() + timeout
        while True:
            if self.find_conflict(scope) is None:
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(1.0, remaining))

    def _lock_files(self) -> list[Path]:
        """Return the per-loop lock files currently in the running directory.

        pathlib globbing matches dotfiles, so the acquire() sentinel is
        filtered out by name. The result is materialized up front because
        callers may delete stale files while iterating.
        """
        if not self.running_dir.exists():
            return []
        return [
            candidate
            for candidate in self.running_dir.glob("*.lock")
            if candidate.name != self._ACQUIRE_SENTINEL
        ]

    def _read_lock(self, lock_file: Path) -> ScopeLock | None:
        """Load one lock file, returning None if it is unusable.

        A file is unusable when it has vanished, is not valid JSON, does not
        hold a JSON object with the expected fields, or refers to a dead
        process. In the dead-process case the stale file is deleted.
        """
        try:
            with open(lock_file, encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                return None
            lock = ScopeLock.from_dict(data)
        except (ValueError, KeyError, TypeError, FileNotFoundError):
            # ValueError covers json.JSONDecodeError and undecodable bytes as
            # well as a non-numeric pid; TypeError covers e.g. a null pid.
            return None

        if not self._process_alive(lock.pid):
            # Stale lock, remove it
            lock_file.unlink(missing_ok=True)
            return None
        return lock

    def _scopes_overlap(self, scope1: list[str], scope2: list[str]) -> bool:
        """Check if two scopes have any overlapping paths."""
        return any(self._paths_overlap(p1, p2) for p1 in scope1 for p2 in scope2)

    def _paths_overlap(self, path1: str, path2: str) -> bool:
        """Check if two paths overlap (same, or one contains the other).

        Assumes paths are already normalized (pre-resolved absolute strings).
        The comparison is by path components, so "/a/bc" does not overlap
        "/a/b".
        """
        p1 = Path(path1)
        p2 = Path(path2)
        # is_relative_to() is also true for equal paths.
        return p1.is_relative_to(p2) or p2.is_relative_to(p1)

    def _normalize_path(self, path: str) -> str:
        """Normalize path for consistent comparison."""
        return str(Path(path).resolve())

    def _process_alive(self, pid: int) -> bool:
        """Check if process is still running."""
        return _process_alive(pid)