Coverage for jinja2_async_environment/bccache.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 14:09 -0700

1import typing as t 

2from types import MappingProxyType 

3 

4from jinja2 import Environment 

5from jinja2.bccache import Bucket, BytecodeCache 

6from redis.asyncio import Redis, RedisCluster 

7 

8 

9class AsyncBytecodeCache(BytecodeCache): 

10 def load_bytecode(self, bucket: Bucket) -> None: 

11 raise NotImplementedError( 

12 "AsyncBytecodeCache.load_bytecode must be implemented." 

13 ) 

14 

15 def dump_bytecode(self, bucket: Bucket) -> None: 

16 raise NotImplementedError( 

17 "AsyncBytecodeCache.dump_bytecode must be implemented." 

18 ) 

19 

20 def clear(self) -> None: 

21 raise NotImplementedError("AsyncBytecodeCache.clear must be implemented.") 

22 

23 def get_cache_key( 

24 self, name: str, filename: str | None = None, env: Environment | None = None 

25 ) -> str: 

26 return f"{name}:{filename or name}" 

27 

28 def get_source_checksum(self, source: str) -> str: 

29 import hashlib 

30 

31 return hashlib.md5(source.encode(), usedforsecurity=False).hexdigest() 

32 

33 async def get_bucket_async( 

34 self, environment: Environment, name: str, filename: str | None, source: str 

35 ) -> Bucket: 

36 key = self.get_cache_key(name, filename, environment) 

37 checksum = self.get_source_checksum(source) 

38 bucket = Bucket(environment, key, checksum) 

39 self.load_bytecode(bucket) 

40 return bucket 

41 

42 async def set_bucket_async(self, bucket: Bucket) -> None: 

43 self.dump_bytecode(bucket) 

44 

45 

46class AsyncRedisBytecodeCache(AsyncBytecodeCache): 

47 prefix: str | None 

48 client: Redis | RedisCluster 

49 configs: MappingProxyType[str, t.Any] 

50 

51 def __init__( 

52 self, 

53 prefix: str | None = None, 

54 client: Redis | RedisCluster | None = None, 

55 **configs: t.Any, 

56 ) -> None: 

57 self.prefix = prefix 

58 self.client = client or Redis(**configs) 

59 self.configs = MappingProxyType(configs) 

60 

61 def get_cache_key( 

62 self, name: str, filename: str | None = None, env: Environment | None = None 

63 ) -> str: 

64 return filename or name 

65 

66 def get_source_checksum(self, source: str) -> str: 

67 return str(hash(source)) 

68 

69 def get_bucket_name(self, key: str) -> str: 

70 return f"{self.prefix}:{key}" if self.prefix else key 

71 

72 async def load_bytecode(self, bucket: Bucket) -> bytes | None: # type: ignore[override] 

73 code = await self.client.get(self.get_bucket_name(bucket.key)) 

74 if code: 

75 code_bytes = code.encode("utf-8") if isinstance(code, str) else code 

76 bucket.bytecode_from_string(code_bytes) 

77 return code_bytes 

78 return None 

79 

80 async def dump_bytecode(self, bucket: Bucket) -> None: # type: ignore[override] 

81 await self.client.set( 

82 self.get_bucket_name(bucket.key), bucket.bytecode_to_string() 

83 ) 

84 

85 async def get_bucket( # type: ignore[override] 

86 self, environment: Environment, name: str, filename: str | None, source: str 

87 ) -> Bucket: 

88 key = self.get_cache_key(name, filename or name) 

89 checksum = self.get_source_checksum(source) 

90 bucket = Bucket(environment, key, checksum) 

91 await self.load_bytecode(bucket) 

92 return bucket 

93 

94 async def set_bucket(self, bucket: Bucket) -> None: # type: ignore[override] 

95 await self.dump_bytecode(bucket) 

96 

97 async def get_bucket_async( 

98 self, environment: Environment, name: str, filename: str | None, source: str 

99 ) -> Bucket: 

100 return await self.get_bucket(environment, name, filename, source) 

101 

102 async def set_bucket_async(self, bucket: Bucket) -> None: 

103 return await self.set_bucket(bucket)