Coverage for nexios\auth\backends\jwt.py: 89%

44 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1try: 

2 import jwt 

3except ImportError: 

4 jwt = None 

5from typing import Optional, Dict, Any, List 

6from nexios.auth.base import AuthenticationBackend 

7from nexios.http import Request, Response 

8from nexios.auth.base import UnauthenticatedUser 

9from nexios.config import get_config 

10 

11 

def create_jwt(
    payload: Dict[str, Any], secret: Optional[str] = None, algorithm: str = "HS256"
) -> str:
    """
    Encode ``payload`` as a signed JWT string.

    Args:
        payload (dict): Claims to embed in the token.
        secret (str): Signing key. When omitted or empty, the value of
            ``get_config().secret_key`` is used instead.
        algorithm (str): Signing algorithm name forwarded to ``jwt.encode``.

    Returns:
        str: The encoded token as returned by ``jwt.encode``.

    Raises:
        ImportError: If the module-level ``jwt`` name is ``None``.
    """
    if jwt is None:
        raise ImportError("JWT support is not installed.")

    # The application config is consulted only when no usable secret was given.
    signing_key = secret if secret else get_config().secret_key
    token = jwt.encode(payload, signing_key, algorithm=algorithm)  # type:ignore
    return token

28 

29 

def decode_jwt(
    token: str, secret: Optional[str] = None, algorithms: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Decode and verify a JWT token.

    Args:
        token (str): Encoded JWT token.
        secret (str): Secret key used to sign the token. When omitted or
            empty, the value of ``get_config().secret_key`` is used instead.
        algorithms (list): Algorithms accepted when verifying the token.
            Defaults to ``["HS256"]`` when not given.

    Returns:
        dict: Decoded token payload.

    Raises:
        ImportError: If the module-level ``jwt`` name is ``None``.
        ValueError: ``"Token has expired"`` when ``jwt`` reports an expired
            signature, ``"Invalid token"`` for any other invalid-token error.
            The original ``jwt`` exception is attached as ``__cause__``.
    """
    if jwt is None:
        raise ImportError("JWT support is not installed.")

    secret = secret or get_config().secret_key
    # A ``None`` sentinel replaces the former mutable list default, which was
    # shared between calls. An explicitly passed list (even an empty one) is
    # forwarded unchanged.
    accepted = ["HS256"] if algorithms is None else algorithms

    try:
        return jwt.decode(token, secret, algorithms=accepted)  # type:ignore
    # The expired-signature handler must stay ahead of the generic
    # invalid-token handler so the more specific message wins.
    except jwt.ExpiredSignatureError as err:  # type:ignore
        raise ValueError("Token has expired") from err  # type:ignore
    except jwt.InvalidTokenError as err:  # type:ignore
        raise ValueError("Invalid token") from err

51 

52 

class JWTAuthBackend(AuthenticationBackend):
    """
    Authentication backend for ``Authorization: Bearer <token>`` headers.

    The token is decoded with ``decode_jwt`` using the secret and algorithms
    from the application config, and the decoded claims are passed to a
    user-supplied coroutine that resolves them to a user object.
    """

    def __init__(self, authenticate_func):  # type:ignore
        """
        Store the callable used to resolve token claims to a user.

        Args:
            authenticate_func: Awaitable callable invoked as
                ``authenticate_func(**payload)`` with the decoded claims as
                keyword arguments. A falsy result is treated as "no user".
        """
        self.authenticate_func = authenticate_func

    async def authenticate(
        self, request: Request, response: Response
    ) -> Any:  # type:ignore
        """
        Authenticate ``request`` from its bearer token.

        Args:
            request (Request): Incoming request whose ``Authorization``
                header is inspected.
            response (Response): Response that receives a
                ``WWW-Authenticate`` challenge header when the request has no
                well-formed bearer credentials.

        Returns:
            ``None`` when the header is missing or malformed, or when the
            token fails to decode; ``UnauthenticatedUser()`` when
            ``authenticate_func`` returns a falsy value; otherwise the tuple
            ``(user, "jwt")``.
        """
        app_config = get_config()
        # Read from the application config on every call and kept on the
        # instance, as before.
        self.secret = app_config.secret_key
        self.algorithms = app_config.jwt_algorithms or ["HS256"]

        auth_header = request.headers.get("Authorization")
        # Splitting on arbitrary whitespace tolerates repeated spaces between
        # the scheme and the token. Exactly two parts are required so an
        # empty token or trailing garbage is rejected up front.
        parts = auth_header.split() if auth_header else []
        # HTTP authentication scheme names are case-insensitive.
        if len(parts) != 2 or parts[0].lower() != "bearer":
            response.set_header("WWW-Authenticate", 'Bearer realm="Access to the API"')
            return None

        token = parts[1]
        try:
            payload = decode_jwt(token, self.secret, self.algorithms)
        except ValueError:
            # Expired or otherwise invalid token: treat as unauthenticated.
            return None

        user: Any = await self.authenticate_func(**payload)
        if not user:
            # NOTE(review): this path returns a bare user object while the
            # success path returns a (user, scheme) tuple; confirm callers
            # handle both shapes.
            return UnauthenticatedUser()

        return user, "jwt"