Coverage for nexios\auth\backends\jwt.py: 89%
44 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-21 20:31 +0100
1try:
2 import jwt
3except ImportError:
4 jwt = None
5from typing import Optional, Dict, Any, List
6from nexios.auth.base import AuthenticationBackend
7from nexios.http import Request, Response
8from nexios.auth.base import UnauthenticatedUser
9from nexios.config import get_config
12def create_jwt(
13 payload: Dict[str, Any], secret: Optional[str] = None, algorithm: str = "HS256"
14) -> str:
15 """
16 Create a JWT token.
17 Args:
18 payload (dict): Data to include in the token.
19 secret (str): Secret key to sign the token.
20 algorithm (str): Algorithm to use for signing the token.
21 Returns:
22 str: Encoded JWT token.
23 """
24 if jwt is None:
25 raise ImportError("JWT support is not installed.")
26 secret = secret or get_config().secret_key
27 return jwt.encode(payload, secret, algorithm=algorithm) # type:ignore
30def decode_jwt(
31 token: str, secret: Optional[str] = None, algorithms: List[str] = ["HS256"]
32) -> Dict[str, Any]:
33 """
34 Decode a JWT token.
35 Args:
36 token (str): Encoded JWT token.
37 secret (str): Secret key used to sign the token.
38 algorithms (list): List of algorithms to decode the token.
39 Returns:
40 dict: Decoded token payload.
41 """
42 if jwt is None:
43 raise ImportError("JWT support is not installed.")
44 secret = secret or get_config().secret_key
45 try:
46 return jwt.decode(token, secret, algorithms=algorithms) # type:ignore
47 except jwt.ExpiredSignatureError: # type:ignore
48 raise ValueError("Token has expired") # type:ignore
49 except jwt.InvalidTokenError: # type:ignore
50 raise ValueError("Invalid token")
53class JWTAuthBackend(AuthenticationBackend):
54 def __init__(self, authenticate_func): # type:ignore
55 self.authenticate_func = authenticate_func
57 async def authenticate(
58 self, request: Request, response: Response
59 ) -> Any: # type:ignore
60 app_config = get_config()
61 self.secret = app_config.secret_key
62 self.algorithms = app_config.jwt_algorithms or ["HS256"]
64 auth_header = request.headers.get("Authorization")
65 if not auth_header or not auth_header.startswith("Bearer "):
66 response.set_header("WWW-Authenticate", 'Bearer realm="Access to the API"')
67 return None
69 token = auth_header.split(" ")[1]
70 try:
71 payload = decode_jwt(token, self.secret, self.algorithms)
72 except ValueError as _:
74 return None
76 user: Any = await self.authenticate_func(**payload)
77 if not user:
78 return UnauthenticatedUser()
80 return user, "jwt"