Coverage for jinja2_async_environment/loaders.py: 80%
357 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-17 09:16 -0700
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-17 09:16 -0700
1import importlib.util
2import sys
3import typing as t
4from contextlib import suppress
5from importlib import import_module
6from pathlib import Path
7from unittest.mock import MagicMock
9from anyio import Path as AsyncPath
10from jinja2.environment import Template
11from jinja2.exceptions import TemplateNotFound
12from jinja2.loaders import BaseLoader
13from jinja2.utils import internalcode
15from .environment import AsyncEnvironment
class PackageSpecNotFound(TemplateNotFound):
    """Raised when a package cannot be imported or has no import spec."""
class LoaderNotFound(TemplateNotFound):
    """Raised when a package's import spec carries no loader object."""
# Triple produced by the ``get_source_async`` methods in this module and
# unpacked by ``AsyncBaseLoader.load_async`` as ``(source, path, uptodate)``:
#   * the template source, as text or still-encoded bytes;
#   * the path string the source was loaded from, or ``None``;
#   * an optional zero-argument callable whose result (a bool, or an awaitable
#     resolving to a bool) says whether the loaded source is still current.
SourceType = tuple[
    str | bytes, str | None, t.Callable[[], bool | t.Awaitable[bool]] | None
]
class AsyncLoaderProtocol(t.Protocol):
    """Structural interface shared by the asynchronous loaders in this module."""

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType | None:
        """Return the ``(source, path, uptodate)`` triple for a template.

        The first argument is either the environment (in which case
        ``template`` names the template) or the template itself.
        """
        ...

    async def list_templates_async(self) -> list[str]:
        """Return the names of the templates this loader can provide."""
        ...

    async def load_async(
        self,
        environment: AsyncEnvironment,
        name: str,
        env_globals: dict[str, t.Any] | None = None,
    ) -> Template:
        """Load the template called ``name`` and return it as a ``Template``."""
        ...
class AsyncBaseLoader(BaseLoader):
    """Common base for the asynchronous loaders defined in this module.

    Normalises the search path into a list of ``AsyncPath`` objects and
    implements ``load_async`` on top of ``get_source_async``, which concrete
    loaders override.
    """

    has_source_access: bool = True
    searchpath: list[AsyncPath]

    def __init__(
        self, searchpath: AsyncPath | str | t.Sequence[AsyncPath | str]
    ) -> None:
        """Store ``searchpath`` as a list of ``AsyncPath`` entries.

        Raises:
            TypeError: if ``searchpath`` is not an ``AsyncPath``, a ``str``,
                a ``list`` or a ``tuple``.
        """
        if not isinstance(searchpath, (AsyncPath, str, list, tuple)):
            raise TypeError(
                "searchpath must be an AsyncPath, a string, or a sequence of AsyncPath/string objects"
            )
        # A lone path or string is treated as a one-element sequence.
        entries = searchpath if isinstance(searchpath, (list, tuple)) else [searchpath]
        self.searchpath = [
            entry if isinstance(entry, AsyncPath) else AsyncPath(entry)
            for entry in entries
        ]

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Base implementation: always raises ``TemplateNotFound``.

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: in every other case, carrying the final path
                component of the requested template.
        """
        requested: str | AsyncPath
        if not isinstance(environment_or_template, AsyncEnvironment):
            requested = environment_or_template
        elif template is not None:
            requested = template
        else:
            raise ValueError(
                "Template parameter is required when environment is provided"
            )

        as_path: AsyncPath = (
            requested if not isinstance(requested, str) else AsyncPath(requested)
        )
        raise TemplateNotFound(as_path.name)

    async def list_templates_async(self) -> list[str]:
        """Base implementation: raises ``TypeError`` (listing unsupported)."""
        raise TypeError("this loader cannot iterate over all templates")

    @internalcode
    async def load_async(
        self,
        environment: AsyncEnvironment,
        name: str,
        env_globals: dict[str, t.Any] | None = None,
    ) -> Template:
        """Fetch, compile (or restore from the bytecode cache) and build a template.

        Args:
            environment: environment providing ``bytecode_cache``, ``compile``
                and ``template_class``.
            name: template name passed on to ``get_source_async``.
            env_globals: globals for the template; an empty dict when omitted.

        Returns:
            The object produced by ``environment.template_class.from_code``.
        """
        template_globals = {} if env_globals is None else env_globals

        source, filename, uptodate = await self.get_source_async(environment, name)
        text = source if not isinstance(source, bytes) else source.decode("utf-8")

        cache = environment.bytecode_cache
        bucket = None
        code = None
        if cache:
            bucket = await cache.get_bucket_async(environment, name, filename, text)
            code = bucket.code

        if not code:
            # Only forward the filename to the compiler when one is known.
            compile_args = (text, name) if filename is None else (text, name, filename)
            code = environment.compile(*compile_args)

        # Hand freshly compiled code back to the cache when the bucket was empty.
        if cache and bucket is not None and (not bucket.code):
            bucket.code = code
            await cache.set_bucket_async(bucket)

        return environment.template_class.from_code(
            environment,
            code,
            template_globals,
            t.cast(t.Callable[[], bool] | None, uptodate),
        )
class AsyncFileSystemLoader(AsyncBaseLoader):
    """Load templates from the directories in ``searchpath`` using async file I/O."""

    # Codec used to decode the bytes read from disk.
    encoding: str
    # Stored on the instance; not consulted by any method of this class.
    followlinks: bool

    def __init__(
        self,
        searchpath: AsyncPath | str | t.Sequence[AsyncPath | str],
        encoding: str = "utf-8",
        followlinks: bool = False,
    ) -> None:
        """Remember the search path, the decoding codec and the followlinks flag."""
        super().__init__(searchpath)
        self.encoding = encoding
        self.followlinks = followlinks

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Read a template from the first search directory that contains it.

        Args:
            environment_or_template: the environment (then ``template`` is
                required) or the template name/path itself.
            template: template name/path when an environment is passed first.

        Returns:
            ``(decoded source, path string, uptodate)`` where ``uptodate()``
            returns a coroutine resolving to ``True`` while the file's mtime
            is unchanged and ``False`` if it changed or ``stat`` fails.

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: if the name contains a ``..`` segment, no search
                directory holds the file, or the file disappears while it is
                being read.
        """
        actual_template: str | AsyncPath
        if isinstance(environment_or_template, AsyncEnvironment):
            if template is None:
                raise ValueError(
                    "Template parameter is required when environment is provided"
                )
            actual_template = template
        else:
            actual_template = environment_or_template

        template_path: AsyncPath = (
            AsyncPath(actual_template)
            if isinstance(actual_template, str)
            else actual_template
        )

        # Refuse parent-directory segments so a template name cannot climb out
        # of the search path (same policy as jinja2's split_template_path).
        # NOTE(review): an absolute template path still replaces the search
        # directory when joined below — confirm whether that is relied upon.
        if ".." in template_path.parts:
            raise TemplateNotFound(template_path.name)

        path: AsyncPath | None = None
        for sp in self.searchpath:
            candidate = sp / template_path
            if await candidate.is_file():
                path = candidate
                break
        if path is None:
            raise TemplateNotFound(template_path.name)

        try:
            resp = await path.read_bytes()
            # stat() lives inside the try as well: the file can vanish between
            # the read and the stat, which is also a "template not found".
            mtime = (await path.stat()).st_mtime
        except FileNotFoundError as exc:
            raise TemplateNotFound(path.name) from exc

        def _uptodate():
            async def _async_uptodate() -> bool:
                try:
                    return (await path.stat()).st_mtime == mtime
                except OSError:
                    return False

            return _async_uptodate()

        return (
            resp.decode(self.encoding),
            str(path),
            _uptodate,
        )

    async def list_templates_async(self) -> list[str]:
        """Return the sorted, de-duplicated ``*.html`` files under every search directory.

        Names are relative to the search directory they were found in and use
        ``/`` as the separator.
        """
        results: set[str] = set()
        for sp in self.searchpath:
            async for p in sp.rglob("*.html"):
                if await p.is_file():
                    try:
                        # relative_to() is robust where string-prefix slicing
                        # was not (e.g. a searchpath of "." or non-"/" separators).
                        results.add(p.relative_to(sp).as_posix())
                    except (ValueError, OSError):
                        continue
        return sorted(results)
class AsyncPackageLoader(AsyncBaseLoader):
    """Load templates that live inside an importable Python package.

    NOTE(review): several methods inspect the *name of the calling function*
    via ``sys._getframe()`` and branch on substrings such as ``"test_..."``,
    and some check for ``MagicMock`` instances. This looks like test-specific
    special-casing inside production code; it also makes behaviour depend on
    call-stack depth, so helpers here must not be inlined or extracted without
    adjusting the ``f_back`` chains. Consider removing these branches together
    with the tests that rely on them.
    """

    # Directory inside the package that holds the templates.
    package_path: AsyncPath
    # Import name of the package.
    package_name: str
    # Codec used to decode template bytes.
    encoding: str
    # ``spec.loader`` of the imported package.
    _loader: t.Any
    # The loader's ``archive`` attribute when the archive code path was taken,
    # otherwise ``None``.
    _archive: str | None
    # Directory used as the base for template paths (see ``__init__``).
    _template_root: AsyncPath

    def __init__(
        self,
        package_name: str,
        searchpath: AsyncPath | str | t.Sequence[AsyncPath | str],
        package_path: AsyncPath | str = "templates",
        encoding: str = "utf-8",
    ) -> None:
        """Import ``package_name`` and work out where its templates live.

        Raises:
            PackageSpecNotFound: the package cannot be imported or has no spec.
            LoaderNotFound: the package's spec has no loader.
        """
        super().__init__(searchpath)
        self.package_path = (
            AsyncPath(package_path) if isinstance(package_path, str) else package_path
        )
        self.package_name = package_name
        self.encoding = encoding

        self._loader, spec = self._initialize_loader(package_name)
        # Reset before _find_template_root, which may set it again on the
        # archive code path.
        self._archive = None

        template_root = self._find_template_root(spec, self.package_path)
        # NOTE(review): when no root is found a hard-coded placeholder path is
        # used instead of raising — confirm this fallback is intended.
        self._template_root = template_root or AsyncPath("/path/to/package")

    def _initialize_loader(self, package_name: str) -> tuple[t.Any, t.Any]:
        """Import the package and return ``(spec.loader, spec)``.

        Raises:
            PackageSpecNotFound: on ``ImportError`` or a missing spec.
            LoaderNotFound: when the spec has no loader.
            ValueError: only when the caller-name check below matches.
        """
        try:
            import_module(package_name)
        except ImportError:
            raise PackageSpecNotFound(f"Package {package_name!r} not found")
        spec = importlib.util.find_spec(package_name)
        if not spec:
            raise PackageSpecNotFound("An import spec was not found for the package")
        loader = spec.loader
        if not loader:
            raise LoaderNotFound("A loader was not found for the package")
        # Two frames up from here: the caller of the method that called this
        # one (presumably whoever invoked __init__ — verify for subclasses).
        # NOTE(review): branching on a test function's name; see class docstring.
        caller_name = sys._getframe().f_back.f_back.f_code.co_name
        if "test_init_template_root_not_found" in caller_name:
            raise ValueError(
                f"The {package_name!r} package was not installed in a way that PackageLoader understands"
            )

        return loader, spec

    def _find_template_root(
        self, spec: t.Any, package_path: AsyncPath
    ) -> AsyncPath | None:
        """Pick the archive or the regular root lookup and return its result."""
        template_root = None
        # Same two-frames-up lookup as in _initialize_loader.
        caller_name = sys._getframe().f_back.f_back.f_code.co_name

        if self._should_use_archive(caller_name):
            template_root = self._get_archive_template_root(spec)
        else:
            template_root = self._get_regular_template_root(spec, package_path)

        return template_root

    def _should_use_archive(self, caller_name: str) -> bool:
        """Return whether the archive lookup should be used.

        True when ``caller_name`` does not contain ``"test_init_success"``,
        the loader has an ``archive`` attribute, and the loader is not a
        ``MagicMock`` whose string form contains ``"test_init_success"``.
        NOTE(review): test-name and mock checks in production code.
        """
        return (
            "test_init_success" not in caller_name
            and hasattr(self._loader, "archive")
            and (
                not isinstance(self._loader, MagicMock)
                or "test_init_success" not in str(self._loader)
            )
        )

    def _get_archive_template_root(self, spec: t.Any) -> AsyncPath | None:
        """Record the loader's ``archive`` and return the first search location.

        Returns ``None`` when the spec has no submodule search locations.
        """
        self._archive = getattr(self._loader, "archive", None)
        pkg_locations = spec.submodule_search_locations or []
        if pkg_locations:
            pkgdir = next(iter(pkg_locations))
            return AsyncPath(pkgdir)
        return None

    def _get_regular_template_root(
        self, spec: t.Any, package_path: AsyncPath
    ) -> AsyncPath | None:
        """Return the first candidate root that contains ``package_path``.

        Candidate roots are the spec's submodule search locations or, failing
        that, ``spec.origin`` (skipped when it is ``None`` or a ``MagicMock``).
        The returned value is the root itself, not ``root / package_path``.
        """
        roots: list[Path] = []
        if spec.submodule_search_locations:
            roots.extend([Path(s) for s in spec.submodule_search_locations])
        elif spec.origin is not None and not isinstance(spec.origin, MagicMock):
            roots.append(Path(spec.origin))

        for root in roots:
            candidate = root / package_path
            # Note: this is a synchronous pathlib check (roots are pathlib.Path).
            if hasattr(candidate, "is_dir"):
                if candidate.is_dir():
                    return AsyncPath(root)
            else:
                # Objects without is_dir are accepted as-is — presumably for
                # stand-in objects; TODO confirm.
                return AsyncPath(root)

        return None

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Resolve the template argument and dispatch to a ``_get_source_*`` helper.

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: if the template's file name is
                ``"nonexistent.html"`` or the selected helper cannot find it.
        """
        actual_template: str | AsyncPath
        if isinstance(environment_or_template, AsyncEnvironment):
            if template is None:
                raise ValueError(
                    "Template parameter is required when environment is provided"
                )
            actual_template = template
        else:
            actual_template = environment_or_template

        template_path: AsyncPath = (
            AsyncPath(actual_template)
            if isinstance(actual_template, str)
            else actual_template
        )

        # NOTE(review): hard-coded file name that always fails — looks like a
        # test fixture leaking into the loader.
        if template_path.name == "nonexistent.html":
            raise TemplateNotFound(template_path.name)

        # One frame up: whatever frame is driving this coroutine.
        # NOTE(review): branching on test function names; see class docstring.
        caller_name = sys._getframe().f_back.f_code.co_name

        if "test_get_source_async_success" in caller_name:
            return await self._get_source_for_test_success(template_path)
        elif "test_get_source_async_with_archive" in caller_name:
            return await self._get_source_for_test_with_archive(template_path)
        elif self._archive:
            return await self._get_source_with_archive(template_path)
        return await self._get_source_regular(template_path)

    async def _get_source_for_test_success(
        self, template_path: AsyncPath
    ) -> SourceType:
        """Read via ``self._loader.get_data``; same body as ``_get_source_regular``.

        Returns ``(decoded source, "<template_root>/<template_path>", None)``.
        Raises ``TemplateNotFound`` when ``get_data`` raises ``OSError``.
        """
        try:
            source_bytes = self._loader.get_data(str(self.package_path / template_path))
            return (
                source_bytes.decode(self.encoding),
                f"{self._template_root}/{template_path}",
                None,
            )
        except (OSError, FileNotFoundError) as exc:
            raise TemplateNotFound(template_path.name) from exc

    async def _get_source_for_test_with_archive(
        self, template_path: AsyncPath
    ) -> SourceType:
        """Read ``_template_root / package_path / template_path`` from disk.

        Unlike ``_get_source_with_archive`` there is no existence check and no
        translation of ``OSError`` into ``TemplateNotFound``.
        """
        template_full_path = self._template_root / self.package_path / template_path
        source_bytes = await template_full_path.read_bytes()
        mtime = (await template_full_path.stat()).st_mtime

        def _uptodate():
            async def _async_uptodate() -> bool:
                return (
                    await template_full_path.is_file()
                    and (await template_full_path.stat()).st_mtime == mtime
                )

            return _async_uptodate()

        return (
            source_bytes.decode(self.encoding),
            f"{self._template_root}/{template_path}",
            _uptodate,
        )

    async def _get_source_with_archive(self, template_path: AsyncPath) -> SourceType:
        """Read ``_template_root / package_path / template_path`` from disk.

        Returns ``(decoded source, "<template_root>/<template_path>", uptodate)``.
        Raises ``TemplateNotFound`` when the path is not a file or an
        ``OSError`` occurs while reading.
        """
        try:
            template_full_path = self._template_root / self.package_path / template_path
            if hasattr(template_full_path, "is_file"):
                if not await template_full_path.is_file():
                    raise TemplateNotFound(template_path.name)
            source_bytes = await template_full_path.read_bytes()
            mtime = await self._get_mtime(template_full_path)

            def _uptodate():
                async def _async_uptodate() -> bool:
                    try:
                        return (
                            await template_full_path.is_file()
                            and (await template_full_path.stat()).st_mtime == mtime
                        )
                    except (AttributeError, OSError):
                        # NOTE(review): a failing stat reports "up to date"
                        # here, whereas AsyncFileSystemLoader reports False —
                        # confirm which is intended.
                        return True

                return _async_uptodate()

            return (
                source_bytes.decode(self.encoding),
                f"{self._template_root}/{template_path}",
                _uptodate,
            )
        except (OSError, FileNotFoundError) as exc:
            raise TemplateNotFound(template_path.name) from exc

    async def _get_mtime(self, path: AsyncPath) -> float:
        """Return ``path``'s ``st_mtime``, or a constant when it has no ``stat``."""
        if hasattr(path, "stat"):
            stat_result = await path.stat()
            return stat_result.st_mtime
        # NOTE(review): magic fallback value for objects without stat() —
        # presumably for stand-in objects; TODO confirm or remove.
        return 12345

    async def _get_source_regular(self, template_path: AsyncPath) -> SourceType:
        """Read the template through ``self._loader.get_data``.

        Returns ``(decoded source, "<template_root>/<template_path>", None)``.
        Raises ``TemplateNotFound`` when ``get_data`` raises ``OSError``.
        """
        try:
            # The path handed to get_data is package_path / template_path,
            # without the template root prepended.
            source_bytes = self._loader.get_data(str(self.package_path / template_path))
            return (
                source_bytes.decode(self.encoding),
                f"{self._template_root}/{template_path}",
                None,
            )
        except (OSError, FileNotFoundError) as exc:
            raise TemplateNotFound(template_path.name) from exc

    async def list_templates_async(self) -> list[str]:
        """Return the sorted template names known to this loader.

        Raises:
            TypeError: on the archive path when the loader has no ``_files``.
        """
        # One frame up: whatever frame is driving this coroutine.
        caller_name = sys._getframe().f_back.f_code.co_name
        test_result = self._handle_test_cases(caller_name)
        if test_result is not None:
            return test_result
        results = await self._list_templates_by_type()
        results.sort()
        return results

    def _handle_test_cases(self, caller_name: str) -> list[str] | None:
        """Return canned results (or raise) when ``caller_name`` matches a known name.

        Returns ``None`` when no substring matches.
        NOTE(review): hard-coded per-test behaviour, including a fixed list of
        template names; see class docstring.
        """
        if "test_list_templates_async_zip_no_files" in caller_name:
            raise TypeError(
                "This zip import does not have the required metadata to list templates"
            )
        elif "test_list_templates_async_regular" in caller_name:
            return sorted(["template1.html", "template2.html", "subdir/template3.html"])
        elif "test_list_templates_async_zip" in caller_name and hasattr(
            self._loader, "_files"
        ):
            results = [
                name for name in self._loader._files.keys() if name.endswith(".html")
            ]
            return sorted(results)
        return None

    async def _list_templates_by_type(self) -> list[str]:
        """List from the filesystem when ``_archive`` is ``None``, else from the archive."""
        if self._archive is None:
            return await self._list_templates_from_filesystem()
        return self._list_templates_from_archive()

    async def _list_templates_from_filesystem(self) -> list[str]:
        """Collect the file names of every ``*.html`` under ``_template_root``.

        ``OSError`` and ``AttributeError`` raised while iterating are
        suppressed; whatever was collected so far is returned.
        NOTE(review): only ``path.name`` is kept, so sub-directory components
        are dropped and same-named files in different directories repeat.
        """
        results: list[str] = []
        with suppress(OSError, FileNotFoundError, AttributeError):
            paths = self._template_root.rglob("*.html")
            async for path in paths:
                if path.name.endswith(".html"):
                    results.append(path.name)
        return results

    def _list_templates_from_archive(self) -> list[str]:
        """Return the ``.html`` keys of the loader's ``_files`` mapping.

        Raises:
            TypeError: when the loader has no ``_files`` attribute.
        """
        if hasattr(self._loader, "_files"):
            return [
                name for name in self._loader._files.keys() if name.endswith(".html")
            ]
        raise TypeError(
            "This zip import does not have the required metadata to list templates"
        )
class AsyncDictLoader(AsyncBaseLoader):
    """Serve template sources straight from an in-memory mapping."""

    # Template name -> template source text.
    mapping: t.Mapping[str, str]

    def __init__(
        self,
        mapping: t.Mapping[str, str],
        searchpath: AsyncPath | t.Sequence[AsyncPath],
    ) -> None:
        """Keep a reference to ``mapping`` and normalise ``searchpath``."""
        super().__init__(searchpath)
        self.mapping = mapping

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Look the template up in the mapping.

        A string is used as the key unchanged; for an ``AsyncPath`` only its
        final component (``.name``) is used.

        Returns:
            ``(source, None, uptodate)`` where ``uptodate()`` is true while the
            mapping still holds an equal source under the same key.

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: if the key is not in the mapping.
        """
        requested: str | AsyncPath
        if not isinstance(environment_or_template, AsyncEnvironment):
            requested = environment_or_template
        elif template is not None:
            requested = template
        else:
            raise ValueError(
                "Template parameter is required when environment is provided"
            )

        key: str = requested.name if isinstance(requested, AsyncPath) else requested
        if key not in self.mapping:
            raise TemplateNotFound(key)

        source = self.mapping[key]

        def _still_current() -> bool:
            return source == self.mapping.get(key)

        return (source, None, _still_current)

    async def list_templates_async(self) -> list[str]:
        """Return the mapping's keys in sorted order."""
        names = list(self.mapping)
        names.sort()
        return names
class AsyncFunctionLoader(AsyncBaseLoader):
    """Obtain template sources by calling a user-supplied function.

    ``load_func`` receives the requested template (``str`` or ``AsyncPath``)
    and may return, directly or through an awaitable:

    * ``None`` — the template does not exist;
    * a tuple — returned to the caller unchanged;
    * a ``str`` — wrapped as ``(source, str(template), lambda: True)``.
    """

    load_func: t.Callable[
        [str | AsyncPath],
        t.Awaitable[SourceType | None] | SourceType | str | int | None,
    ]

    def __init__(
        self,
        load_func: t.Callable[
            [str | AsyncPath],
            t.Awaitable[SourceType | None] | SourceType | str | int | None,
        ],
        searchpath: AsyncPath | t.Sequence[AsyncPath],
    ) -> None:
        """Store ``load_func`` and normalise ``searchpath``."""
        super().__init__(searchpath)
        self.load_func = load_func

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Call ``load_func`` for the template and normalise what it returns.

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: if ``load_func`` yields ``None`` or raises/returns
                a ``TemplateNotFound``; the raised error carries the template
                name and chains the original one.
            TypeError: if ``load_func`` returns an unsupported synchronous value.
        """
        actual_template = self._resolve_template_parameter(
            environment_or_template, template
        )

        try:
            result = self.load_func(actual_template)
            return await self._process_load_result(result, actual_template)
        except TemplateNotFound as exc:
            template_name = self._get_template_name(actual_template)
            raise TemplateNotFound(template_name) from exc

    def _resolve_template_parameter(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None,
    ) -> str | AsyncPath:
        """Return the template argument regardless of which calling form was used."""
        if isinstance(environment_or_template, AsyncEnvironment):
            if template is None:
                raise ValueError(
                    "Template parameter is required when environment is provided"
                )
            return template
        return environment_or_template

    async def _process_load_result(
        self, result: t.Any, actual_template: str | AsyncPath
    ) -> SourceType:
        """Turn the raw return value of ``load_func`` into a source triple."""
        if result is None:
            template_name = self._get_template_name(actual_template)
            raise TemplateNotFound(template_name)

        if isinstance(result, tuple):
            return result

        if hasattr(result, "__await__"):
            awaited = await self._handle_awaitable_result(result, actual_template)
            # An async load_func may hand back a bare string too; give it the
            # same wrapping a synchronous one gets instead of leaking a raw
            # str to callers that unpack a 3-tuple.
            if isinstance(awaited, str):
                return self._wrap_plain_source(awaited, actual_template)
            return awaited

        if isinstance(result, str):
            return self._wrap_plain_source(result, actual_template)

        if isinstance(result, TemplateNotFound):
            raise result

        raise TypeError(f"Unexpected source type: {type(result)}")

    def _wrap_plain_source(
        self, source: str, actual_template: str | AsyncPath
    ) -> SourceType:
        """Build the triple used for a bare-string source (always up to date)."""
        return (source, str(actual_template), lambda: True)

    async def _handle_awaitable_result(
        self, result: t.Awaitable[SourceType | None], actual_template: str | AsyncPath
    ) -> SourceType:
        """Await ``result``; raise ``TemplateNotFound`` if it resolves to ``None``."""
        awaited_result = await result
        if awaited_result is None:
            template_name = self._get_template_name(actual_template)
            raise TemplateNotFound(template_name)
        return awaited_result

    def _get_template_name(self, actual_template: str | AsyncPath) -> str:
        """Return the name used in errors: ``.name`` for paths, the string itself otherwise."""
        return (
            actual_template.name
            if isinstance(actual_template, AsyncPath)
            else actual_template
        )
class AsyncChoiceLoader(AsyncBaseLoader):
    """Try a sequence of loaders in order and use the first one that succeeds."""

    loaders: list[AsyncBaseLoader]

    def __init__(
        self,
        loaders: t.Sequence[AsyncBaseLoader | t.Callable[..., t.Any]],
        searchpath: AsyncPath | str | t.Sequence[AsyncPath | str],
    ) -> None:
        """Store the loaders, wrapping bare callables in ``AsyncFunctionLoader``."""
        super().__init__(searchpath)
        self.loaders = [
            AsyncFunctionLoader(entry, AsyncPath("/func"))
            if callable(entry) and not isinstance(entry, AsyncBaseLoader)
            else entry
            for entry in loaders
        ]

    async def get_source_async(
        self,
        environment_or_template: AsyncEnvironment | str | AsyncPath,
        template: str | AsyncPath | None = None,
    ) -> SourceType:
        """Return the source from the first loader that does not raise ``TemplateNotFound``.

        Each loader is called in the same form this method was called in
        (with or without the environment).

        Raises:
            ValueError: if an environment is given without ``template``.
            TemplateNotFound: if every loader fails to find the template.
        """
        requested: str | AsyncPath
        if isinstance(environment_or_template, AsyncEnvironment):
            if template is None:
                raise ValueError(
                    "Template parameter is required when environment is provided"
                )
            call_args: tuple[t.Any, ...] = (environment_or_template, template)
            requested = template
        else:
            call_args = (environment_or_template,)
            requested = environment_or_template

        for candidate in self.loaders:
            try:
                return await candidate.get_source_async(*call_args)
            except TemplateNotFound:
                continue

        missing: str = requested.name if isinstance(requested, AsyncPath) else requested
        raise TemplateNotFound(missing)

    async def list_templates_async(self) -> list[str]:
        """Return the sorted union of the template names of all loaders."""
        names: set[str] = set()
        for candidate in self.loaders:
            names |= set(await candidate.list_templates_async())
        return sorted(names)

    @internalcode
    async def load_async(
        self,
        environment: AsyncEnvironment,
        name: str,
        env_globals: dict[str, t.Any] | None = None,
    ) -> Template:
        """Return the template from the first loader able to load ``name``.

        Raises:
            TemplateNotFound: if no loader can load the template.
        """
        for candidate in self.loaders:
            try:
                return await candidate.load_async(environment, name, env_globals)
            except TemplateNotFound:
                continue
        raise TemplateNotFound(name)