Coverage for netrun_rbac \ dependencies.py: 76%
58 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 14:16 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-28 14:16 -0800
1"""
2FastAPI Dependencies for RBAC Enforcement
4Extracted from: Intirkast middleware/rbac.py + middleware/auth.py
5Generalized for multi-project reuse with placeholder patterns
7Usage:
8 from netrun_rbac import require_role, require_admin
10 @router.get("/admin/dashboard")
11 async def admin_dashboard(user: dict = Depends(require_admin)):
12 return {"message": "Admin access granted"}
13"""
15import logging
16from typing import Callable, List
18from fastapi import Depends, HTTPException
20from .exceptions import InsufficientPermissionsError, ResourceOwnershipError
21from .models import RoleHierarchy
23logger = logging.getLogger(__name__)
26# PLACEHOLDER: Replace with your authentication dependency
27# This should return user context dict with: user_id, tenant_id, roles
28def get_current_user() -> dict:
29 """
30 PLACEHOLDER: Replace with your authentication dependency
32 Expected return format:
33 {
34 "user_id": "uuid-string",
35 "tenant_id": "uuid-string",
36 "email": "user@example.com",
37 "roles": ["admin"], # or single role as string
38 "auth_source": "jwt|session|azure_ad"
39 }
41 Example replacement:
42 from your_app.auth import get_current_user_token
44 def get_current_user(token: dict = Depends(get_current_user_token)):
45 return {
46 "user_id": token["sub"],
47 "tenant_id": token["tenant_id"],
48 "roles": token.get("roles", ["member"])
49 }
50 """
51 raise HTTPException(
52 status_code=500,
53 detail="RBAC not configured: Replace get_current_user placeholder with your auth dependency",
54 )
57def require_role(required_role: str) -> Callable:
58 """
59 Dependency factory to enforce role requirements (hierarchical)
61 Extracted from: Intirkast middleware/rbac.py (require_role pattern)
63 Args:
64 required_role: Minimum required role (viewer|member|admin|owner)
66 Returns:
67 FastAPI dependency function
69 Usage:
70 @router.get("/api/admin/settings")
71 async def get_settings(user: dict = Depends(require_role("admin"))):
72 # Only admin and owner can access
73 return {"settings": "..."}
75 @router.delete("/api/tenant")
76 async def delete_tenant(user: dict = Depends(require_role("owner"))):
77 # Only owner can delete tenant
78 return {"status": "deleted"}
79 """
81 async def role_checker(user: dict = Depends(get_current_user)) -> dict:
82 """
83 Validate user has required role level
85 Args:
86 user: User context from authentication
88 Returns:
89 User context if authorized
91 Raises:
92 InsufficientPermissionsError: If user lacks required role
93 """
94 user_roles = user.get("roles", [])
96 # Ensure roles is a list
97 if isinstance(user_roles, str):
98 user_roles = [user_roles]
100 # Get highest role level from user's roles
101 user_role_level = max(
102 [RoleHierarchy.get_role_level(role) for role in user_roles], default=-1
103 )
105 required_level = RoleHierarchy.get_role_level(required_role)
107 if user_role_level < required_level:
108 logger.warning(
109 f"Access denied: User {user.get('user_id')} has roles {user_roles}, "
110 f"but {required_role} required"
111 )
112 raise InsufficientPermissionsError(
113 required_role=required_role, user_role=user_roles[0] if user_roles else None
114 )
116 return user
118 return role_checker
121def require_roles(allowed_roles: List[str]) -> Callable:
122 """
123 Dependency factory to enforce multiple allowed roles (non-hierarchical)
125 Extracted from: Intirkast middleware/rbac.py (require_any_role pattern)
127 Args:
128 allowed_roles: List of allowed roles
130 Returns:
131 FastAPI dependency function
133 Usage:
134 @router.patch("/api/posts/{post_id}")
135 async def update_post(
136 post_id: str,
137 user: dict = Depends(require_roles(["member", "admin", "owner"]))
138 ):
139 # Members, admins, or owners can update posts
140 return {"status": "updated"}
141 """
143 async def role_checker(user: dict = Depends(get_current_user)) -> dict:
144 """
145 Validate user has one of the allowed roles
147 Args:
148 user: User context from authentication
150 Returns:
151 User context if authorized
153 Raises:
154 InsufficientPermissionsError: If user lacks any allowed role
155 """
156 user_roles = user.get("roles", [])
158 # Ensure roles is a list
159 if isinstance(user_roles, str):
160 user_roles = [user_roles]
162 # Check if user has any of the required roles
163 has_required_role = any(role in user_roles for role in allowed_roles)
165 if not has_required_role:
166 logger.warning(
167 f"Access denied: User {user.get('user_id')} has roles {user_roles}, "
168 f"but one of {allowed_roles} required"
169 )
170 raise HTTPException(
171 status_code=403,
172 detail=f"Insufficient permissions. Required roles: {', '.join(allowed_roles)}",
173 )
175 return user
177 return role_checker
180def require_owner() -> Callable:
181 """
182 Dependency to enforce owner-only access
184 Convenience wrapper for require_role("owner")
186 Usage:
187 @router.delete("/api/tenant/{tenant_id}")
188 async def delete_tenant(
189 tenant_id: str,
190 user: dict = Depends(require_owner())
191 ):
192 # Only tenant owner can delete
193 return {"status": "deleted"}
194 """
195 return require_role("owner")
198def require_admin() -> Callable:
199 """
200 Dependency to enforce admin or owner access
202 Convenience wrapper for require_role("admin")
204 Usage:
205 @router.post("/api/users/invite")
206 async def invite_user(
207 invite_data: InviteRequest,
208 user: dict = Depends(require_admin())
209 ):
210 # Admins and owners can invite users
211 return {"status": "invited"}
212 """
213 return require_role("admin")
216def require_member() -> Callable:
217 """
218 Dependency to enforce member, admin, or owner access
220 Convenience wrapper for require_role("member")
222 Usage:
223 @router.post("/api/content/schedule")
224 async def schedule_content(
225 content_data: ContentRequest,
226 user: dict = Depends(require_member())
227 ):
228 # Members, admins, and owners can schedule content
229 return {"status": "scheduled"}
230 """
231 return require_role("member")
234def check_resource_ownership(user: dict, resource_user_id: str) -> bool:
235 """
236 Helper function to check if user owns a resource or is owner/admin
238 Extracted from: Intirkast middleware/rbac.py (check_resource_ownership)
240 Args:
241 user: User context from authentication
242 resource_user_id: User ID who owns the resource
244 Returns:
245 True if user can access the resource
247 Usage in endpoint logic:
248 @router.patch("/api/posts/{post_id}")
249 async def update_post(
250 post_id: str,
251 post_data: UpdatePostRequest,
252 user: dict = Depends(get_current_user),
253 db: AsyncSession = Depends(get_db)
254 ):
255 # Get post from database
256 post = await db.get(Post, post_id)
258 # Check ownership
259 if not check_resource_ownership(user, post.user_id):
260 raise HTTPException(status_code=403, detail="Not authorized")
262 # Update post...
263 """
264 user_roles = user.get("roles", [])
265 current_user_id = user.get("user_id")
267 # Ensure roles is a list
268 if isinstance(user_roles, str):
269 user_roles = [user_roles]
271 # Owners and admins can access all resources
272 if "owner" in user_roles or "admin" in user_roles:
273 return True
275 # Otherwise, must be resource owner
276 return current_user_id == resource_user_id
279def require_owner_or_self(resource_user_id_getter: Callable) -> Callable:
280 """
281 Dependency factory to enforce owner role OR self-access
283 Extracted from: Intirkast middleware/rbac.py (require_owner_or_self pattern)
285 Args:
286 resource_user_id_getter: Async function to get resource's user_id
288 Returns:
289 FastAPI dependency function
291 Usage:
292 async def get_user_id_from_path(user_id: str) -> str:
293 return user_id
295 @router.patch("/api/users/{user_id}")
296 async def update_user(
297 user_id: str,
298 user_data: UpdateUserRequest,
299 current_user: dict = Depends(require_owner_or_self(get_user_id_from_path))
300 ):
301 # Users can update their own profile, or owners can update anyone
302 return {"status": "updated"}
303 """
305 async def access_checker(
306 current_user: dict = Depends(get_current_user),
307 resource_user_id: str = Depends(resource_user_id_getter),
308 ) -> dict:
309 """
310 Validate user is owner OR accessing their own resource
312 Args:
313 current_user: User context from authentication
314 resource_user_id: User ID who owns the resource
316 Returns:
317 User context if authorized
319 Raises:
320 ResourceOwnershipError: If user lacks access
321 """
322 user_roles = current_user.get("roles", [])
323 current_user_id = current_user.get("user_id")
325 # Ensure roles is a list
326 if isinstance(user_roles, str):
327 user_roles = [user_roles]
329 # Check if user is owner OR accessing their own resource
330 is_owner = "owner" in user_roles
331 is_self = current_user_id == resource_user_id
333 if not (is_owner or is_self):
334 logger.warning(
335 f"Access denied: User {current_user_id} attempting to access "
336 f"resource owned by {resource_user_id}"
337 )
338 raise ResourceOwnershipError()
340 return current_user
342 return access_checker