Coverage for jinja2_async_environment/caching/strategies.py: 20%

220 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 14:09 -0700

1"""Advanced cache strategies and eviction policies.""" 

2 

3import time 

4import typing as t 

5from dataclasses import dataclass 

6from threading import RLock 

7 

8from .typed import CacheEntry, TypedCache 

9 

10T = t.TypeVar("T") 

11 

12 

@dataclass
class CacheStatistics:
    """Mutable record of counters and gauges describing a cache's activity.

    Every field defaults to zero, so a fresh instance represents a cache
    that has not served any request yet.
    """

    # Lookup outcomes.
    hits: int = 0
    misses: int = 0

    # Eviction counters.
    evictions: int = 0
    memory_pressure_evictions: int = 0
    ttl_evictions: int = 0

    # Gauges.
    avg_access_time: float = 0.0  # seconds
    peak_size: int = 0
    cache_efficiency: float = 0.0

25 

26 

class LFUCache(TypedCache[T]):
    """Least Frequently Used cache with advanced statistics.

    Entries are stored in the inherited ``_cache`` mapping. A per-key access
    counter (``_access_counts``) decides which entry is evicted when the
    cache is full, and a ``CacheStatistics`` record collects hit/miss,
    eviction and latency figures.

    NOTE(review): ``get``/``set`` do not delegate to the base class, so any
    hit/miss counters that ``TypedCache`` keeps itself are presumably not
    updated here -- confirm against ``TypedCache.get_statistics()``.
    """

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Create the cache.

        Args:
            max_size: Maximum number of entries before eviction kicks in.
            default_ttl: TTL in seconds used when ``set`` gets no TTL.
        """
        super().__init__(max_size, default_ttl)
        self._access_counts: dict[str, int] = {}
        self._statistics = CacheStatistics()

    def get(self, key: str) -> T | None:
        """Return the cached value for ``key``, or ``None`` on miss/expiry.

        An entry older than its TTL is removed and counted as both a miss
        and a TTL eviction. A hit bumps the entry's frequency counter and
        moves the key to the most-recent end of the access order.
        """
        with self._lock:
            # Monotonic clock for the latency sample; wall-clock time is
            # still used for TTL because entry timestamps are time.time().
            started = time.perf_counter()

            if key not in self._cache:
                self._statistics.misses += 1
                return None

            entry = self._cache[key]
            now = time.time()

            if now - entry.timestamp > entry.ttl:
                self._discard(key)
                self._statistics.misses += 1
                self._statistics.ttl_evictions += 1
                return None

            self._access_counts[key] = self._access_counts.get(key, 0) + 1
            entry.access_count += 1
            entry.last_access = now
            self._touch(key)

            self._statistics.hits += 1
            self._update_avg_access_time(time.perf_counter() - started)

            return entry.value

    def set(self, key: str, value: T, ttl: int | None = None) -> None:
        """Store ``value`` under ``key``, evicting one entry if full.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: TTL in seconds; a falsy value (``None`` or ``0``) falls
                back to the cache's default TTL.
        """
        with self._lock:
            now = time.time()

            # Only a brand-new key can push the cache over capacity.
            if key not in self._cache and len(self._cache) >= self._max_size:
                self._evict_lfu()

            self._cache[key] = CacheEntry(
                value=value,
                timestamp=now,
                ttl=ttl or self._default_ttl,
                access_count=1,
                last_access=now,
            )
            # (Re)writing a key resets its frequency.
            self._access_counts[key] = 1
            self._touch(key)

            self._statistics.peak_size = max(
                self._statistics.peak_size, len(self._cache)
            )

    def _touch(self, key: str) -> None:
        """Move ``key`` to the most-recently-used end of the access order."""
        if key in self._access_order:
            self._access_order.remove(key)
        self._access_order.append(key)

    def _discard(self, key: str) -> None:
        """Remove ``key`` from the cache and every tracking structure."""
        self._cache.pop(key, None)
        self._access_counts.pop(key, None)
        if key in self._access_order:
            self._access_order.remove(key)

    def _evict_lfu(self) -> None:
        """Evict the least frequently used entry that is actually cached.

        The victim is chosen among the keys of ``_cache`` rather than the
        keys of ``_access_counts``. The counter map can hold keys that are
        no longer cached (for example after an inherited ``clear()``), and
        evicting such a key would free no slot.
        """
        if not self._cache:
            return

        # Drop counters for keys that left the cache by another path.
        for stale_key in [k for k in self._access_counts if k not in self._cache]:
            del self._access_counts[stale_key]

        victim = min(self._cache, key=lambda k: self._access_counts.get(k, 0))
        self._discard(victim)

        # Keep the inherited counter in step; HierarchicalCache reports
        # evictions from ``_evictions``.
        self._evictions += 1
        self._statistics.evictions += 1
        self._statistics.memory_pressure_evictions += 1

    def _update_avg_access_time(self, access_time: float) -> None:
        """Fold one hit latency (seconds) into the moving average.

        Called after ``hits`` has been incremented, and only for hits, so
        the first sample is detected from the hit counter alone. This way
        earlier misses cannot seed the average with its 0.0 default.
        """
        if self._statistics.hits <= 1:
            self._statistics.avg_access_time = access_time
            return

        # Exponential moving average.
        alpha = 0.1
        self._statistics.avg_access_time = (
            alpha * access_time + (1 - alpha) * self._statistics.avg_access_time
        )

    def get_extended_statistics(self) -> dict[str, t.Any]:
        """Return the base statistics plus the LFU-specific figures.

        Also refreshes ``cache_efficiency`` (hits / total lookups) on the
        statistics record. ``access_counts`` lists only keys that are
        currently cached.
        """
        base_stats = self.get_statistics()

        lookups = self._statistics.hits + self._statistics.misses
        self._statistics.cache_efficiency = (
            self._statistics.hits / lookups if lookups > 0 else 0.0
        )

        return {
            **base_stats,
            "access_counts": {
                k: count
                for k, count in self._access_counts.items()
                if k in self._cache
            },
            "avg_access_time_ms": self._statistics.avg_access_time * 1000,
            "peak_size": self._statistics.peak_size,
            "cache_efficiency": self._statistics.cache_efficiency,
            "ttl_evictions": self._statistics.ttl_evictions,
            "memory_pressure_evictions": self._statistics.memory_pressure_evictions,
        }

148 

149 

class AdaptiveCache(TypedCache[T]):
    """Adaptive cache that switches between LRU and LFU based on access patterns.

    Every ``get`` records a timestamp per key. Periodically the spread of
    per-key access counts (coefficient of variation, CV) is evaluated: a
    skewed distribution selects LFU eviction, a uniform one selects LRU.
    """

    # Seconds of access history kept per key.
    _PATTERN_WINDOW = 3600
    # CV above this switches LRU -> LFU; CV below the lower bound switches
    # back. The gap between the two prevents flapping.
    _LFU_CV_THRESHOLD = 0.5
    _LRU_CV_THRESHOLD = 0.3

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Create the cache, starting with the LRU strategy.

        Args:
            max_size: Maximum number of entries (passed to the base class).
            default_ttl: Default TTL in seconds (passed to the base class).
        """
        super().__init__(max_size, default_ttl)
        self._access_patterns: dict[str, list[float]] = {}
        self._strategy: str = "lru"  # "lru" or "lfu"
        self._strategy_switches = 0
        self._last_evaluation = time.time()
        self._evaluation_interval = 300  # 5 minutes

    def get(self, key: str) -> T | None:
        """Record the access, then delegate the lookup to the base class.

        The access is recorded whether or not ``key`` is cached.
        """
        with self._lock:
            now = time.time()

            # Periodically evaluate and potentially switch strategy.
            if now - self._last_evaluation > self._evaluation_interval:
                self._evaluate_strategy()

            # Keep only this key's accesses from the recent window, plus
            # the current one.
            cutoff = now - self._PATTERN_WINDOW
            recent = [
                stamp
                for stamp in self._access_patterns.get(key, [])
                if stamp > cutoff
            ]
            recent.append(now)
            self._access_patterns[key] = recent

            return super().get(key)

    def _prune_access_patterns(self, now: float) -> None:
        """Drop timestamps outside the window and keys left with none.

        Without this, a key requested once and never again would keep its
        entry forever, and its stale timestamp would still count towards
        the frequency distribution.
        """
        cutoff = now - self._PATTERN_WINDOW
        live: dict[str, list[float]] = {}
        for key, stamps in self._access_patterns.items():
            recent = [stamp for stamp in stamps if stamp > cutoff]
            if recent:
                live[key] = recent
        self._access_patterns = live

    def _evaluate_strategy(self) -> None:
        """Evaluate access patterns and potentially switch strategy."""
        now = time.time()
        self._last_evaluation = now

        self._prune_access_patterns(now)
        if not self._access_patterns:
            return

        # After pruning every list is non-empty, so the mean is >= 1 and
        # the division below is safe.
        frequencies = [len(stamps) for stamps in self._access_patterns.values()]
        mean_freq = sum(frequencies) / len(frequencies)
        variance = sum((f - mean_freq) ** 2 for f in frequencies) / len(frequencies)
        cv = variance**0.5 / mean_freq

        # High CV: a few keys dominate -> LFU. Low CV: uniform -> LRU.
        if cv > self._LFU_CV_THRESHOLD and self._strategy == "lru":
            self._strategy = "lfu"
        elif cv < self._LRU_CV_THRESHOLD and self._strategy == "lfu":
            self._strategy = "lru"
        else:
            return

        self._strategy_switches += 1
        # Start the new strategy with fresh tracking data.
        self._access_patterns.clear()

    def _evict_lru(self) -> None:
        """Evict using current strategy.

        Overrides the base-class eviction method (presumably what
        ``TypedCache.set`` calls when the cache is full -- confirm).
        """
        if self._strategy == "lfu":
            self._evict_lfu_adaptive()
        else:
            super()._evict_lru()

    def _evict_lfu_adaptive(self) -> None:
        """Evict the cached entry with the fewest recorded recent accesses.

        The victim is chosen among keys present in ``_cache``.
        ``_access_patterns`` also tracks keys that were only ever missed,
        and selecting one of those would free no slot. Cached keys with no
        recorded access count as frequency 0. With no tracking data at all
        this falls back to the base-class LRU eviction.
        """
        if not self._access_patterns:
            super()._evict_lru()
            return

        if not self._cache:
            return

        victim = min(
            self._cache,
            key=lambda k: len(self._access_patterns.get(k, ())),
        )

        del self._cache[victim]
        if victim in self._access_order:
            self._access_order.remove(victim)
        self._access_patterns.pop(victim, None)
        self._evictions += 1

    def get_strategy_info(self) -> dict[str, t.Any]:
        """Get information about current strategy and switches."""
        return {
            "current_strategy": self._strategy,
            "strategy_switches": self._strategy_switches,
            "last_evaluation": self._last_evaluation,
            "access_pattern_keys": len(self._access_patterns),
        }

255 

256 

257class HierarchicalCache[T]: 

258 """Multi-level cache with different strategies per level.""" 

259 

260 def __init__( 

261 self, 

262 l1_size: int = 100, 

263 l2_size: int = 1000, 

264 l1_ttl: int = 60, 

265 l2_ttl: int = 300, 

266 ): 

267 """Initialize hierarchical cache. 

268 

269 Args: 

270 l1_size: Size of L1 (fastest) cache 

271 l2_size: Size of L2 (larger) cache 

272 l1_ttl: TTL for L1 cache entries 

273 l2_ttl: TTL for L2 cache entries 

274 """ 

275 self.l1_cache = TypedCache[T](max_size=l1_size, default_ttl=l1_ttl) 

276 self.l2_cache = LFUCache[T](max_size=l2_size, default_ttl=l2_ttl) 

277 self._lock = RLock() 

278 

279 # Statistics 

280 self.l1_hits = 0 

281 self.l2_hits = 0 

282 self.total_misses = 0 

283 self.promotions = 0 # L2 → L1 promotions 

284 

285 def get(self, key: str) -> T | None: 

286 """Get value from hierarchical cache.""" 

287 with self._lock: 

288 # Try L1 first 

289 value = self.l1_cache.get(key) 

290 if value is not None: 

291 self.l1_hits += 1 

292 return value 

293 

294 # Try L2 

295 value = self.l2_cache.get(key) 

296 if value is not None: 

297 self.l2_hits += 1 

298 # Promote frequently accessed items to L1 

299 self._consider_promotion(key, value) 

300 return value 

301 

302 # Miss in both levels 

303 self.total_misses += 1 

304 return None 

305 

306 def set(self, key: str, value: T, ttl: int | None = None) -> None: 

307 """Set value in hierarchical cache.""" 

308 with self._lock: 

309 # Always start in L2, promote to L1 based on access patterns 

310 self.l2_cache.set(key, value, ttl) 

311 

312 def _consider_promotion(self, key: str, value: T) -> None: 

313 """Consider promoting item from L2 to L1.""" 

314 # Get access count from L2 

315 l2_entry = self.l2_cache._cache.get(key) 

316 if l2_entry and l2_entry.access_count >= 3: # Promote after 3 accesses 

317 # Promote to L1 

318 self.l1_cache.set(key, value, l2_entry.ttl) 

319 self.promotions += 1 

320 

321 def clear(self) -> None: 

322 """Clear all cache levels.""" 

323 with self._lock: 

324 self.l1_cache.clear() 

325 self.l2_cache.clear() 

326 self.l1_hits = 0 

327 self.l2_hits = 0 

328 self.total_misses = 0 

329 self.promotions = 0 

330 

331 def get_statistics(self) -> dict[str, t.Any]: 

332 """Get comprehensive statistics for all cache levels.""" 

333 total_requests = self.l1_hits + self.l2_hits + self.total_misses 

334 

335 return { 

336 "l1_cache": self.l1_cache.get_statistics(), 

337 "l2_cache": self.l2_cache.get_extended_statistics(), 

338 "l1_hits": self.l1_hits, 

339 "l2_hits": self.l2_hits, 

340 "total_misses": self.total_misses, 

341 "promotions": self.promotions, 

342 "overall_hit_rate": ( 

343 (self.l1_hits + self.l2_hits) / total_requests 

344 if total_requests > 0 

345 else 0.0 

346 ), 

347 "l1_hit_rate": ( 

348 self.l1_hits / total_requests if total_requests > 0 else 0.0 

349 ), 

350 "size": len(self.l1_cache) + len(self.l2_cache), 

351 "max_size": self.l1_cache._max_size + self.l2_cache._max_size, 

352 "hits": self.l1_hits + self.l2_hits, 

353 "misses": self.total_misses, 

354 "evictions": self.l1_cache._evictions + self.l2_cache._evictions, 

355 "hit_rate": ( 

356 (self.l1_hits + self.l2_hits) / total_requests 

357 if total_requests > 0 

358 else 0.0 

359 ), 

360 "fill_ratio": ( 

361 (len(self.l1_cache) + len(self.l2_cache)) 

362 / (self.l1_cache._max_size + self.l2_cache._max_size) 

363 ), 

364 } 

365 

366 def __len__(self) -> int: 

367 """Get total number of entries across all cache levels.""" 

368 return len(self.l1_cache) + len(self.l2_cache) 

369 

370 

371class CacheWarmer: 

372 """Utility for warming caches with commonly used templates.""" 

373 

374 def __init__(self, cache_manager: t.Any): 

375 """Initialize cache warmer. 

376 

377 Args: 

378 cache_manager: Cache manager to warm 

379 """ 

380 self.cache_manager = cache_manager 

381 self._warmed_keys: set[str] = set() 

382 

383 async def warm_template_cache( 

384 self, 

385 environment: t.Any, 

386 template_names: list[str], 

387 context_data: dict[str, t.Any] | None = None, 

388 ) -> dict[str, bool]: 

389 """Warm template cache by pre-loading common templates. 

390 

391 Args: 

392 environment: AsyncEnvironment instance 

393 template_names: List of template names to warm 

394 context_data: Optional context data for compilation 

395 

396 Returns: 

397 Dictionary mapping template names to success status 

398 """ 

399 results = {} 

400 context_data = context_data or {} 

401 

402 for template_name in template_names: 

403 try: 

404 # Pre-load template to warm the cache 

405 template = await environment.get_template_async(template_name) 

406 

407 # Also warm compilation cache if context provided 

408 if context_data: 

409 await template.render_async(**context_data) 

410 

411 self._warmed_keys.add(template_name) 

412 results[template_name] = True 

413 

414 except Exception: 

415 results[template_name] = False 

416 

417 return results 

418 

419 def warm_package_cache(self, package_names: list[str]) -> dict[str, bool]: 

420 """Warm package cache by pre-loading package specs. 

421 

422 Args: 

423 package_names: List of package names to warm 

424 

425 Returns: 

426 Dictionary mapping package names to success status 

427 """ 

428 results = {} 

429 

430 for package_name in package_names: 

431 try: 

432 import importlib.util 

433 

434 spec = importlib.util.find_spec(package_name) 

435 if spec and spec.loader: 

436 # Cache the package spec 

437 cache_key = f"spec:{package_name}" 

438 self.cache_manager.set("package", cache_key, (spec.loader, spec)) 

439 self._warmed_keys.add(cache_key) 

440 results[package_name] = True 

441 else: 

442 results[package_name] = False 

443 

444 except Exception: 

445 results[package_name] = False 

446 

447 return results 

448 

449 def get_warmed_keys(self) -> set[str]: 

450 """Get set of keys that have been warmed.""" 

451 return self._warmed_keys.copy() 

452 

453 def clear_warmed_tracking(self) -> None: 

454 """Clear the tracking of warmed keys.""" 

455 self._warmed_keys.clear()