Coverage for jinja2_async_environment / caching / typed.py: 100%

120 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 21:26 -0800

1"""Type-safe cache implementation with generic support.""" 

2 

3import time 

4import typing as t 

5from dataclasses import dataclass 

6from threading import RLock 

7 

8T = t.TypeVar("T") 

9 

10 

11@dataclass 

12class CacheEntry[T]: 

13 """Cache entry with value, timestamp, and TTL information.""" 

14 

15 value: T 

16 timestamp: float 

17 ttl: int 

18 access_count: int = 0 

19 last_access: float = 0.0 

20 

21 

22class TypedCache[T]: 

23 """Type-safe cache with TTL, LRU eviction, and memory management. 

24 

25 This cache provides: 

26 - Type safety through generics 

27 - TTL-based expiration 

28 - LRU eviction when cache is full 

29 - Thread-safe operations 

30 - Memory usage tracking 

31 """ 

32 

33 def __init__(self, max_size: int = 1000, default_ttl: int = 300): 

34 """Initialize the typed cache. 

35 

36 Args: 

37 max_size: Maximum number of entries to store 

38 default_ttl: Default time-to-live in seconds 

39 """ 

40 self._cache: dict[str, CacheEntry[T]] = {} 

41 self._max_size = max_size 

42 self._default_ttl = default_ttl 

43 self._access_order: list[str] = [] 

44 self._lock = RLock() 

45 

46 # Statistics 

47 self._hits = 0 

48 self._misses = 0 

49 self._evictions = 0 

50 

51 def get(self, key: str) -> T | None: 

52 """Get value from cache with TTL and LRU tracking. 

53 

54 Args: 

55 key: Cache key 

56 

57 Returns: 

58 Cached value or None if not found/expired 

59 """ 

60 with self._lock: 

61 if key not in self._cache: 

62 self._misses += 1 

63 return None 

64 

65 entry = self._cache[key] 

66 current_time = time.time() 

67 

68 # Check TTL 

69 if current_time - entry.timestamp > entry.ttl: 

70 del self._cache[key] 

71 if key in self._access_order: 

72 self._access_order.remove(key) 

73 self._misses += 1 

74 return None 

75 

76 # Update access tracking 

77 entry.access_count += 1 

78 entry.last_access = current_time 

79 

80 # Update LRU order 

81 if key in self._access_order: 

82 self._access_order.remove(key) 

83 self._access_order.append(key) 

84 

85 self._hits += 1 

86 return entry.value 

87 

88 def set(self, key: str, value: T, ttl: int | None = None) -> None: 

89 """Store value in cache with automatic eviction. 

90 

91 Args: 

92 key: Cache key 

93 value: Value to store 

94 ttl: Time-to-live in seconds (uses default if None) 

95 """ 

96 with self._lock: 

97 current_time = time.time() 

98 

99 # Check if we need to evict entries 

100 if len(self._cache) >= self._max_size and key not in self._cache: 

101 self._evict_lru() 

102 

103 # Create cache entry 

104 entry = CacheEntry( 

105 value=value, 

106 timestamp=current_time, 

107 ttl=ttl or self._default_ttl, 

108 access_count=1, 

109 last_access=current_time, 

110 ) 

111 

112 self._cache[key] = entry 

113 

114 # Update access order 

115 if key in self._access_order: 

116 self._access_order.remove(key) 

117 self._access_order.append(key) 

118 

119 def delete(self, key: str) -> bool: 

120 """Delete a key from the cache. 

121 

122 Args: 

123 key: Cache key to delete 

124 

125 Returns: 

126 True if key was deleted, False if not found 

127 """ 

128 with self._lock: 

129 if key in self._cache: 

130 del self._cache[key] 

131 if key in self._access_order: 

132 self._access_order.remove(key) 

133 return True 

134 return False 

135 

136 def clear(self) -> None: 

137 """Clear all entries from the cache.""" 

138 with self._lock: 

139 self._cache.clear() 

140 self._access_order.clear() 

141 self._hits = 0 

142 self._misses = 0 

143 self._evictions = 0 

144 

145 def _evict_lru(self) -> None: 

146 """Evict least recently used entries to make space.""" 

147 # Evict 25% of entries to avoid frequent evictions 

148 evict_count = max(1, self._max_size // 4) 

149 

150 for _ in range(min(evict_count, len(self._access_order))): 

151 if self._access_order: 

152 lru_key = self._access_order.pop(0) 

153 if lru_key in self._cache: 

154 del self._cache[lru_key] 

155 self._evictions += 1 

156 

157 def cleanup_expired(self) -> int: 

158 """Remove all expired entries. 

159 

160 Returns: 

161 Number of entries removed 

162 """ 

163 with self._lock: 

164 current_time = time.time() 

165 expired_keys = [ 

166 key 

167 for key, entry in self._cache.items() 

168 if current_time - entry.timestamp > entry.ttl 

169 ] 

170 

171 for key in expired_keys: 

172 del self._cache[key] 

173 if key in self._access_order: 

174 self._access_order.remove(key) 

175 

176 return len(expired_keys) 

177 

178 def get_statistics(self) -> dict[str, t.Any]: 

179 """Get cache statistics. 

180 

181 Returns: 

182 Dictionary with cache statistics 

183 """ 

184 with self._lock: 

185 total_requests = self._hits + self._misses 

186 hit_rate = self._hits / total_requests if total_requests > 0 else 0.0 

187 

188 return { 

189 "size": len(self._cache), 

190 "max_size": self._max_size, 

191 "hits": self._hits, 

192 "misses": self._misses, 

193 "evictions": self._evictions, 

194 "hit_rate": hit_rate, 

195 "fill_ratio": len(self._cache) / self._max_size, 

196 } 

197 

198 def resize(self, new_max_size: int) -> None: 

199 """Resize the cache maximum size. 

200 

201 Args: 

202 new_max_size: New maximum size for the cache 

203 """ 

204 with self._lock: 

205 self._max_size = new_max_size 

206 

207 # If new size is smaller, evict excess entries 

208 if new_max_size < len(self._cache): 

209 self._evict_excess_entries(new_max_size) 

210 

211 def _evict_excess_entries(self, new_max_size: int) -> None: 

212 """Evict excess entries when cache is resized to a smaller size. 

213 

214 Args: 

215 new_max_size: New maximum size for the cache 

216 """ 

217 excess = len(self._cache) - new_max_size 

218 for _ in range(excess): 

219 if self._access_order: 

220 lru_key = self._access_order.pop(0) 

221 if lru_key in self._cache: 

222 del self._cache[lru_key] 

223 self._evictions += 1 

224 

225 def contains(self, key: str) -> bool: 

226 """Check if key exists in cache (without updating access). 

227 

228 Args: 

229 key: Cache key to check 

230 

231 Returns: 

232 True if key exists and not expired, False otherwise 

233 """ 

234 with self._lock: 

235 if key not in self._cache: 

236 return False 

237 

238 entry = self._cache[key] 

239 current_time = time.time() 

240 

241 # Check TTL without updating access 

242 if current_time - entry.timestamp > entry.ttl: 

243 return False 

244 

245 return True 

246 

247 def keys(self) -> list[str]: 

248 """Get all valid (non-expired) keys. 

249 

250 Returns: 

251 List of valid cache keys 

252 """ 

253 with self._lock: 

254 current_time = time.time() 

255 valid_keys = [ 

256 key 

257 for key, entry in self._cache.items() 

258 if current_time - entry.timestamp <= entry.ttl 

259 ] 

260 

261 return valid_keys 

262 

263 def __len__(self) -> int: 

264 """Get number of entries in cache.""" 

265 return len(self._cache) 

266 

267 def __contains__(self, key: str) -> bool: 

268 """Check if key is in cache (supports 'in' operator).""" 

269 return self.contains(key)