Coverage for nexios\session\base.py: 61%

126 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1from typing import Dict, Any, Iterable, Union 

2from nexios.config import get_config, MakeConfig 

3from datetime import datetime, timedelta, timezone 

4import secrets 

5import typing 

6 

7 

8class BaseSessionInterface: 

9 

10 modified = False 

11 

12 accessed = False 

13 

14 deleted = False 

15 

16 _session_cache: Dict[str, Any] = {} 

17 

18 def __init__(self, session_key: str) -> None: 

19 

20 config: MakeConfig = get_config() 

21 self.session_key = session_key 

22 if not config.secret_key: 

23 return 

24 self.config = config 

25 self.session_config = config.session 

26 

27 def __getitem__(self, key: str) -> Any: 

28 self.accessed = True 

29 return self._session_cache[key] 

30 

31 def __setitem__(self, key: str, value: Any) -> None: 

32 self.modified = True 

33 self.accessed = True 

34 self._session_cache[key] = value 

35 

36 def __delitem__(self, key: str) -> None: 

37 self.modified = True 

38 self.deleted = True 

39 del self._session_cache[key] 

40 

41 def __iter__(self) -> Iterable[str]: 

42 self.accessed = True 

43 return iter(self._session_cache) 

44 

45 def __len__(self) -> int: 

46 self.accessed = True 

47 return len(self._session_cache) 

48 

49 def __contains__(self, key: str) -> bool: 

50 self.accessed = True 

51 return key in self._session_cache 

52 

53 def set_session(self, key: str, value: str): 

54 

55 self.modified = True 

56 self.accessed = True 

57 

58 self._session_cache[key] = value 

59 

60 def get_session(self, key: str): 

61 self.accessed = True 

62 

63 return self._session_cache.get(key, None) 

64 

65 def get_all(self): 

66 self.accessed = True 

67 return self._session_cache.items() 

68 

69 def delete_session(self, key: str): 

70 self.modified = True 

71 self.deleted = True 

72 if key in self._session_cache: 

73 del self._session_cache[key] 

74 

75 def keys(self): 

76 return self._session_cache.keys() 

77 

78 def values(self): 

79 return self._session_cache.values() 

80 

81 def is_empty(self): 

82 return self._session_cache.items().__len__() == 0 

83 

84 async def save(self): 

85 

86 raise NotImplemented 

87 

88 def get_cookie_name(self) -> str: 

89 """The name of the session cookie. Uses``app.config.SESSION_COOKIE_NAME``.""" 

90 if not self.session_config: 

91 return "session_id" 

92 return self.session_config.session_cookie_name or "session_id" 

93 

94 def get_cookie_domain(self) -> typing.Optional[str]: 

95 """Returns the domain for which the cookie is valid. Uses `config.SESSION_COOKIE_DOMAIN`.""" 

96 if not self.session_config: 

97 return None 

98 return self.session_config.session_cookie_domain 

99 

100 def get_cookie_path(self) -> Union[str, None]: 

101 """Returns the path for which the cookie is valid. Uses `config.SESSION_COOKIE_PATH`.""" 

102 if not self.session_config: 

103 return None 

104 return self.session_config.session_cookie_path 

105 

106 def get_cookie_httponly(self) -> typing.Optional[bool]: 

107 """Returns whether the session cookie should be HTTPOnly. Uses `session_config.session_cookie_httponly`.""" 

108 if not self.session_config: 

109 return None 

110 return self.session_config.session_cookie_httponly 

111 

112 def get_cookie_secure(self) -> typing.Optional[bool]: 

113 """Returns whether the session cookie should be secure. Uses `session_config.session_cookie_secure`.""" 

114 if not self.session_config: 

115 return None 

116 return self.session_config.session_cookie_secure 

117 

118 def get_cookie_samesite(self) -> typing.Optional[str]: 

119 """Returns the SameSite attribute for the cookie. Uses `session_config.session_cookie_samesite`.""" 

120 if not self.session_config: 

121 return None 

122 return self.session_config.session_cookie_samesite 

123 

124 def get_cookie_partitioned(self) -> typing.Optional[bool]: 

125 """Returns whether the cookie should be partitioned. Uses `session_config.session_cookie_partitioned`.""" 

126 if not self.session_config: 

127 return None 

128 return self.session_config.session_cookie_partitioned 

129 

130 def get_expiration_time(self) -> typing.Optional[datetime]: 

131 """Returns the expiration time for the session if one is set.""" 

132 if not self.session_config: 

133 # No config - use default 7 day expiration 

134 if not hasattr(self, "_expiration_time"): 

135 self._expiration_time = datetime.now(timezone.utc) + timedelta(days=7) 

136 return self._expiration_time 

137 

138 if not self.session_config.session_permanent: 

139 # Non-permanent session - calculate expiration if not set 

140 if not hasattr(self, "_expiration_time"): 

141 expiration_seconds = ( 

142 self.session_config.session_expiration_time or 86400 

143 ) 

144 self._expiration_time = datetime.now(timezone.utc) + timedelta( 

145 seconds=expiration_seconds 

146 ) 

147 return self._expiration_time 

148 

149 return None 

150 

151 @property 

152 def should_set_cookie(self) -> bool: 

153 """Determines if the cookie should be set. Depends on `config.SESSION_REFRESH_EACH_REQUEST`.""" 

154 

155 if not self.session_config: 

156 

157 return self.modified 

158 return self.modified or ( 

159 self.session_config.session_permanent 

160 and self.session_config.session_refresh_each_request 

161 ) 

162 

163 def has_expired(self) -> bool: 

164 """Returns True if the session has expired.""" 

165 expiration_time = self.get_expiration_time() 

166 if expiration_time and datetime.now(timezone.utc) > expiration_time: # type: ignore 

167 return True 

168 return False 

169 

170 def get_session_key(self) -> str: 

171 """Returns the session key.""" 

172 if self.session_key: 

173 return self.session_key 

174 return secrets.token_hex(32) 

175 

176 def clear(self): 

177 self._session_cache = {} 

178 

179 def get(self, key: str): 

180 return self._session_cache.get(key) 

181 

182 def set_expiration_time(self, expiration: datetime) -> None: 

183 """Set the expiration time for the session.""" 

184 self._expiration_time = expiration 

185 

186 def __str__(self) -> str: 

187 return f"<Sesion { self._session_cache} >" 

188 

189 def __del__(self) -> None: 

190 self.clear() 

191 self.session_key = None 

192 self.config = None 

193 self.session_config = None