Coverage for jinja2_async_environment/caching/typed.py: 44%

124 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 14:09 -0700

1"""Type-safe cache implementation with generic support.""" 

2 

3import time 

4import typing as t 

5from dataclasses import dataclass 

6from threading import RLock 

7 

8T = t.TypeVar("T") 

9 

10 

11@dataclass 

12class CacheEntry[T]: 

13 """Cache entry with value, timestamp, and TTL information.""" 

14 

15 value: T 

16 timestamp: float 

17 ttl: int 

18 access_count: int = 0 

19 last_access: float = 0.0 

20 

21 

22class TypedCache[T]: 

23 """Type-safe cache with TTL, LRU eviction, and memory management. 

24 

25 This cache provides: 

26 - Type safety through generics 

27 - TTL-based expiration 

28 - LRU eviction when cache is full 

29 - Thread-safe operations 

30 - Memory usage tracking 

31 """ 

32 

33 def __init__(self, max_size: int = 1000, default_ttl: int = 300): 

34 """Initialize the typed cache. 

35 

36 Args: 

37 max_size: Maximum number of entries to store 

38 default_ttl: Default time-to-live in seconds 

39 """ 

40 self._cache: dict[str, CacheEntry[T]] = {} 

41 self._max_size = max_size 

42 self._default_ttl = default_ttl 

43 self._access_order: list[str] = [] 

44 self._lock = RLock() 

45 

46 # Statistics 

47 self._hits = 0 

48 self._misses = 0 

49 self._evictions = 0 

50 

51 def get(self, key: str) -> T | None: 

52 """Get value from cache with TTL and LRU tracking. 

53 

54 Args: 

55 key: Cache key 

56 

57 Returns: 

58 Cached value or None if not found/expired 

59 """ 

60 with self._lock: 

61 if key not in self._cache: 

62 self._misses += 1 

63 return None 

64 

65 entry = self._cache[key] 

66 current_time = time.time() 

67 

68 # Check TTL 

69 if current_time - entry.timestamp > entry.ttl: 

70 del self._cache[key] 

71 if key in self._access_order: 

72 self._access_order.remove(key) 

73 self._misses += 1 

74 return None 

75 

76 # Update access tracking 

77 entry.access_count += 1 

78 entry.last_access = current_time 

79 

80 # Update LRU order 

81 if key in self._access_order: 

82 self._access_order.remove(key) 

83 self._access_order.append(key) 

84 

85 self._hits += 1 

86 return entry.value 

87 

88 def set(self, key: str, value: T, ttl: int | None = None) -> None: 

89 """Store value in cache with automatic eviction. 

90 

91 Args: 

92 key: Cache key 

93 value: Value to store 

94 ttl: Time-to-live in seconds (uses default if None) 

95 """ 

96 with self._lock: 

97 current_time = time.time() 

98 

99 # Check if we need to evict entries 

100 if len(self._cache) >= self._max_size and key not in self._cache: 

101 self._evict_lru() 

102 

103 # Create cache entry 

104 entry = CacheEntry( 

105 value=value, 

106 timestamp=current_time, 

107 ttl=ttl or self._default_ttl, 

108 access_count=1, 

109 last_access=current_time, 

110 ) 

111 

112 self._cache[key] = entry 

113 

114 # Update access order 

115 if key in self._access_order: 

116 self._access_order.remove(key) 

117 self._access_order.append(key) 

118 

119 def delete(self, key: str) -> bool: 

120 """Delete a key from the cache. 

121 

122 Args: 

123 key: Cache key to delete 

124 

125 Returns: 

126 True if key was deleted, False if not found 

127 """ 

128 with self._lock: 

129 if key in self._cache: 

130 del self._cache[key] 

131 if key in self._access_order: 

132 self._access_order.remove(key) 

133 return True 

134 return False 

135 

136 def clear(self) -> None: 

137 """Clear all entries from the cache.""" 

138 with self._lock: 

139 self._cache.clear() 

140 self._access_order.clear() 

141 self._hits = 0 

142 self._misses = 0 

143 self._evictions = 0 

144 

145 def _evict_lru(self) -> None: 

146 """Evict least recently used entries to make space.""" 

147 # Evict 25% of entries to avoid frequent evictions 

148 evict_count = max(1, self._max_size // 4) 

149 

150 for _ in range(min(evict_count, len(self._access_order))): 

151 if self._access_order: 

152 lru_key = self._access_order.pop(0) 

153 if lru_key in self._cache: 

154 del self._cache[lru_key] 

155 self._evictions += 1 

156 

157 def cleanup_expired(self) -> int: 

158 """Remove all expired entries. 

159 

160 Returns: 

161 Number of entries removed 

162 """ 

163 with self._lock: 

164 current_time = time.time() 

165 expired_keys = [] 

166 

167 for key, entry in self._cache.items(): 

168 if current_time - entry.timestamp > entry.ttl: 

169 expired_keys.append(key) 

170 

171 for key in expired_keys: 

172 del self._cache[key] 

173 if key in self._access_order: 

174 self._access_order.remove(key) 

175 

176 return len(expired_keys) 

177 

178 def get_statistics(self) -> dict[str, t.Any]: 

179 """Get cache statistics. 

180 

181 Returns: 

182 Dictionary with cache statistics 

183 """ 

184 with self._lock: 

185 total_requests = self._hits + self._misses 

186 hit_rate = self._hits / total_requests if total_requests > 0 else 0.0 

187 

188 return { 

189 "size": len(self._cache), 

190 "max_size": self._max_size, 

191 "hits": self._hits, 

192 "misses": self._misses, 

193 "evictions": self._evictions, 

194 "hit_rate": hit_rate, 

195 "fill_ratio": len(self._cache) / self._max_size, 

196 } 

197 

198 def resize(self, new_max_size: int) -> None: 

199 """Resize the cache maximum size. 

200 

201 Args: 

202 new_max_size: New maximum size for the cache 

203 """ 

204 with self._lock: 

205 self._max_size = new_max_size 

206 

207 # If new size is smaller, evict excess entries 

208 if new_max_size < len(self._cache): 

209 excess = len(self._cache) - new_max_size 

210 for _ in range(excess): 

211 if self._access_order: 

212 lru_key = self._access_order.pop(0) 

213 if lru_key in self._cache: 

214 del self._cache[lru_key] 

215 self._evictions += 1 

216 

217 def contains(self, key: str) -> bool: 

218 """Check if key exists in cache (without updating access). 

219 

220 Args: 

221 key: Cache key to check 

222 

223 Returns: 

224 True if key exists and not expired, False otherwise 

225 """ 

226 with self._lock: 

227 if key not in self._cache: 

228 return False 

229 

230 entry = self._cache[key] 

231 current_time = time.time() 

232 

233 # Check TTL without updating access 

234 if current_time - entry.timestamp > entry.ttl: 

235 return False 

236 

237 return True 

238 

239 def keys(self) -> list[str]: 

240 """Get all valid (non-expired) keys. 

241 

242 Returns: 

243 List of valid cache keys 

244 """ 

245 with self._lock: 

246 current_time = time.time() 

247 valid_keys = [] 

248 

249 for key, entry in self._cache.items(): 

250 if current_time - entry.timestamp <= entry.ttl: 

251 valid_keys.append(key) 

252 

253 return valid_keys 

254 

255 def __len__(self) -> int: 

256 """Get number of entries in cache.""" 

257 return len(self._cache) 

258 

259 def __contains__(self, key: str) -> bool: 

260 """Check if key is in cache (supports 'in' operator).""" 

261 return self.contains(key)