Coverage for little_loops / fsm / concurrency.py: 27%

122 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""Scope-based concurrency control for FSM loops. 

2 

3Prevents concurrent loops from conflicting when operating on 

4the same files or directories through file-based locking. 

5 

6Public exports: 

7 ScopeLock: Dataclass representing a scope lock 

8 LockManager: Manager for acquiring/releasing scope locks 

9""" 

10 

11from __future__ import annotations 

12 

13import errno 

14import fcntl 

15import json 

16import os 

17import time 

18from dataclasses import dataclass 

19from datetime import UTC, datetime 

20from pathlib import Path 

21from typing import Any 

22 

23RUNNING_DIR = ".running" 

24 

25 

26def _process_alive(pid: int) -> bool: 

27 """Check if a process is still running. 

28 

29 Returns True if alive (or alive but unreadable due to EPERM), 

30 False only if process does not exist (ESRCH). 

31 """ 

32 try: 

33 os.kill(pid, 0) 

34 return True 

35 except OSError as e: 

36 if e.errno == errno.ESRCH: 

37 return False # No such process 

38 return True # EPERM or other: process exists, no permission 

39 

40 

41def _iso_now() -> str: 

42 """Return current time as ISO8601 string.""" 

43 return datetime.now(UTC).isoformat() 

44 

45 

46@dataclass 

47class ScopeLock: 

48 """Represents a lock on a set of paths for a running loop. 

49 

50 Attributes: 

51 loop_name: Name of the loop holding the lock 

52 scope: List of paths this loop operates on 

53 pid: Process ID of the lock holder 

54 started_at: ISO timestamp when lock was acquired 

55 """ 

56 

57 loop_name: str 

58 scope: list[str] 

59 pid: int 

60 started_at: str 

61 

62 def to_dict(self) -> dict[str, Any]: 

63 """Convert to dictionary for JSON serialization.""" 

64 return { 

65 "loop_name": self.loop_name, 

66 "scope": self.scope, 

67 "pid": self.pid, 

68 "started_at": self.started_at, 

69 } 

70 

71 @classmethod 

72 def from_dict(cls, data: dict[str, Any]) -> ScopeLock: 

73 """Create from dictionary (JSON deserialization).""" 

74 return cls( 

75 loop_name=str(data["loop_name"]), 

76 scope=list(data["scope"]) if isinstance(data["scope"], list) else [str(data["scope"])], 

77 pid=int(data["pid"]), 

78 started_at=str(data["started_at"]), 

79 ) 

80 

81 

82class LockManager: 

83 """Manage scope-based locks for concurrent loop execution. 

84 

85 Lock files are stored in .loops/.running/<name>.lock 

86 and contain JSON with ScopeLock data. 

87 """ 

88 

89 def __init__(self, loops_dir: Path | None = None) -> None: 

90 """Initialize the lock manager. 

91 

92 Args: 

93 loops_dir: Base directory for loops (default: .loops) 

94 """ 

95 self.loops_dir = loops_dir or Path(".loops") 

96 self.running_dir = self.loops_dir / RUNNING_DIR 

97 

98 def acquire(self, loop_name: str, scope: list[str]) -> bool: 

99 """Attempt to acquire lock for the given scope. 

100 

101 Args: 

102 loop_name: Name of the loop to acquire lock for 

103 scope: List of paths the loop operates on 

104 

105 Returns: 

106 True if lock acquired, False if conflict exists 

107 """ 

108 # Normalize scope - empty means whole project 

109 if not scope: 

110 scope = ["."] 

111 scope = [self._normalize_path(p) for p in scope] 

112 

113 # Ensure running directory exists before opening sentinel lock 

114 self.running_dir.mkdir(parents=True, exist_ok=True) 

115 

116 # Serialize the check-and-create sequence across processes using a 

117 # sentinel file. This eliminates the TOCTOU window between 

118 # find_conflict() (read) and lock-file creation (write). 

119 # .acquire.lock is a dotfile so Path.glob("*.lock") will not match it 

120 # and stale-lock cleanup in find_conflict/list_locks ignores it. 

121 dir_lock_path = self.running_dir / ".acquire.lock" 

122 with open(dir_lock_path, "w") as dir_lock: 

123 fcntl.flock(dir_lock, fcntl.LOCK_EX) 

124 

125 # Check for conflicts (now atomic with write below) 

126 conflict = self.find_conflict(scope) 

127 if conflict: 

128 return False 

129 

130 # Create lock file 

131 lock_file = self.running_dir / f"{loop_name}.lock" 

132 lock = ScopeLock( 

133 loop_name=loop_name, 

134 scope=scope, 

135 pid=os.getpid(), 

136 started_at=_iso_now(), 

137 ) 

138 with open(lock_file, "w") as f: 

139 json.dump(lock.to_dict(), f) 

140 

141 return True 

142 

143 def release(self, loop_name: str) -> None: 

144 """Release lock for a loop. 

145 

146 Args: 

147 loop_name: Name of the loop to release lock for 

148 """ 

149 lock_file = self.running_dir / f"{loop_name}.lock" 

150 lock_file.unlink(missing_ok=True) 

151 

152 def find_conflict(self, scope: list[str]) -> ScopeLock | None: 

153 """Find any running loop with overlapping scope. 

154 

155 Also cleans up stale locks from dead processes. 

156 

157 Args: 

158 scope: Scope to check for conflicts 

159 

160 Returns: 

161 ScopeLock of conflicting loop, or None if no conflict 

162 """ 

163 if not self.running_dir.exists(): 

164 return None 

165 

166 # Normalize once before the comparison loop to avoid O(n*m) stat calls 

167 normalized_scope = [self._normalize_path(p) for p in scope] 

168 

169 for lock_file in self.running_dir.glob("*.lock"): 

170 try: 

171 with open(lock_file) as f: 

172 data = json.load(f) 

173 lock = ScopeLock.from_dict(data) 

174 

175 # Check if process is still alive 

176 if not self._process_alive(lock.pid): 

177 # Stale lock, remove it 

178 lock_file.unlink(missing_ok=True) 

179 continue 

180 

181 # Normalize lock scope (lock files from acquire() are already 

182 # absolute, but normalize defensively in case of legacy files) 

183 lock_scope = [self._normalize_path(p) for p in lock.scope] 

184 if self._scopes_overlap(normalized_scope, lock_scope): 

185 return lock 

186 

187 except (json.JSONDecodeError, KeyError, FileNotFoundError): 

188 # Malformed or deleted lock file, skip 

189 continue 

190 

191 return None 

192 

193 def list_locks(self) -> list[ScopeLock]: 

194 """List all active locks. 

195 

196 Cleans up stale locks as a side effect. 

197 

198 Returns: 

199 List of active ScopeLock objects 

200 """ 

201 locks: list[ScopeLock] = [] 

202 if not self.running_dir.exists(): 

203 return locks 

204 

205 for lock_file in self.running_dir.glob("*.lock"): 

206 try: 

207 with open(lock_file) as f: 

208 data = json.load(f) 

209 lock = ScopeLock.from_dict(data) 

210 

211 if self._process_alive(lock.pid): 

212 locks.append(lock) 

213 else: 

214 # Stale lock, remove it 

215 lock_file.unlink(missing_ok=True) 

216 except (json.JSONDecodeError, KeyError, FileNotFoundError): 

217 continue 

218 

219 return locks 

220 

221 def wait_for_scope(self, scope: list[str], timeout: int = 300) -> bool: 

222 """Wait until scope is available. 

223 

224 Args: 

225 scope: Scope to wait for 

226 timeout: Maximum time to wait in seconds 

227 

228 Returns: 

229 True if scope became available, False if timeout 

230 """ 

231 start = time.time() 

232 while time.time() - start < timeout: 

233 conflict = self.find_conflict(scope) 

234 if conflict is None: 

235 return True 

236 time.sleep(1) 

237 

238 return False 

239 

240 def _scopes_overlap(self, scope1: list[str], scope2: list[str]) -> bool: 

241 """Check if two scopes have any overlapping paths.""" 

242 for p1 in scope1: 

243 for p2 in scope2: 

244 if self._paths_overlap(p1, p2): 

245 return True 

246 return False 

247 

248 def _paths_overlap(self, path1: str, path2: str) -> bool: 

249 """Check if two paths overlap (same, or one contains the other). 

250 

251 Assumes paths are already normalized (pre-resolved absolute strings). 

252 """ 

253 p1 = Path(path1) 

254 p2 = Path(path2) 

255 

256 # Same path 

257 if p1 == p2: 

258 return True 

259 

260 # One is parent of other 

261 try: 

262 p1.relative_to(p2) 

263 return True 

264 except ValueError: 

265 pass 

266 

267 try: 

268 p2.relative_to(p1) 

269 return True 

270 except ValueError: 

271 pass 

272 

273 return False 

274 

275 def _normalize_path(self, path: str) -> str: 

276 """Normalize path for consistent comparison.""" 

277 return str(Path(path).resolve()) 

278 

279 def _process_alive(self, pid: int) -> bool: 

280 """Check if process is still running.""" 

281 return _process_alive(pid)