Coverage for netrun / rbac / dependencies.py: 76%

58 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-18 22:20 +0000

1""" 

2FastAPI Dependencies for RBAC Enforcement 

3 

4Extracted from: Intirkast middleware/rbac.py + middleware/auth.py 

5Generalized for multi-project reuse with placeholder patterns 

6 

7Usage: 

8 from netrun.rbac import require_role, require_admin 

9 

10 @router.get("/admin/dashboard") 

11 async def admin_dashboard(user: dict = Depends(require_admin)): 

12 return {"message": "Admin access granted"} 

13""" 

14 

15import logging 

16from typing import Callable, List 

17 

18from fastapi import Depends, HTTPException 

19 

20from .exceptions import InsufficientPermissionsError, ResourceOwnershipError 

21from .models import RoleHierarchy 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26# PLACEHOLDER: Replace with your authentication dependency 

27# This should return user context dict with: user_id, tenant_id, roles 

28def get_current_user() -> dict: 

29 """ 

30 PLACEHOLDER: Replace with your authentication dependency 

31 

32 Expected return format: 

33 { 

34 "user_id": "uuid-string", 

35 "tenant_id": "uuid-string", 

36 "email": "user@example.com", 

37 "roles": ["admin"], # or single role as string 

38 "auth_source": "jwt|session|azure_ad" 

39 } 

40 

41 Example replacement: 

42 from your_app.auth import get_current_user_token 

43 

44 def get_current_user(token: dict = Depends(get_current_user_token)): 

45 return { 

46 "user_id": token["sub"], 

47 "tenant_id": token["tenant_id"], 

48 "roles": token.get("roles", ["member"]) 

49 } 

50 """ 

51 raise HTTPException( 

52 status_code=500, 

53 detail="RBAC not configured: Replace get_current_user placeholder with your auth dependency", 

54 ) 

55 

56 

57def require_role(required_role: str) -> Callable: 

58 """ 

59 Dependency factory to enforce role requirements (hierarchical) 

60 

61 Extracted from: Intirkast middleware/rbac.py (require_role pattern) 

62 

63 Args: 

64 required_role: Minimum required role (viewer|member|admin|owner) 

65 

66 Returns: 

67 FastAPI dependency function 

68 

69 Usage: 

70 @router.get("/api/admin/settings") 

71 async def get_settings(user: dict = Depends(require_role("admin"))): 

72 # Only admin and owner can access 

73 return {"settings": "..."} 

74 

75 @router.delete("/api/tenant") 

76 async def delete_tenant(user: dict = Depends(require_role("owner"))): 

77 # Only owner can delete tenant 

78 return {"status": "deleted"} 

79 """ 

80 

81 async def role_checker(user: dict = Depends(get_current_user)) -> dict: 

82 """ 

83 Validate user has required role level 

84 

85 Args: 

86 user: User context from authentication 

87 

88 Returns: 

89 User context if authorized 

90 

91 Raises: 

92 InsufficientPermissionsError: If user lacks required role 

93 """ 

94 user_roles = user.get("roles", []) 

95 

96 # Ensure roles is a list 

97 if isinstance(user_roles, str): 

98 user_roles = [user_roles] 

99 

100 # Get highest role level from user's roles 

101 user_role_level = max( 

102 [RoleHierarchy.get_role_level(role) for role in user_roles], default=-1 

103 ) 

104 

105 required_level = RoleHierarchy.get_role_level(required_role) 

106 

107 if user_role_level < required_level: 

108 logger.warning( 

109 f"Access denied: User {user.get('user_id')} has roles {user_roles}, " 

110 f"but {required_role} required" 

111 ) 

112 raise InsufficientPermissionsError( 

113 required_role=required_role, user_role=user_roles[0] if user_roles else None 

114 ) 

115 

116 return user 

117 

118 return role_checker 

119 

120 

121def require_roles(allowed_roles: List[str]) -> Callable: 

122 """ 

123 Dependency factory to enforce multiple allowed roles (non-hierarchical) 

124 

125 Extracted from: Intirkast middleware/rbac.py (require_any_role pattern) 

126 

127 Args: 

128 allowed_roles: List of allowed roles 

129 

130 Returns: 

131 FastAPI dependency function 

132 

133 Usage: 

134 @router.patch("/api/posts/{post_id}") 

135 async def update_post( 

136 post_id: str, 

137 user: dict = Depends(require_roles(["member", "admin", "owner"])) 

138 ): 

139 # Members, admins, or owners can update posts 

140 return {"status": "updated"} 

141 """ 

142 

143 async def role_checker(user: dict = Depends(get_current_user)) -> dict: 

144 """ 

145 Validate user has one of the allowed roles 

146 

147 Args: 

148 user: User context from authentication 

149 

150 Returns: 

151 User context if authorized 

152 

153 Raises: 

154 InsufficientPermissionsError: If user lacks any allowed role 

155 """ 

156 user_roles = user.get("roles", []) 

157 

158 # Ensure roles is a list 

159 if isinstance(user_roles, str): 

160 user_roles = [user_roles] 

161 

162 # Check if user has any of the required roles 

163 has_required_role = any(role in user_roles for role in allowed_roles) 

164 

165 if not has_required_role: 

166 logger.warning( 

167 f"Access denied: User {user.get('user_id')} has roles {user_roles}, " 

168 f"but one of {allowed_roles} required" 

169 ) 

170 raise HTTPException( 

171 status_code=403, 

172 detail=f"Insufficient permissions. Required roles: {', '.join(allowed_roles)}", 

173 ) 

174 

175 return user 

176 

177 return role_checker 

178 

179 

180def require_owner() -> Callable: 

181 """ 

182 Dependency to enforce owner-only access 

183 

184 Convenience wrapper for require_role("owner") 

185 

186 Usage: 

187 @router.delete("/api/tenant/{tenant_id}") 

188 async def delete_tenant( 

189 tenant_id: str, 

190 user: dict = Depends(require_owner()) 

191 ): 

192 # Only tenant owner can delete 

193 return {"status": "deleted"} 

194 """ 

195 return require_role("owner") 

196 

197 

198def require_admin() -> Callable: 

199 """ 

200 Dependency to enforce admin or owner access 

201 

202 Convenience wrapper for require_role("admin") 

203 

204 Usage: 

205 @router.post("/api/users/invite") 

206 async def invite_user( 

207 invite_data: InviteRequest, 

208 user: dict = Depends(require_admin()) 

209 ): 

210 # Admins and owners can invite users 

211 return {"status": "invited"} 

212 """ 

213 return require_role("admin") 

214 

215 

216def require_member() -> Callable: 

217 """ 

218 Dependency to enforce member, admin, or owner access 

219 

220 Convenience wrapper for require_role("member") 

221 

222 Usage: 

223 @router.post("/api/content/schedule") 

224 async def schedule_content( 

225 content_data: ContentRequest, 

226 user: dict = Depends(require_member()) 

227 ): 

228 # Members, admins, and owners can schedule content 

229 return {"status": "scheduled"} 

230 """ 

231 return require_role("member") 

232 

233 

234def check_resource_ownership(user: dict, resource_user_id: str) -> bool: 

235 """ 

236 Helper function to check if user owns a resource or is owner/admin 

237 

238 Extracted from: Intirkast middleware/rbac.py (check_resource_ownership) 

239 

240 Args: 

241 user: User context from authentication 

242 resource_user_id: User ID who owns the resource 

243 

244 Returns: 

245 True if user can access the resource 

246 

247 Usage in endpoint logic: 

248 @router.patch("/api/posts/{post_id}") 

249 async def update_post( 

250 post_id: str, 

251 post_data: UpdatePostRequest, 

252 user: dict = Depends(get_current_user), 

253 db: AsyncSession = Depends(get_db) 

254 ): 

255 # Get post from database 

256 post = await db.get(Post, post_id) 

257 

258 # Check ownership 

259 if not check_resource_ownership(user, post.user_id): 

260 raise HTTPException(status_code=403, detail="Not authorized") 

261 

262 # Update post... 

263 """ 

264 user_roles = user.get("roles", []) 

265 current_user_id = user.get("user_id") 

266 

267 # Ensure roles is a list 

268 if isinstance(user_roles, str): 

269 user_roles = [user_roles] 

270 

271 # Owners and admins can access all resources 

272 if "owner" in user_roles or "admin" in user_roles: 

273 return True 

274 

275 # Otherwise, must be resource owner 

276 return current_user_id == resource_user_id 

277 

278 

279def require_owner_or_self(resource_user_id_getter: Callable) -> Callable: 

280 """ 

281 Dependency factory to enforce owner role OR self-access 

282 

283 Extracted from: Intirkast middleware/rbac.py (require_owner_or_self pattern) 

284 

285 Args: 

286 resource_user_id_getter: Async function to get resource's user_id 

287 

288 Returns: 

289 FastAPI dependency function 

290 

291 Usage: 

292 async def get_user_id_from_path(user_id: str) -> str: 

293 return user_id 

294 

295 @router.patch("/api/users/{user_id}") 

296 async def update_user( 

297 user_id: str, 

298 user_data: UpdateUserRequest, 

299 current_user: dict = Depends(require_owner_or_self(get_user_id_from_path)) 

300 ): 

301 # Users can update their own profile, or owners can update anyone 

302 return {"status": "updated"} 

303 """ 

304 

305 async def access_checker( 

306 current_user: dict = Depends(get_current_user), 

307 resource_user_id: str = Depends(resource_user_id_getter), 

308 ) -> dict: 

309 """ 

310 Validate user is owner OR accessing their own resource 

311 

312 Args: 

313 current_user: User context from authentication 

314 resource_user_id: User ID who owns the resource 

315 

316 Returns: 

317 User context if authorized 

318 

319 Raises: 

320 ResourceOwnershipError: If user lacks access 

321 """ 

322 user_roles = current_user.get("roles", []) 

323 current_user_id = current_user.get("user_id") 

324 

325 # Ensure roles is a list 

326 if isinstance(user_roles, str): 

327 user_roles = [user_roles] 

328 

329 # Check if user is owner OR accessing their own resource 

330 is_owner = "owner" in user_roles 

331 is_self = current_user_id == resource_user_id 

332 

333 if not (is_owner or is_self): 

334 logger.warning( 

335 f"Access denied: User {current_user_id} attempting to access " 

336 f"resource owned by {resource_user_id}" 

337 ) 

338 raise ResourceOwnershipError() 

339 

340 return current_user 

341 

342 return access_checker