Coverage for jinja2_async_environment/bccache.py: 100%
57 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
1import typing as t
2from types import MappingProxyType
4from jinja2 import Environment
5from jinja2.bccache import Bucket, BytecodeCache
6from redis.asyncio import Redis, RedisCluster
9class AsyncBytecodeCache(BytecodeCache):
10 def load_bytecode(self, bucket: Bucket) -> None:
11 raise NotImplementedError(
12 "AsyncBytecodeCache.load_bytecode must be implemented."
13 )
15 def dump_bytecode(self, bucket: Bucket) -> None:
16 raise NotImplementedError(
17 "AsyncBytecodeCache.dump_bytecode must be implemented."
18 )
20 def clear(self) -> None:
21 raise NotImplementedError("AsyncBytecodeCache.clear must be implemented.")
23 def get_cache_key(
24 self, name: str, filename: str | None = None, env: Environment | None = None
25 ) -> str:
26 return f"{name}:{filename or name}"
28 def get_source_checksum(self, source: str) -> str:
29 import hashlib
31 return hashlib.md5(source.encode(), usedforsecurity=False).hexdigest()
33 async def get_bucket_async(
34 self, environment: Environment, name: str, filename: str | None, source: str
35 ) -> Bucket:
36 key = self.get_cache_key(name, filename, environment)
37 checksum = self.get_source_checksum(source)
38 bucket = Bucket(environment, key, checksum)
39 self.load_bytecode(bucket)
40 return bucket
42 async def set_bucket_async(self, bucket: Bucket) -> None:
43 self.dump_bytecode(bucket)
46class AsyncRedisBytecodeCache(AsyncBytecodeCache):
47 prefix: str | None
48 client: Redis | RedisCluster
49 configs: MappingProxyType[str, t.Any]
51 def __init__(
52 self,
53 prefix: str | None = None,
54 client: Redis | RedisCluster | None = None,
55 **configs: t.Any,
56 ) -> None:
57 self.prefix = prefix
58 self.client = client or Redis(**configs)
59 self.configs = MappingProxyType(configs)
61 def get_cache_key(
62 self, name: str, filename: str | None = None, env: Environment | None = None
63 ) -> str:
64 return filename or name
66 def get_source_checksum(self, source: str) -> str:
67 return str(hash(source))
69 def get_bucket_name(self, key: str) -> str:
70 return f"{self.prefix}:{key}" if self.prefix else key
72 async def load_bytecode(self, bucket: Bucket) -> bytes | None: # type: ignore[override]
73 code = await self.client.get(self.get_bucket_name(bucket.key))
74 if code:
75 code_bytes = code.encode("utf-8") if isinstance(code, str) else code
76 bucket.bytecode_from_string(code_bytes)
77 return code_bytes
78 return None
80 async def dump_bytecode(self, bucket: Bucket) -> None: # type: ignore[override]
81 await self.client.set(
82 self.get_bucket_name(bucket.key), bucket.bytecode_to_string()
83 )
85 async def get_bucket( # type: ignore[override]
86 self, environment: Environment, name: str, filename: str | None, source: str
87 ) -> Bucket:
88 key = self.get_cache_key(name, filename or name)
89 checksum = self.get_source_checksum(source)
90 bucket = Bucket(environment, key, checksum)
91 await self.load_bytecode(bucket)
92 return bucket
94 async def set_bucket(self, bucket: Bucket) -> None: # type: ignore[override]
95 await self.dump_bytecode(bucket)
97 async def get_bucket_async(
98 self, environment: Environment, name: str, filename: str | None, source: str
99 ) -> Bucket:
100 return await self.get_bucket(environment, name, filename, source)
102 async def set_bucket_async(self, bucket: Bucket) -> None:
103 return await self.set_bucket(bucket)