Coverage for jinja2_async_environment / loaders / filesystem.py: 67%

106 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 21:26 -0800

1"""Async filesystem template loader implementation.""" 

2 

3import typing as t 

4from contextlib import suppress 

5 

6from anyio import Path as AsyncPath 

7from jinja2.utils import internalcode 

8 

9from .base import AsyncBaseLoader, SourceType 

10 

11# Define TemplatePathData type for better type checking 

12# Replaced namedtuple with TypedCache for better type safety 

13 

14 

15class TemplatePathDataType(t.NamedTuple): 

16 """Type definition for template path data.""" 

17 

18 template_name: str | None 

19 should_include: bool 

20 

21 

22if t.TYPE_CHECKING: 

23 from ..environment import AsyncEnvironment 

24 

25 

26class AsyncFileSystemLoader(AsyncBaseLoader): 

27 """Async filesystem template loader with memory optimization. 

28 

29 This loader searches for templates in the filesystem asynchronously, 

30 supporting multiple search paths and proper file watching for 

31 template updates. 

32 """ 

33 

34 __slots__ = ("encoding", "followlinks", "_template_cache") 

35 

36 def __init__( 

37 self, 

38 searchpath: AsyncPath | str | t.Sequence[AsyncPath | str], 

39 encoding: str = "utf-8", 

40 followlinks: bool = False, 

41 ) -> None: 

42 """Initialize the filesystem loader. 

43 

44 Args: 

45 searchpath: Path or sequence of paths to search for templates 

46 encoding: File encoding to use when reading templates 

47 followlinks: Whether to follow symbolic links 

48 """ 

49 super().__init__(searchpath) 

50 self.encoding = encoding 

51 self.followlinks = followlinks 

52 self._template_cache: dict[str, tuple[float, str]] = {} 

53 

54 @internalcode 

55 async def get_source_async( 

56 self, environment: "AsyncEnvironment", name: str 

57 ) -> SourceType: 

58 """Get template source from filesystem asynchronously. 

59 

60 Args: 

61 environment: The async environment instance 

62 name: Template name to load 

63 

64 Returns: 

65 Tuple of (source, filename, uptodate_func) 

66 

67 Raises: 

68 TemplateNotFound: If template file cannot be found 

69 """ 

70 self._ensure_initialized() 

71 

72 for searchpath in self.searchpath: 

73 template_path = searchpath / name 

74 

75 try: 

76 if await template_path.exists(): 

77 if not await self._is_safe_path(template_path): 

78 continue 

79 

80 # Read the template file 

81 source = await template_path.read_text(encoding=self.encoding) 

82 filename = str(await template_path.resolve()) 

83 

84 # Get the modification time for uptodate check 

85 stat_result = await template_path.stat() 

86 mtime = stat_result.st_mtime 

87 

88 # Create uptodate function 

89 def uptodate() -> bool: 

90 try: 

91 from pathlib import Path 

92 

93 return Path(filename).stat().st_mtime == mtime 

94 except OSError: 

95 return False 

96 

97 return source, filename, uptodate 

98 

99 except OSError: 

100 # Continue to next search path 

101 continue 

102 

103 # Template not found in any search path 

104 self._handle_template_not_found(name) 

105 # This line should never be reached, but added for type checker 

106 raise RuntimeError("Unreachable code") 

107 

108 async def _is_safe_path(self, template_path: AsyncPath) -> bool: 

109 """Check if the template path is safe to access. 

110 

111 Args: 

112 template_path: Path to check 

113 

114 Returns: 

115 True if path is safe, False otherwise 

116 """ 

117 try: 

118 # Check if it's a file 

119 if not await template_path.is_file(): 

120 return False 

121 

122 # If not following links, check for symlinks 

123 if not self.followlinks and await template_path.is_symlink(): 

124 return False 

125 

126 # Check that the resolved path is within search paths 

127 resolved_path = await template_path.resolve() 

128 

129 for searchpath in self.searchpath: 

130 resolved_searchpath = await searchpath.resolve() 

131 try: 

132 resolved_path.relative_to(str(resolved_searchpath)) 

133 return True 

134 except ValueError: 

135 continue 

136 

137 return False 

138 

139 except OSError: 

140 return False 

141 

142 @internalcode 

143 async def list_templates_async(self) -> list[str]: 

144 """List all templates in the search paths asynchronously. 

145 

146 Returns: 

147 Sorted list of template names 

148 """ 

149 self._ensure_initialized() 

150 

151 found_templates = set() 

152 

153 for searchpath in self.searchpath: 

154 templates = await self._list_templates_in_path(searchpath) 

155 found_templates.update(templates) 

156 

157 return sorted(found_templates) 

158 

159 async def _list_templates_in_path(self, searchpath: AsyncPath) -> set[str]: 

160 """List templates in a single search path. 

161 

162 Args: 

163 searchpath: Search path to list templates from 

164 

165 Returns: 

166 Set of template names found in this path 

167 """ 

168 if not await searchpath.exists(): 

169 return set() 

170 

171 found_templates = set() 

172 with suppress(OSError): 

173 # Use rglob to find all files recursively 

174 async for template_path in searchpath.rglob("*"): 

175 if await template_path.is_file(): 

176 template_data = await self._process_template_path( 

177 searchpath, template_path 

178 ) 

179 if ( 

180 template_data.should_include 

181 and template_data.template_name is not None 

182 ): 

183 found_templates.add(template_data.template_name) 

184 

185 return found_templates 

186 

187 async def _process_template_path( 

188 self, searchpath: AsyncPath, template_path: AsyncPath 

189 ) -> TemplatePathDataType: 

190 """Process a template path to determine if it should be included. 

191 

192 Args: 

193 searchpath: Search path 

194 template_path: Template path to process 

195 

196 Returns: 

197 Named tuple with template name and whether it should be included 

198 """ 

199 template_name = await self._get_template_name(searchpath, template_path) 

200 if template_name and await self._is_safe_path(template_path): 

201 return TemplatePathDataType(template_name, True) 

202 return TemplatePathDataType(None, False) 

203 

204 async def _get_template_name( 

205 self, searchpath: AsyncPath, template_path: AsyncPath 

206 ) -> str | None: 

207 """Get template name from template path. 

208 

209 Args: 

210 searchpath: Search path 

211 template_path: Full path to template 

212 

213 Returns: 

214 Template name or None if path is not relative to searchpath 

215 """ 

216 try: 

217 relative_path = template_path.relative_to(str(searchpath)) 

218 return str(relative_path).replace("\\", "/") 

219 except ValueError: 

220 # Path is not relative to searchpath 

221 return None 

222 

223 async def _walk_directory( 

224 self, directory: AsyncPath 

225 ) -> t.AsyncGenerator[AsyncPath]: 

226 """Async generator to walk directory tree. 

227 

228 Args: 

229 directory: Directory to walk 

230 

231 Yields: 

232 AsyncPath objects for each file/directory found 

233 """ 

234 if not await directory.exists(): 

235 return 

236 

237 try: 

238 async for item in directory.iterdir(): 

239 yield item 

240 

241 # Check if we should recurse into this directory 

242 if await self._should_recurse_into_directory(item): 

243 async for subitem in self._walk_subdirectory(item): 

244 yield subitem 

245 

246 except (OSError, PermissionError): 

247 # Skip directories we can't access 

248 return 

249 

250 async def _should_recurse_into_directory(self, item: AsyncPath) -> bool: 

251 """Check if we should recurse into a directory. 

252 

253 Args: 

254 item: Directory to check 

255 

256 Returns: 

257 True if we should recurse, False otherwise 

258 """ 

259 return await item.is_dir() and (self.followlinks or not await item.is_symlink()) 

260 

261 async def _walk_subdirectory( 

262 self, directory: AsyncPath 

263 ) -> t.AsyncGenerator[AsyncPath]: 

264 """Async generator to walk subdirectory tree. 

265 

266 Args: 

267 directory: Subdirectory to walk 

268 

269 Yields: 

270 AsyncPath objects for each file/directory found 

271 """ 

272 async for subitem in self._walk_directory(directory): 

273 yield subitem 

274 

275 def _get_cache_key(self, name: str) -> str: 

276 """Generate cache key for template. 

277 

278 Args: 

279 name: Template name 

280 

281 Returns: 

282 Cache key string 

283 """ 

284 return f"fs:{name}"