Coverage for jinja2_async_environment / loaders / function.py: 82%

90 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 21:26 -0800

1# mypy: disable-error-code="return-value" 

2"""Async function template loader implementation.""" 

3 

4import inspect 

5import typing as t 

6from typing import Any 

7 

8from anyio import Path as AsyncPath 

9from jinja2.utils import internalcode 

10 

11from .base import AsyncBaseLoader, SourceType 

12 

13if t.TYPE_CHECKING: 

14 from ..environment import AsyncEnvironment 

15 

16# Type alias for loader functions 

17LoaderFunction = t.Callable[[str], str | None] 

18AsyncLoaderFunction = t.Callable[[str], t.Awaitable[str | tuple[Any, ...] | None]] 

19 

20 

class AsyncFunctionLoader(AsyncBaseLoader):
    """Async template loader backed by a user-supplied callable.

    The callable receives a template name and returns either the template
    source as a string, a full ``(source, filename, uptodate)`` tuple, or
    ``None`` when the template does not exist.  This allows custom retrieval
    logic such as loading from databases, remote services, or other dynamic
    sources.

    The callable may be a plain function, a coroutine function, or a callable
    object whose ``__call__`` is a coroutine function.

    Attributes are stored in ``__slots__`` to keep instances small.
    """

    __slots__ = ("load_func", "is_async_func")

    # The callable used to fetch template sources.
    load_func: LoaderFunction | AsyncLoaderFunction
    # True when ``load_func`` was detected as a coroutine function.
    is_async_func: bool

    def __init__(
        self,
        load_func: LoaderFunction | AsyncLoaderFunction,
        searchpath: AsyncPath | str | t.Sequence[AsyncPath | str] | None = None,
    ) -> None:
        """Initialize the function loader.

        Args:
            load_func: Callable that takes a template name and returns the
                template source (or a 3-tuple), or None if not found.  Can be
                sync or async.
            searchpath: Path or sequence of paths, accepted for compatibility
                and forwarded to the base loader (an empty list when omitted).
                This class itself never reads it.
        """
        super().__init__([] if searchpath is None else searchpath)
        self.load_func = load_func
        self.is_async_func = self._is_async_callable(load_func)

    @staticmethod
    def _is_async_callable(func: t.Any) -> bool:
        """Return True when calling *func* is expected to yield a coroutine.

        ``inspect.iscoroutinefunction`` alone reports False for an instance of
        a class that defines ``async def __call__``, so the ``__call__``
        defined on the object's type is checked as well.
        """
        if inspect.iscoroutinefunction(func):
            return True
        # Look on the type (not the instance) so plain functions, classes and
        # mock objects are not misdetected through dynamic attributes.
        call_impl = getattr(type(func), "__call__", None)
        return inspect.iscoroutinefunction(call_impl)

    @internalcode
    async def get_source_async(
        self, environment: "AsyncEnvironment", name: str
    ) -> SourceType:
        """Get template source using the loader function asynchronously.

        Args:
            environment: The async environment instance (not used here).
            name: Template name to load.

        Returns:
            Tuple of (source, filename, uptodate_func).

        Raises:
            TemplateNotFound: Presumably raised by the inherited
                ``_handle_template_not_found`` when the loader function
                returns None (the handler is defined outside this class).
            RuntimeError: If that handler returns instead of raising, or if a
                sync loader function returns an awaitable.
            TypeError: If the loader function returns an unsupported value.
        """
        self._ensure_initialized()

        result = await self._call_load_function(name)
        if result is None:
            self._handle_template_not_found(name)
            # Safety net: the handler is expected to raise; never fall
            # through with a missing template.
            raise RuntimeError(
                "Template not found handler should have raised exception"
            )

        return self._process_load_result(result, name)

    async def _call_load_function(self, name: str) -> t.Any:
        """Call the loader function through the sync or async path.

        Args:
            name: Template name to load.

        Returns:
            Result from the loader function.
        """
        if self.is_async_func:
            return await self._call_async_load_function(name)
        return self._call_sync_load_function(name)

    async def _call_async_load_function(
        self, name: str
    ) -> str | tuple[Any, ...] | None:
        """Call the async loader function and resolve its result.

        The returned value is awaited repeatedly until it is no longer
        awaitable, so a coroutine that itself returns an awaitable is fully
        resolved.
        """
        result: t.Any = self.load_func(name)
        while inspect.isawaitable(result):
            result = await result
        return result

    def _call_sync_load_function(self, name: str) -> str | tuple[Any, ...] | None:
        """Call the sync loader function.

        Raises:
            RuntimeError: If the function returns an awaitable even though it
                was detected as synchronous.
        """
        result = self.load_func(name)
        if inspect.isawaitable(result):
            # Close an unstarted coroutine so rejecting it does not also emit
            # a "coroutine was never awaited" warning.
            if inspect.iscoroutine(result):
                result.close()
            raise RuntimeError("Sync loader function returned an awaitable")
        # At this point the result cannot be awaitable.
        return t.cast(str | tuple[Any, ...] | None, result)

    def _process_load_result(
        self, result: str | tuple[Any, ...], name: str
    ) -> SourceType:
        """Convert the loader function's result into a SourceType tuple.

        Args:
            result: Non-None result from the loader function.
            name: Template name.

        Returns:
            SourceType tuple (source, filename, uptodate_func).

        Raises:
            TypeError: If ``result`` is neither a string nor a 3-element
                tuple.
        """
        if isinstance(result, tuple) and len(result) == 3:
            return self._process_tuple_result(result)
        if isinstance(result, str):
            return self._process_string_result(result, name)
        raise TypeError(f"Unexpected source type: {type(result)}")

    def _process_tuple_result(self, result: tuple[Any, ...]) -> SourceType:
        """Process a tuple result from the loader function.

        Args:
            result: 3-element tuple (source, filename, uptodate_func).

        Returns:
            SourceType tuple.  A third element that is not callable is
            replaced by None.
        """
        source: str | bytes = result[0]
        filename: str | None = result[1]
        candidate = result[2]
        uptodate: t.Callable[[], bool] | None = (
            candidate if callable(candidate) else None
        )
        return source, filename, uptodate

    def _process_string_result(self, source: str, name: str) -> SourceType:
        """Process a plain string result from the loader function.

        The template name doubles as the filename, and the returned
        ``uptodate`` callable re-runs the loader to detect changes.

        Args:
            source: Template source.
            name: Template name.

        Returns:
            SourceType tuple.
        """

        def uptodate() -> bool:
            try:
                if self.is_async_func:
                    # An async loader cannot be called from this sync
                    # callback, so always report "stale" to force a reload.
                    return False
                return self._check_sync_template_update(name, source)
            except Exception:
                # Best effort: any failure while re-checking is treated as
                # "changed" so the template gets reloaded.
                return False

        return source, name, uptodate

    def _check_sync_template_update(self, name: str, source: str) -> bool:
        """Check whether a sync-loaded template is unchanged.

        Args:
            name: Template name.
            source: Template source as it was when loaded.

        Returns:
            True if the loader still returns the same source, False if the
            source differs or the template no longer exists.
        """
        current = self.load_func(name)
        if current is None:
            return False
        if isinstance(current, tuple) and len(current) == 3:
            current = current[0]
        return current == source

    @internalcode
    async def list_templates_async(self) -> list[str]:
        """List templates (not supported by function loader).

        Raises:
            TypeError: Always raised as function loaders cannot enumerate
                templates.
        """
        raise TypeError("this loader cannot iterate over all templates")

    def update_function(self, load_func: LoaderFunction | AsyncLoaderFunction) -> None:
        """Replace the loader function and re-detect whether it is async.

        Args:
            load_func: New loader function to use.
        """
        self.load_func = load_func
        self.is_async_func = self._is_async_callable(load_func)