Coverage for jinja2_async_environment / caching / strategies.py: 66%

241 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 21:26 -0800

1"""Advanced cache strategies and eviction policies.""" 

2 

3import time 

4import typing as t 

5from dataclasses import dataclass 

6from threading import RLock 

7 

8from .typed import CacheEntry, TypedCache 

9 

10T = t.TypeVar("T") 

11 

12 

@dataclass
class CacheStatistics:
    """Counters and derived metrics describing how a cache is behaving.

    Every field defaults to zero, so ``CacheStatistics()`` is a clean slate.
    Field order is part of the positional constructor and must not change.
    """

    # Lookup outcomes.
    hits: int = 0
    misses: int = 0

    # Removal counters: overall, capacity-driven and expiry-driven.
    evictions: int = 0
    memory_pressure_evictions: int = 0
    ttl_evictions: int = 0

    # Derived figures maintained by the owning cache.
    avg_access_time: float = 0.0
    peak_size: int = 0
    cache_efficiency: float = 0.0

25 

26 

class LFUCache(TypedCache[T]):
    """Least Frequently Used cache with advanced statistics.

    Per-key access frequencies live in ``_access_counts``. When the cache is
    full, ``set`` evicts the live entry with the smallest count. Lookup
    outcomes and timings accumulate in a ``CacheStatistics`` record that is
    exposed through ``get_extended_statistics``.
    """

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Create an empty LFU cache.

        Args:
            max_size: Capacity forwarded to the base cache.
            default_ttl: TTL (seconds) used when ``set`` gets no usable ``ttl``.
        """
        super().__init__(max_size, default_ttl)
        self._access_counts: dict[str, int] = {}
        self._statistics = CacheStatistics()

    def _lfu_discard(self, key: str) -> None:
        """Remove ``key`` from the entry map, frequency map and access order."""
        self._cache.pop(key, None)
        self._access_counts.pop(key, None)
        if key in self._access_order:
            self._access_order.remove(key)

    def get(self, key: str) -> T | None:
        """Return the cached value for ``key``, or ``None`` on miss/expiry.

        A hit bumps the key's frequency, refreshes its position in the access
        order and feeds the running access-time average. An expired entry is
        removed and counted as both a miss and a TTL eviction.
        """
        with self._lock:
            # perf_counter is monotonic and high-resolution, which suits
            # measuring the short duration of a lookup; TTL arithmetic below
            # still uses wall-clock time to match the stored timestamps.
            started = time.perf_counter()

            if key not in self._cache:
                self._statistics.misses += 1
                return None

            entry = self._cache[key]
            now = time.time()

            # Check TTL
            if now - entry.timestamp > entry.ttl:
                self._lfu_discard(key)
                self._statistics.misses += 1
                self._statistics.ttl_evictions += 1
                return None

            # Update LFU tracking
            self._access_counts[key] = self._access_counts.get(key, 0) + 1
            entry.access_count += 1
            entry.last_access = now

            # Update LRU order (most recently used goes last)
            if key in self._access_order:
                self._access_order.remove(key)
            self._access_order.append(key)

            self._statistics.hits += 1
            self._update_avg_access_time(time.perf_counter() - started)

            return entry.value

    def set(self, key: str, value: T, ttl: int | None = None) -> None:
        """Store ``value`` under ``key``, evicting the LFU entry if full.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Entry lifetime in seconds. ``None`` (or any falsy value,
                including ``0``) falls back to the cache's default TTL.
        """
        with self._lock:
            now = time.time()

            # Only a brand-new key can push the cache over capacity.
            if key not in self._cache and len(self._cache) >= self._max_size:
                self._evict_lfu()

            self._cache[key] = CacheEntry(
                value=value,
                timestamp=now,
                ttl=ttl or self._default_ttl,
                access_count=1,
                last_access=now,
            )
            # (Re)writing a key restarts its frequency count.
            self._access_counts[key] = 1

            # Update LRU order
            if key in self._access_order:
                self._access_order.remove(key)
            self._access_order.append(key)

            # Update statistics
            self._statistics.peak_size = max(
                self._statistics.peak_size, len(self._cache)
            )

    def _evict_lfu(self) -> None:
        """Evict the least frequently used live entry.

        The victim is chosen among keys that are actually in ``_cache``.
        Inherited operations that remove entries know nothing about
        ``_access_counts``, so that map can hold counters for keys that are
        already gone; selecting from it could "evict" a missing key and leave
        the cache over capacity. Stale counters are pruned here as well.
        """
        if not self._cache:
            return

        stale = [k for k in self._access_counts if k not in self._cache]
        for k in stale:
            del self._access_counts[k]

        # Ties resolve to the first key in the cache's iteration order.
        lfu_key = min(self._cache, key=lambda k: self._access_counts.get(k, 0))

        self._lfu_discard(lfu_key)

        self._statistics.evictions += 1
        self._statistics.memory_pressure_evictions += 1
        # Keep the base-class eviction counter in step so statistics that
        # read ``_evictions`` also see LFU evictions.
        self._evictions += 1

    def _update_avg_access_time(self, access_time: float) -> None:
        """Fold ``access_time`` (seconds) into the running average.

        Called after the hit counter has been incremented. The first hit
        seeds the average; later hits use an exponential moving average.
        Seeding on the first *hit* (rather than the first access of any
        kind) keeps earlier misses from anchoring the average at 0.0.
        """
        if self._statistics.hits <= 1:
            self._statistics.avg_access_time = access_time
            return

        # Exponential moving average
        alpha = 0.1
        self._statistics.avg_access_time = (
            alpha * access_time + (1 - alpha) * self._statistics.avg_access_time
        )

    def get_extended_statistics(self) -> dict[str, t.Any]:
        """Get extended statistics for monitoring.

        Returns:
            The base cache statistics merged with LFU-specific figures:
            per-key access counts, average access time in milliseconds,
            peak size, hit efficiency and the TTL / memory-pressure
            eviction counters.
        """
        base_stats = self.get_statistics()

        total_requests = self._statistics.hits + self._statistics.misses
        self._statistics.cache_efficiency = (
            self._statistics.hits / total_requests if total_requests > 0 else 0.0
        )

        return base_stats | {
            "access_counts": self._access_counts.copy(),
            "avg_access_time_ms": self._statistics.avg_access_time * 1000,
            "peak_size": self._statistics.peak_size,
            "cache_efficiency": self._statistics.cache_efficiency,
            "ttl_evictions": self._statistics.ttl_evictions,
            "memory_pressure_evictions": self._statistics.memory_pressure_evictions,
        }

147 

148 

class AdaptiveCache(TypedCache[T]):
    """Adaptive cache that switches between LRU and LFU based on access patterns.

    Every ``get`` records a timestamp for the requested key. Periodically the
    spread of per-key access counts is evaluated: a skewed distribution
    selects LFU eviction, a uniform one selects LRU eviction.
    """

    # Only accesses newer than this many seconds count towards frequency.
    _PATTERN_WINDOW_SECONDS = 3600

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Create an empty adaptive cache that starts in LRU mode.

        Args:
            max_size: Capacity forwarded to the base cache.
            default_ttl: Default TTL (seconds) forwarded to the base cache.
        """
        super().__init__(max_size, default_ttl)
        self._access_patterns: dict[str, list[float]] = {}
        self._strategy: str = "lru"  # "lru" or "lfu"
        self._strategy_switches = 0
        self._last_evaluation = time.time()
        self._evaluation_interval = 300  # 5 minutes

    def get(self, key: str) -> T | None:
        """Record the access, re-evaluate the strategy if due, then look up."""
        with self._lock:
            now = time.time()

            # Periodically evaluate and potentially switch strategy
            if now - self._last_evaluation > self._evaluation_interval:
                self._evaluate_strategy()

            # Track this access and keep only the recent window for the key.
            cutoff = now - self._PATTERN_WINDOW_SECONDS
            recent = [
                stamp
                for stamp in self._access_patterns.get(key, ())
                if stamp > cutoff
            ]
            recent.append(now)
            self._access_patterns[key] = recent

            return super().get(key)

    def _evaluate_strategy(self) -> None:
        """Evaluate access patterns and potentially switch strategy.

        Timestamps outside the recent window are dropped for *all* keys
        first, so keys that are no longer being requested neither skew the
        frequency distribution nor accumulate in memory.
        """
        now = time.time()
        self._last_evaluation = now

        cutoff = now - self._PATTERN_WINDOW_SECONDS
        for tracked_key in list(self._access_patterns):
            recent = [
                stamp
                for stamp in self._access_patterns[tracked_key]
                if stamp > cutoff
            ]
            if recent:
                self._access_patterns[tracked_key] = recent
            else:
                del self._access_patterns[tracked_key]

        if not self._access_patterns:
            return

        # Calculate access frequency distribution
        frequencies = [len(stamps) for stamps in self._access_patterns.values()]

        # Calculate coefficient of variation
        mean_freq = sum(frequencies) / len(frequencies)
        if mean_freq == 0:
            return

        variance = sum((f - mean_freq) ** 2 for f in frequencies) / len(frequencies)
        cv = variance**0.5 / mean_freq

        # High CV (> 0.5): some items are accessed much more often -> LFU.
        # Low CV (< 0.3): relatively uniform access -> LRU.
        # Values in between leave the current strategy untouched.
        if cv > 0.5 and self._strategy == "lru":
            self._strategy = "lfu"
        elif cv < 0.3 and self._strategy == "lfu":
            self._strategy = "lru"
        else:
            return

        self._strategy_switches += 1
        # Clear old tracking data when switching
        self._access_patterns.clear()

    def _evict_lru(self) -> None:
        """Evict using current strategy."""
        if self._strategy == "lfu":
            self._evict_lfu_adaptive()
        else:
            super()._evict_lru()

    def _evict_lfu_adaptive(self) -> None:
        """Evict the least frequently used cached entry.

        Falls back to the base LRU eviction when no access data exists. The
        victim is picked among keys that are actually cached: the pattern map
        also records lookups for keys that are not (or no longer) in the
        cache, and choosing from it could select such a key and free nothing.
        Cached keys with no recorded access count as frequency zero.
        """
        if not self._access_patterns:
            super()._evict_lru()
            return

        if not self._cache:
            return

        # Ties resolve to the first key in the cache's iteration order.
        lfu_key = min(
            self._cache,
            key=lambda k: len(self._access_patterns.get(k, ())),
        )

        del self._cache[lfu_key]
        if lfu_key in self._access_order:
            self._access_order.remove(lfu_key)
        self._evictions += 1

        self._access_patterns.pop(lfu_key, None)

    def get_strategy_info(self) -> dict[str, t.Any]:
        """Get information about current strategy and switches."""
        return {
            "current_strategy": self._strategy,
            "strategy_switches": self._strategy_switches,
            "last_evaluation": self._last_evaluation,
            "access_pattern_keys": len(self._access_patterns),
        }

252 

253 

254class HierarchicalCache[T]: 

255 """Multi-level cache with different strategies per level.""" 

256 

257 def __init__( 

258 self, 

259 l1_size: int = 100, 

260 l2_size: int = 1000, 

261 l1_ttl: int = 60, 

262 l2_ttl: int = 300, 

263 ): 

264 """Initialize hierarchical cache. 

265 

266 Args: 

267 l1_size: Size of L1 (fastest) cache 

268 l2_size: Size of L2 (larger) cache 

269 l1_ttl: TTL for L1 cache entries 

270 l2_ttl: TTL for L2 cache entries 

271 """ 

272 self.l1_cache = TypedCache[T](max_size=l1_size, default_ttl=l1_ttl) 

273 self.l2_cache = LFUCache[T](max_size=l2_size, default_ttl=l2_ttl) 

274 self._lock = RLock() 

275 

276 # Statistics 

277 self.l1_hits = 0 

278 self.l2_hits = 0 

279 self.total_misses = 0 

280 self.promotions = 0 # L2 → L1 promotions 

281 

282 @property 

283 def _default_ttl(self) -> int: 

284 """Get the default TTL for this cache.""" 

285 return self.l2_cache._default_ttl 

286 

287 @_default_ttl.setter 

288 def _default_ttl(self, value: int) -> None: 

289 """Set the default TTL for this cache.""" 

290 self.l1_cache._default_ttl = value // 5 # L1 TTL is 1/5 of L2 

291 self.l2_cache._default_ttl = value 

292 

293 def get(self, key: str) -> T | None: 

294 """Get value from hierarchical cache.""" 

295 with self._lock: 

296 # Try L1 first 

297 value = self.l1_cache.get(key) 

298 if value is not None: 

299 self.l1_hits += 1 

300 return value 

301 

302 # Try L2 

303 value = self.l2_cache.get(key) 

304 if value is not None: 

305 self.l2_hits += 1 

306 # Promote frequently accessed items to L1 

307 self._consider_promotion(key, value) 

308 return value 

309 

310 # Miss in both levels 

311 self.total_misses += 1 

312 return None 

313 

314 def set(self, key: str, value: T, ttl: int | None = None) -> None: 

315 """Set value in hierarchical cache.""" 

316 with self._lock: 

317 # Always start in L2, promote to L1 based on access patterns 

318 self.l2_cache.set(key, value, ttl) 

319 

320 def _consider_promotion(self, key: str, value: T) -> None: 

321 """Consider promoting item from L2 to L1.""" 

322 # Get access count from L2 

323 l2_entry = self.l2_cache._cache.get(key) 

324 if l2_entry and l2_entry.access_count >= 3: # Promote after 3 accesses 

325 # Promote to L1 

326 self.l1_cache.set(key, value, l2_entry.ttl) 

327 self.promotions += 1 

328 

329 def clear(self) -> None: 

330 """Clear all cache levels.""" 

331 with self._lock: 

332 self.l1_cache.clear() 

333 self.l2_cache.clear() 

334 self.l1_hits = 0 

335 self.l2_hits = 0 

336 self.total_misses = 0 

337 self.promotions = 0 

338 

339 def cleanup_expired(self) -> int: 

340 """Clean up expired entries from both cache levels. 

341 

342 Returns: 

343 Number of expired entries removed 

344 """ 

345 with self._lock: 

346 l1_expired = self.l1_cache.cleanup_expired() 

347 l2_expired = self.l2_cache.cleanup_expired() 

348 return l1_expired + l2_expired 

349 

350 def resize(self, new_max_size: int) -> None: 

351 """Resize the cache maximum size. 

352 

353 For hierarchical cache, we split the size between L1 and L2. 

354 

355 Args: 

356 new_max_size: New maximum size for the cache 

357 """ 

358 with self._lock: 

359 # Split size between L1 and L2 (80% to L2, 20% to L1) 

360 l1_size = max(1, new_max_size // 5) 

361 l2_size = new_max_size - l1_size 

362 self.l1_cache.resize(l1_size) 

363 self.l2_cache.resize(l2_size) 

364 

365 def delete(self, key: str) -> bool: 

366 """Delete a key from the cache. 

367 

368 Args: 

369 key: Cache key to delete 

370 

371 Returns: 

372 True if key was deleted, False if not found 

373 """ 

374 with self._lock: 

375 # Try to delete from both caches 

376 l1_deleted = self.l1_cache.delete(key) 

377 l2_deleted = self.l2_cache.delete(key) 

378 return l1_deleted or l2_deleted 

379 

380 def get_statistics(self) -> dict[str, t.Any]: 

381 """Get comprehensive statistics for all cache levels.""" 

382 total_requests = self.l1_hits + self.l2_hits + self.total_misses 

383 

384 return { 

385 "l1_cache": self.l1_cache.get_statistics(), 

386 "l2_cache": self.l2_cache.get_extended_statistics(), 

387 "l1_hits": self.l1_hits, 

388 "l2_hits": self.l2_hits, 

389 "total_misses": self.total_misses, 

390 "promotions": self.promotions, 

391 "overall_hit_rate": ( 

392 (self.l1_hits + self.l2_hits) / total_requests 

393 if total_requests > 0 

394 else 0.0 

395 ), 

396 "l1_hit_rate": ( 

397 self.l1_hits / total_requests if total_requests > 0 else 0.0 

398 ), 

399 "size": len(self.l1_cache) + len(self.l2_cache), 

400 "max_size": self.l1_cache._max_size + self.l2_cache._max_size, 

401 "hits": self.l1_hits + self.l2_hits, 

402 "misses": self.total_misses, 

403 "evictions": self.l1_cache._evictions + self.l2_cache._evictions, 

404 "hit_rate": ( 

405 (self.l1_hits + self.l2_hits) / total_requests 

406 if total_requests > 0 

407 else 0.0 

408 ), 

409 "fill_ratio": ( 

410 (len(self.l1_cache) + len(self.l2_cache)) 

411 / (self.l1_cache._max_size + self.l2_cache._max_size) 

412 ), 

413 } 

414 

415 def __len__(self) -> int: 

416 """Get total number of entries across all cache levels.""" 

417 return len(self.l1_cache) + len(self.l2_cache) 

418 

419 

420class CacheWarmer: 

421 """Utility for warming caches with commonly used templates.""" 

422 

423 def __init__(self, cache_manager: t.Any): 

424 """Initialize cache warmer. 

425 

426 Args: 

427 cache_manager: Cache manager to warm 

428 """ 

429 self.cache_manager = cache_manager 

430 self._warmed_keys: set[str] = set() 

431 

432 async def warm_template_cache( 

433 self, 

434 environment: t.Any, 

435 template_names: list[str], 

436 context_data: dict[str, t.Any] | None = None, 

437 ) -> dict[str, bool]: 

438 """Warm template cache by pre-loading common templates. 

439 

440 Args: 

441 environment: AsyncEnvironment instance 

442 template_names: List of template names to warm 

443 context_data: Optional context data for compilation 

444 

445 Returns: 

446 Dictionary mapping template names to success status 

447 """ 

448 results = {} 

449 context_data = context_data or {} 

450 

451 for template_name in template_names: 

452 try: 

453 # Pre-load template to warm the cache 

454 template = await environment.get_template_async(template_name) 

455 

456 # Also warm compilation cache if context provided 

457 if context_data: 

458 await template.render_async(**context_data) 

459 

460 self._warmed_keys.add(template_name) 

461 results[template_name] = True 

462 

463 except Exception: 

464 results[template_name] = False 

465 

466 return results 

467 

468 def warm_package_cache(self, package_names: list[str]) -> dict[str, bool]: 

469 """Warm package cache by pre-loading package specs. 

470 

471 Args: 

472 package_names: List of package names to warm 

473 

474 Returns: 

475 Dictionary mapping package names to success status 

476 """ 

477 results = {} 

478 

479 for package_name in package_names: 

480 try: 

481 import importlib.util 

482 

483 spec = importlib.util.find_spec(package_name) 

484 if spec and spec.loader: 

485 # Cache the package spec 

486 cache_key = f"spec:{package_name}" 

487 self.cache_manager.set("package", cache_key, (spec.loader, spec)) 

488 self._warmed_keys.add(cache_key) 

489 results[package_name] = True 

490 else: 

491 results[package_name] = False 

492 

493 except Exception: 

494 results[package_name] = False 

495 

496 return results 

497 

498 def get_warmed_keys(self) -> set[str]: 

499 """Get set of keys that have been warmed.""" 

500 return self._warmed_keys.copy() 

501 

502 def clear_warmed_tracking(self) -> None: 

503 """Clear the tracking of warmed keys.""" 

504 self._warmed_keys.clear()