Coverage for jinja2_async_environment/compiler_old.py: 0%
683 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:09 -0700
1import hashlib
2import re
3import typing as t
5from jinja2 import nodes
6from jinja2.compiler import (
7 CodeGenerator,
8 CompilerExit,
9 EvalContext,
10 Frame,
11 find_undeclared,
12)
15# Pre-compiled regex patterns for performance optimization
class CompiledPatterns:
    """Pre-compiled regex patterns and text rewrites for generated template code.

    Everything here is class-level and reached through the classmethods:
    ``get_optimized_imports`` returns the import header for generated
    modules and ``optimize_generated_code`` post-processes generated source.
    """

    # Pattern for async yield detection
    ASYNC_YIELD_PATTERN = re.compile(
        r"async for event in self\._async_yield_from\([^)]+\):\s*$", re.MULTILINE
    )

    # Pattern for undefined variable detection
    UNDEFINED_VAR_PATTERN = re.compile(
        r"undefined\(name='([^']+)'\) if l_0_\1 is missing else l_0_\1"
    )

    # Pattern for loop variable optimization
    LOOP_VAR_PATTERN = re.compile(r"l_0_(\w+)")

    # Pattern for context block detection
    CONTEXT_BLOCK_PATTERN = re.compile(r"yield from context\.blocks")

    # Additional performance patterns
    HASATTR_CHECK_PATTERN = re.compile(r"hasattr\(([^,]+),\s*'__await__'\)")
    AUTO_AITER_PATTERN = re.compile(r"auto_aiter\(([^)]+)\)")
    TEMPLATE_RUNTIME_ERROR_PATTERN = re.compile(r'TemplateRuntimeError\("([^"]+)"\)')
    DUPLICATE_IMPORT_PATTERN = re.compile(r"^from ([\w.]+) import (.+)$", re.MULTILINE)
    REDUNDANT_ESCAPE_PATTERN = re.compile(r"escape\(escape\(([^)]+)\)\)")
    CONSTANT_UNDEFINED_PATTERN = re.compile(r"undefined\(name=None\)")

    # Cached import statements for faster code generation
    _CACHED_IMPORTS = {
        "runtime": "from jinja2.runtime import Undefined, Macro, missing, LoopContext, AsyncLoopContext, auto_aiter, Namespace, TemplateRuntimeError",
        "markupsafe": "from markupsafe import escape",
        "defaults": "from jinja2.defaults import DEFAULT_FILTERS",
    }

    @classmethod
    def get_optimized_imports(cls) -> str:
        """Return the cached import statements joined with newlines."""
        return "\n".join(cls._CACHED_IMPORTS.values())

    @classmethod
    def optimize_generated_code(cls, code: str) -> str:
        """Apply literal, regex and import-deduplication rewrites to ``code``.

        Args:
            code: Generated template source.

        Returns:
            The rewritten source.
        """
        # NOTE(review): whitespace inside these literals is reproduced exactly
        # as it appears in the listing this was reviewed from - confirm the
        # run lengths against the repository file.
        literal_rewrites = [
            ("yield from context.blocks", "pass # yield from replaced"),
            ("undefined(name='item') if l_0_item is missing else l_0_item", "item"),
            ("undefined(name=None)", "Undefined()"),
            # Doubled escape() calls are collapsed in _optimize_with_regex,
            # which also consumes the matching closing parenthesis. A plain
            # "escape(escape(" -> "escape(" prefix replacement would leave an
            # unbalanced ")" behind and break the generated code.
            (
                "if hasattr(value, '__await__'):\n return await value\nelse:\n return value",
                "return await value if hasattr(value, '__await__') else value",
            ),
        ]

        optimized_code = code
        for pattern, replacement in literal_rewrites:
            optimized_code = optimized_code.replace(pattern, replacement)

        # Use regex patterns for more complex optimizations
        optimized_code = cls._optimize_with_regex(optimized_code)

        # Remove duplicate imports
        return cls._deduplicate_imports(optimized_code)

    @classmethod
    def _optimize_with_regex(cls, code: str) -> str:
        """Rewrite ``hasattr`` awaitable checks and collapse nested ``escape`` calls."""
        # hasattr(x, '__await__') -> getattr(x, '__await__', None) is not None
        code = cls.HASATTR_CHECK_PATTERN.sub(
            lambda match: f"getattr({match.group(1)}, '__await__', None) is not None",
            code,
        )

        # Each pass strips one level of escape(escape(...)); repeat until no
        # match remains so deeper nesting collapses too. Every substitution
        # shortens the text, so the loop terminates.
        replaced = 1
        while replaced:
            code, replaced = cls.REDUNDANT_ESCAPE_PATTERN.subn(r"escape(\1)", code)

        return code

    @classmethod
    def _deduplicate_imports(cls, code: str) -> str:
        """Drop repeated ``from ... import ...`` lines, keeping the first of each.

        Lines are compared verbatim (including indentation); all other lines
        pass through unchanged.
        """
        seen_imports: set[str] = set()
        deduplicated_lines: list[str] = []

        for line in code.split("\n"):
            is_import = line.strip().startswith("from ") and " import " in line
            if is_import:
                if line in seen_imports:
                    continue
                seen_imports.add(line)
            deduplicated_lines.append(line)

        return "\n".join(deduplicated_lines)
117class CompilationCache:
118 """Cache for compiled template code to avoid recompilation."""
120 def __init__(self, max_size: int = 1000):
121 self.max_size = max_size
122 self._cache: dict[str, str] = {}
124 def get_cache_key(self, source: str, environment_id: str) -> str:
125 """Generate a cache key for template source and environment."""
126 content = f"{source}:{environment_id}"
127 return hashlib.sha256(content.encode()).hexdigest()[:16]
129 def get(self, cache_key: str) -> str | None:
130 """Get compiled code from cache."""
131 return self._cache.get(cache_key)
133 def set(self, cache_key: str, compiled_code: str) -> None:
134 """Store compiled code in cache with size limit."""
135 if len(self._cache) >= self.max_size:
136 # Remove oldest entries (simple FIFO)
137 oldest_keys = list(self._cache.keys())[: self.max_size // 4]
138 for key in oldest_keys:
139 del self._cache[key]
141 self._cache[cache_key] = compiled_code
143 def clear(self) -> None:
144 """Clear the cache."""
145 self._cache.clear()
148# Global compilation cache instance
149_compilation_cache = CompilationCache()
class DependencyResolver:
    """Emit the lookup code that binds template filters and tests to local names."""

    def __init__(self, code_generator: "AsyncCodeGenerator") -> None:
        """Remember the code generator whose output stream receives the code."""
        self.code_generator = code_generator

    def setup_filter_dependency(self, name: str) -> None:
        """Bind filter ``name`` to a temporary identifier unless already bound."""
        self._emit_lookup(self.code_generator.filters, "filters", "filter", name)

    def setup_test_dependency(self, name: str) -> None:
        """Bind test ``name`` to a temporary identifier unless already bound."""
        self._emit_lookup(self.code_generator.tests, "tests", "test", name)

    def _emit_lookup(
        self, registry: dict, env_attr: str, label: str, name: str
    ) -> None:
        """Write a ``try``/``except KeyError`` lookup of ``environment.<env_attr>[name]``.

        The fallback branch defines a function under the same identifier that
        raises ``TemplateRuntimeError`` when it is called.
        """
        if name in registry:
            return

        gen = self.code_generator
        alias = gen.temporary_identifier()
        registry[name] = alias

        gen.writeline("try:")
        gen.indent()
        gen.writeline(f"{alias} = environment.{env_attr}[{name!r}]")
        gen.outdent()
        gen.writeline("except KeyError:")
        gen.indent()
        gen.writeline("@internalcode")
        gen.writeline(f"def {alias}(*unused):")
        gen.indent()
        gen.writeline(f'raise TemplateRuntimeError("No {label} named {name!r} found.")')
        gen.outdent()
        gen.outdent()
class LoopCodeGenerator:
    """Write the header and iterator expression of a generated for-loop."""

    def __init__(self, code_generator: "AsyncCodeGenerator") -> None:
        """Remember the code generator that receives the emitted text."""
        self.code_generator = code_generator

    def generate_async_for_header(
        self, node: nodes.For, target: nodes.Node, frame: "AsyncFrame"
    ) -> None:
        """Start a new line with ``async for `` or ``for `` and emit the target."""
        gen = self.code_generator
        keyword = gen.choose_async("async for ", "for ")
        gen.writeline(keyword, node)
        gen.visit(target, frame)

    def generate_loop_iterator(
        self,
        iter_node: nodes.Node,
        frame: "AsyncFrame",
        extended_loop: bool,
        loop_ref: str | None,
        loop_filter_func: str | None,
    ) -> None:
        """Emit everything after the loop target up to and including the colon.

        Args:
            iter_node: Expression node producing the iterable.
            frame: Frame the iterable expression is visited in.
            extended_loop: Whether the iterable is wrapped in a loop context.
            loop_ref: Identifier bound to the loop context, if any.
            loop_filter_func: Name of the generated filter function, if any.
        """
        gen = self.code_generator

        if extended_loop and loop_ref:
            context_prefix = gen.choose_async("Async")
            gen.write(f", {loop_ref} in {context_prefix}LoopContext(")
        else:
            gen.write(" in ")

        if loop_filter_func:
            gen.write(f"{loop_filter_func}(")

        # auto_aiter() is only applied when no loop context wraps the iterable.
        wrap_in_aiter = gen.environment.is_async and not extended_loop
        if wrap_in_aiter:
            gen.write("auto_aiter(")
        gen.visit(iter_node, frame)
        if wrap_in_aiter:
            gen.write(")")

        if loop_filter_func:
            gen.write(")")

        gen.write(", undefined):" if extended_loop else ":")
class AsyncFrame(Frame):
    """Compiler frame carrying extra bookkeeping attributes for async generation."""

    block_frame: "AsyncFrame | None"
    require_output_check: bool
    has_known_extends: bool
    toplevel: bool
    rootlevel: bool
    buffer: str | None
    block_buffer: list[str]
    extended_buffer: list[str] | None
    require_yield: bool
    buffer_count: int
    is_async: bool

    # Attributes handed to the copy by plain assignment (no copying of the value).
    _SHARED_ON_COPY = (
        "buffer",
        "block_buffer",
        "extended_buffer",
        "eval_ctx",
        "require_output_check",
        "has_known_extends",
        "toplevel",
        "rootlevel",
        "block_frame",
        "require_yield",
        "buffer_count",
        "is_async",
        "block_frame_id",
    )

    def __init__(self, eval_ctx: EvalContext | None = None) -> None:
        """Initialise the frame; without ``eval_ctx`` a default one is built.

        The default is an ``EvalContext`` for a fresh autoescaping
        ``Environment`` and the template name ``"template"``.
        """
        if eval_ctx is None:
            from jinja2.environment import Environment
            from jinja2.nodes import EvalContext

            eval_ctx = EvalContext(Environment(autoescape=True), "template")
        super().__init__(eval_ctx)

        self.buffer = None
        self.block_buffer = []
        self.extended_buffer = None
        self.block_frame = None

        self.require_output_check = False
        self.has_known_extends = False
        self.toplevel = False
        self.rootlevel = False
        self.require_yield = False
        self.is_async = False

        self.buffer_count = 0
        self.block_counters: dict[str, int] = {}
        self.block_frame_id = 0

    def copy(self) -> t.Self:
        """Return a new frame of the same class whose ``parent`` is this frame.

        ``symbols`` and ``block_counters`` are copied; every other tracked
        attribute, including the ``block_buffer`` list, is shared.
        """
        clone = type(self)(self.eval_ctx)
        clone.symbols = self.symbols.copy()  # noqa: FURB145
        clone.parent = self
        for attr in self._SHARED_ON_COPY:
            setattr(clone, attr, getattr(self, attr))
        clone.block_counters = self.block_counters.copy()  # noqa: FURB145
        return clone

    def inspect(self, nodes: t.Any | None = None) -> None:
        """Feed each of ``nodes`` to the symbol analyser; no-op when falsy."""
        for node in nodes or ():
            self.symbols.analyze_node(node)

    def push_scope(self) -> None:
        """Do nothing."""

    def pop_scope(self) -> None:
        """Do nothing."""

    def find_break(self) -> bool:
        """Always return ``False``."""
        return False

    def find_continue(self) -> bool:
        """Always return ``False``."""
        return False

    def inner(self) -> "AsyncFrame":
        """Create an inner frame (a ``copy()`` of this frame)."""
        return self.copy()
class AsyncCodeGenerator(CodeGenerator):
    """Code generator that emits an ``async def root(context)`` template module.

    ``generate`` writes the module prelude and the root render function; the
    ``visit_*`` methods emit the code for individual node types.
    """

    environment: t.Any
    name: str
    filename: str
    stream: t.Any
    extends_so_far: int
    has_known_extends: bool
    root_frame_class: type[AsyncFrame] = AsyncFrame
    eval_ctx: t.Any = None
    is_async: bool = True
    last_identifier: int = 0
    # The mutable class-level defaults below are replaced per instance in
    # __init__; they are kept so the class attributes continue to exist.
    identifiers: dict[str, t.Any] = {}
    import_aliases: dict[str, t.Any] = {}
    blocks: dict[str, t.Any] = {}
    extends_buffer: t.Any = None
    required_blocks: set[str] = set()
    has_super: bool = False
    macro_frames: list[AsyncFrame] = []

    # Fast lookup cache for common variable names
    _COMMON_VARS = frozenset(
        [
            "context",
            "environment",
            "eval_ctx",
            "undefined",
            "item",
            "loop",
            "block",
            "value",
            "name",
            "key",
        ]
    )

    def __init__(
        self, environment: t.Any, name: str, filename: str, defer_init: bool = False
    ) -> None:
        """Set up per-instance compiler state.

        Args:
            environment: Environment the template is compiled for.
            name: Template name; embedded in the generated code.
            filename: Template filename, passed to the base class.
            defer_init: Passed through to the base class.
        """
        super().__init__(
            environment, name, filename, stream=None, defer_init=defer_init
        )
        self.extends_so_far = 0
        self.has_known_extends = False
        self.has_super = False
        self.last_identifier = 0
        self.identifiers = {}
        self.import_aliases = {}
        self.blocks = {}
        self.extends_buffer = None
        self.required_blocks = set()
        self.is_async = True
        self.macro_frames = []

        # Initialize assignment tracking stack
        self._assign_stack: list[set[str]] = []

        # Initialize utility classes for better code organization
        self._dependency_resolver = DependencyResolver(self)
        self._loop_generator = LoopCodeGenerator(self)

        # Note: Don't pre-populate self.filters here as it interferes with
        # the pull_dependencies method that creates temporary identifiers

        if self.eval_ctx is None:
            self.eval_ctx = EvalContext(self.environment, self.name)

    def choose_async(self, async_fmt: str = "async ", sync_fmt: str = "") -> str:  # type: ignore[override]
        """Return ``async_fmt`` when the environment is async, else ``sync_fmt``.

        ``environment.enable_async`` is consulted first; when the environment
        has no such attribute (or it is ``None``) ``environment.is_async`` -
        the flag the rest of this module reads - decides instead.
        """
        enabled = getattr(self.environment, "enable_async", None)
        if enabled is None:
            enabled = getattr(self.environment, "is_async", False)
        return async_fmt if enabled else sync_fmt

    def simple_write(self, value: str, frame: Frame) -> None:  # type: ignore[override]
        """Emit ``yield <value>`` on a new line; ``frame`` is not consulted."""
        self.writeline(f"yield {value}")

    def func_code_generator(self, frame: Frame) -> str:
        """Return ``"async def"`` if ``frame.is_async`` is set, else ``"def"``."""
        async_frame = t.cast(AsyncFrame, frame)
        return "async def" if async_frame.is_async else "def"

    def func(self, name: str) -> str:
        """Generate a (synchronous) function declaration for the given name."""
        return f"def {name}"

    def enter_frame(self, frame: Frame) -> None:
        """Enter a new frame context (no-op)."""

    def leave_frame(self, frame: Frame, with_python_scope: bool = False) -> None:
        """Leave a frame context (no-op)."""

    def return_buffer_contents(
        self,
        frame: Frame,
        force_unescaped: bool = False,  # noqa: ARG002
    ) -> None:
        """Emit ``return ''.join(<buffer>)`` when the frame has a buffer."""
        _ = force_unescaped
        if frame.buffer is not None:
            self.writeline(f"return ''.join({frame.buffer})")

    def visit_AsyncFor(self, node: t.Any, frame: Frame) -> None:
        """Emit an ``async for`` loop with optional test and else handling.

        Raises:
            NotImplementedError: If the node is marked recursive.
        """
        frame = t.cast(AsyncFrame, frame)
        if hasattr(node, "recursive") and node.recursive:
            raise NotImplementedError("Recursive loops not supported")

        target = node.target
        item = target.name if hasattr(target, "name") else "item"
        frame.symbols.store(item)
        self.writeline(f"{item} = None")

        has_test = hasattr(node, "test") and node.test
        loop_filter = None
        if has_test:
            loop_filter = self.temporary_identifier()
            self.writeline(f"{loop_filter} = ", node.test)
            self.visit(node.test, frame)

        # The counter starts at -1 so the else branch can detect "no iterations".
        loop_var = self.temporary_identifier()
        self.writeline(f"{loop_var} = -1", node)
        self.writeline(f"async for {item} in ", node.iter)
        self.visit(node.iter, frame)
        self.write(":")
        self.indent()
        self.writeline(f"{loop_var} += 1")
        if has_test and loop_filter:
            self.writeline(f"if {loop_filter}({item}):")
            self.indent()
        if hasattr(node, "body"):
            self.blockvisit(node.body, frame)
        if has_test and loop_filter:
            self.outdent()
        self.outdent()

        if hasattr(node, "else_") and node.else_:
            self.writeline(f"if {loop_var} == -1:")
            self.indent()
            self.blockvisit(node.else_, frame)
            self.outdent()

    def visit_AsyncCall(self, node: t.Any, frame: Frame) -> None:
        """Emit ``await `` followed by the regular call code."""
        self.write("await ")
        self.visit_Call(node, frame)

    def visit_AsyncFilterBlock(self, node: t.Any, frame: Frame) -> None:
        """Render the body into a list buffer and await the filter on its join.

        Nodes without a ``filter`` or ``body`` attribute are ignored.
        """
        frame = t.cast(AsyncFrame, frame)
        if not hasattr(node, "filter"):
            return
        if not hasattr(node, "body"):
            return
        filter_node = node.filter
        buffer = self.temporary_identifier()
        self.writeline(f"{buffer} = []")
        asyncframe = frame.copy()  # noqa: FURB145
        asyncframe.buffer = buffer
        asyncframe.toplevel = False
        self.blockvisit(node.body, asyncframe)
        self.writeline("await ", filter_node)
        self.visit(filter_node, frame)
        self.write(f"(''.join({buffer}))")

    def visit_AsyncBlock(self, node: t.Any, frame: Frame) -> None:
        """Emit an ``async def block_<name>(context)`` and register it in ``blocks``.

        Nodes without a ``name`` or ``body`` attribute are ignored.
        """
        frame = t.cast(AsyncFrame, frame)
        if not hasattr(node, "name"):
            return
        if not hasattr(node, "body"):
            return
        block_name = node.name
        self.writeline(f"blocks[{block_name!r}] = []")
        block_func_name = f"block_{block_name}"
        self.writeline(f"async def {block_func_name}(context):")
        self.indent()
        self.writeline("yield ''")
        if node.body:
            self.blockvisit(node.body, frame)
        self.outdent()
        self.writeline(f"blocks[{block_name!r}].append({block_func_name})")

    def visit_Name(self, node: nodes.Name, frame: Frame) -> None:
        """Emit the reference for a name node and track stores."""
        frame = t.cast(AsyncFrame, frame)
        self._handle_assignment_tracking(node, frame)
        if self._handle_special_names(node):
            return
        self._handle_symbol_name(node, frame)

    def _handle_assignment_tracking(self, node: nodes.Name, frame: AsyncFrame) -> None:
        """Record a stored name in the frame symbols and the assignment stack."""
        if node.ctx != "store":
            return
        # Store the variable in the frame's symbols
        frame.symbols.store(node.name)
        # Add to assignment tracking for context updates
        if frame.toplevel or frame.loop_frame or frame.block_frame:
            if hasattr(self, "_assign_stack") and self._assign_stack:
                self._assign_stack[-1].add(node.name)

    def _handle_special_names(self, node: nodes.Name) -> bool:
        """Write ``blocks``/``debug_info`` verbatim; return whether that happened."""
        if node.name in ("blocks", "debug_info"):
            self.write(node.name)
            return True
        return False

    def _handle_symbol_name(self, node: nodes.Name, frame: AsyncFrame) -> None:
        """Write the symbol reference for ``node``, falling back to the context.

        ``frame.symbols.ref`` signals an unknown name with ``AssertionError``;
        in that case the name is read from (or stored into) the context.
        """
        # Fast path for common variables: no "is missing" guard is emitted.
        if node.name in self._COMMON_VARS and node.ctx == "load":
            try:
                ref = frame.symbols.ref(node.name)
            except AssertionError:
                self.write(f"context.get({node.name!r})")
            else:
                self.write(ref)
            return

        # Standard path for other variables
        try:
            ref = frame.symbols.ref(node.name)
        except AssertionError:
            if node.ctx == "load":
                self.write(f"context.get({node.name!r})")
            else:
                self.write(f"context.vars[{node.name!r}]")
            return

        if node.ctx == "load" and self._should_use_undefined_check(ref, frame):
            self.write(
                f"(undefined(name={node.name!r}) if {ref} is missing else {ref})"
            )
        else:
            self.write(ref)

    def _should_use_undefined_check(self, ref: str, frame: AsyncFrame) -> bool:
        """Return ``False`` only for parameter loads known not to be undeclared."""
        from jinja2.compiler import VAR_LOAD_PARAMETER

        load = frame.symbols.find_load(ref)
        return not (
            load is not None
            and load[0] == VAR_LOAD_PARAMETER
            and hasattr(self, "parameter_is_undeclared")
            and not self.parameter_is_undeclared(ref)
        )

    def pull_dependencies(self, nodes: t.Iterable[nodes.Node]) -> None:
        """Find all filter and test names used in the template and assign them to variables."""
        from jinja2.compiler import DependencyFinderVisitor

        visitor = DependencyFinderVisitor()
        for node in nodes:
            visitor.visit(node)

        # Set up filter dependencies using utility class
        for name in sorted(visitor.filters):
            self._dependency_resolver.setup_filter_dependency(name)

        # Set up test dependencies using utility class
        for name in sorted(visitor.tests):
            self._dependency_resolver.setup_test_dependency(name)

    def generate(self, node: nodes.Template) -> str:
        """Generate the module source for ``node`` and return it post-processed.

        The output holds the module prelude (name, blocks, imports, helper
        functions, default filters) followed by ``async def root(context)``.
        The text is passed through ``CompiledPatterns.optimize_generated_code``.
        """
        self.writeline(f"name = {self.name!r}")
        self.writeline("blocks = {}")
        self.writeline("debug_info = None")

        # Use optimized cached imports for better performance
        for import_line in CompiledPatterns.get_optimized_imports().split("\n"):
            self.writeline(import_line)
        self.writeline("def undefined(name=None, **_):")
        self.indent()
        self.writeline("return Undefined(name=name)")
        self.outdent()
        self.writeline("async def auto_await(value):")
        self.indent()
        self.writeline("if hasattr(value, '__await__'):")
        self.indent()
        self.writeline("return await value")
        self.outdent()
        self.writeline("return value")
        self.outdent()
        self.writeline("filters = DEFAULT_FILTERS.copy()")
        self.writeline("filters['escape'] = escape")
        self.writeline("async def root(context):")
        self.indent()
        self.writeline("parent_template = None")
        self.writeline("environment = context.environment")
        self.writeline("eval_ctx = context.eval_ctx")
        self.writeline("undefined = environment.undefined")

        if self.eval_ctx is None:
            self.eval_ctx = EvalContext(self.environment, self.name)
        frame = self.root_frame_class(eval_ctx=self.eval_ctx)
        frame.toplevel = frame.rootlevel = True
        frame.require_output_check = False
        frame.buffer = None
        for macro in node.find_all(nodes.Macro):
            frame.symbols.store(macro.name)

        # Pull dependencies for filters and tests
        self.pull_dependencies(node.body)

        self.blockvisit(node.body, frame)
        self.outdent()

        # Apply pattern-based optimizations to generated code
        generated_code = self.stream.getvalue()
        return CompiledPatterns.optimize_generated_code(generated_code)

    @classmethod
    def compile_with_cache(
        cls, environment: t.Any, source: str, name: str, filename: str
    ) -> str:
        """Compile ``source`` to generated code, reusing a cached result if any.

        Args:
            environment: Environment to compile for.
            source: Template source text.
            name: Template name.
            filename: Template filename.

        Returns:
            The generated (and post-processed) module source.
        """
        # The generated code embeds the template name (``name = ...`` and the
        # parent argument of get_template_async calls), so name and filename
        # must be part of the key; otherwise identical source compiled under
        # another name would be served code carrying the wrong name.
        env_id = f"{id(environment)}:{environment.is_async}:{name}:{filename}"
        cache_key = _compilation_cache.get_cache_key(source, env_id)

        # Check cache first
        cached_code = _compilation_cache.get(cache_key)
        if cached_code is not None:
            return cached_code

        # Compile and cache
        generator = cls(environment, name, filename)
        from jinja2 import parse

        ast = parse(source, name, filename)
        compiled_code = generator.generate(ast)

        # Store in cache
        _compilation_cache.set(cache_key, compiled_code)
        return compiled_code

    def visit_For(self, node: nodes.For, frame: Frame) -> None:
        """Emit a (possibly filtered, possibly extended) for-loop.

        Raises:
            NotImplementedError: If the loop is recursive.
        """
        frame = t.cast(AsyncFrame, frame)
        if node.recursive:
            raise NotImplementedError("Recursive loops not supported")

        # Create frames and setup
        loop_frame, test_frame, else_frame = self._setup_for_frames(frame)
        extended_loop, loop_ref = self._setup_for_loop_context(node, loop_frame)

        # Analyze nodes for variable declarations
        self._analyze_for_nodes(node, loop_frame, else_frame)

        # Handle loop filter
        loop_filter_func = self._setup_for_filter(node, test_frame, loop_frame)

        # Setup loop variables and checks
        self._setup_for_variables(node, extended_loop, loop_ref)

        # Generate main loop
        iteration_indicator = self._generate_for_loop(
            node, frame, loop_frame, extended_loop, loop_ref, loop_filter_func
        )

        # Handle else clause
        self._handle_for_else(node, else_frame, iteration_indicator)

        # Cleanup
        self._cleanup_for_assignments(loop_frame)

    def _setup_for_frames(
        self, frame: AsyncFrame
    ) -> tuple[AsyncFrame, AsyncFrame, AsyncFrame]:
        """Return ``(loop_frame, test_frame, else_frame)`` derived from ``frame``."""
        loop_frame = frame.inner()
        loop_frame.loop_frame = True
        test_frame = frame.inner()
        else_frame = frame.inner()
        return loop_frame, test_frame, else_frame

    def _setup_for_loop_context(
        self, node: nodes.For, loop_frame: AsyncFrame
    ) -> tuple[bool, str | None]:
        """Decide whether a loop context is needed and declare ``loop`` if so.

        Returns:
            ``(extended_loop, loop_ref)``; ``loop_ref`` is ``None`` when the
            loop is not extended.
        """
        extended_loop = (
            node.recursive
            or "loop"
            in find_undeclared(node.iter_child_nodes(only=("body",)), ("loop",))
            or any(block.scoped for block in node.find_all(nodes.Block))
        )

        loop_ref = None
        if extended_loop:
            loop_ref = loop_frame.symbols.declare_parameter("loop")

        return extended_loop, loop_ref

    def _analyze_for_nodes(
        self, node: nodes.For, loop_frame: AsyncFrame, else_frame: AsyncFrame
    ) -> None:
        """Analyze the body (and else branch, if present) for declarations."""
        loop_frame.symbols.analyze_node(node, for_branch="body")
        if node.else_:
            else_frame.symbols.analyze_node(node, for_branch="else")

    def _setup_for_filter(
        self, node: nodes.For, test_frame: AsyncFrame, loop_frame: AsyncFrame
    ) -> str | None:
        """Emit the generator function implementing the loop's ``if`` filter.

        Returns:
            The function's identifier, or ``None`` when the loop has no test.
        """
        if not node.test:
            return None

        loop_filter_func = self.temporary_identifier()
        test_frame.symbols.analyze_node(node, for_branch="test")
        # The body below iterates with ``async for`` in async mode, which is
        # only valid inside ``async def``; declare the function with the same
        # choose_async() decision instead of the always-sync ``func``.
        self.writeline(
            f"{self.choose_async()}def {loop_filter_func}(filter):", node.test
        )
        self.indent()
        self.enter_frame(test_frame)
        self.writeline(self.choose_async("async for ", "for "))
        self.visit(node.target, loop_frame)
        self.write(" in ")
        self.write(self.choose_async("auto_aiter(filter)", "filter"))
        self.write(":")
        self.indent()
        self.writeline("if ", node.test)
        self.visit(node.test, test_frame)
        self.write(":")
        self.indent()
        self.writeline("yield ")
        self.visit(node.target, loop_frame)
        self.outdent(3)
        self.leave_frame(test_frame, with_python_scope=True)
        return loop_filter_func

    def _setup_for_variables(
        self, node: nodes.For, extended_loop: bool, loop_ref: str | None
    ) -> None:
        """Initialise the loop reference and reject stores to ``loop``."""
        if extended_loop and loop_ref:
            self.writeline(f"{loop_ref} = missing")

        for name in node.find_all(nodes.Name):
            if name.ctx == "store" and name.name == "loop":
                self.fail(
                    "Can't assign to special loop variable in for-loop target",
                    name.lineno,
                )

    def _generate_for_loop(
        self,
        node: nodes.For,
        frame: AsyncFrame,
        loop_frame: AsyncFrame,
        extended_loop: bool,
        loop_ref: str | None,
        loop_filter_func: str | None,
    ) -> str | None:
        """Emit the loop header and body.

        Returns:
            The identifier of the "did not iterate" flag when the loop has an
            else branch, otherwise ``None``.
        """
        # The flag starts at 1 and is reset to 0 inside the loop body.
        iteration_indicator = None
        if node.else_:
            iteration_indicator = self.temporary_identifier()
            self.writeline(f"{iteration_indicator} = 1")

        # Generate the main loop using utility class
        self._loop_generator.generate_async_for_header(node, node.target, loop_frame)
        self._loop_generator.generate_loop_iterator(
            node.iter, frame, extended_loop, loop_ref, loop_filter_func
        )

        self.indent()
        self.enter_frame(loop_frame)

        self.writeline("_loop_vars = {}")
        self.blockvisit(node.body, loop_frame)
        if node.else_:
            self.writeline(f"{iteration_indicator} = 0")
        self.outdent()
        self.leave_frame(loop_frame, with_python_scope=not node.else_)

        return iteration_indicator

    def _handle_for_else(
        self, node: nodes.For, else_frame: AsyncFrame, iteration_indicator: str | None
    ) -> None:
        """Emit the else branch guarded by the iteration flag, if there is one."""
        if not node.else_ or not iteration_indicator:
            return

        self.writeline(f"if {iteration_indicator}:")
        self.indent()
        self.enter_frame(else_frame)
        self.blockvisit(node.else_, else_frame)
        self.leave_frame(else_frame)
        self.outdent()

    def _cleanup_for_assignments(self, loop_frame: AsyncFrame) -> None:
        """Clear assignments made in the loop from the top level."""
        if hasattr(self, "_assign_stack") and self._assign_stack:
            self._assign_stack[-1].difference_update(loop_frame.symbols.stores)

    def visit_Macro(self, node: nodes.Macro, frame: Frame) -> None:
        """Visit a macro node by delegating to the base class implementation."""
        frame = t.cast(AsyncFrame, frame)

        # For now, let's just use the base class implementation without modification
        # This ensures macros work in sync mode, and we can enhance async support later
        super().visit_Macro(node, frame)

    def visit_Block(self, node: nodes.Block, frame: Frame) -> None:
        """Emit the block function, register it, and emit the call site.

        At top level nothing is emitted for the call site once an extends is
        known; with earlier conditional extends the call is guarded by
        ``if parent_template is None``.
        """
        frame = t.cast(AsyncFrame, frame)
        block_name = node.name
        self.writeline(f"blocks[{block_name!r}] = []")
        block_func_name = f"block_{block_name}"
        self.writeline(f"{self.choose_async()}def {block_func_name}(context):")
        self.indent()
        self.writeline("yield ''")
        if node.body:
            self.blockvisit(node.body, frame)
        self.outdent()
        self.writeline(f"blocks[{block_name!r}].append({block_func_name})")

        level = 0
        if frame.toplevel:
            if self.has_known_extends:
                return
            if self.extends_so_far > 0:
                self.writeline("if parent_template is None:")
                self.indent()
                level += 1

        if node.scoped:
            context = self.derive_context(frame)
        else:
            context = self.get_context_ref()

        if node.required:
            self.writeline(f"if len(context.blocks[{node.name!r}]) <= 1:", node)
            self.indent()
            self.writeline(
                f'raise TemplateRuntimeError("Required block {node.name!r} not found")',
                node,
            )
            self.outdent()

        self.writeline("try:", node)
        self.indent()
        self.writeline(
            f"async for event in context.blocks[{node.name!r}][0]({context}):",
            node,
        )
        self.indent()
        self.simple_write("event", frame)
        self.outdent()
        self.outdent()
        self.writeline("except KeyError:", node)
        self.indent()
        self.writeline("yield ''")
        self.outdent()
        self.outdent(level)

    def visit_Extends(self, node: nodes.Extends, frame: Frame) -> None:
        """Emit the parent template lookup and block registration.

        Raises:
            CompilerExit: When the frame does not require an output check, or
                when a second extends follows an unconditional one.
        """
        frame = t.cast(AsyncFrame, frame)
        if not frame.require_output_check:
            raise CompilerExit()
        if not frame.toplevel:
            self.fail("cannot use extend from a non top-level scope", node.lineno)

        if self.extends_so_far > 0:
            if not self.has_known_extends:
                self.writeline("if parent_template is not None:")
                self.indent()
            self.writeline('raise TemplateRuntimeError("extended multiple times")')
            if self.has_known_extends:
                raise CompilerExit()
            else:
                self.outdent()

        self.writeline("parent_template = await environment.get_template_async(", node)
        self.visit(node.template, frame)
        self.write(f", {self.name!r})")
        self.writeline("for name, parent_block in parent_template.blocks.items():")
        self.indent()
        self.writeline("context.blocks.setdefault(name, []).append(parent_block)")
        self.outdent()

        if frame.rootlevel:
            self.has_known_extends = True
        self.extends_so_far += 1

    def visit_Include(self, node: nodes.Include, frame: Frame) -> None:
        """Emit the template lookup and the loop yielding the included output.

        With ``ignore_missing`` the lookup is wrapped in
        ``try``/``except TemplateNotFound``/``else``.
        """
        frame = t.cast(AsyncFrame, frame)
        if node.ignore_missing:
            self.writeline("try:")
            self.indent()

        self.writeline("template = await environment.get_template_async(", node)
        self.visit(node.template, frame)
        self.write(f", {self.name!r})")

        if node.ignore_missing:
            self.outdent()
            self.writeline("except TemplateNotFound:")
            self.indent()
            self.writeline("pass")
            self.outdent()
            self.writeline("else:")
            self.indent()

        if node.with_context:
            self.writeline(
                "async for event in template.root_render_func("
                "template.new_context(context.get_all(), True, "
                f"{self.dump_local_context(frame)})):"
            )
        else:
            self.writeline(
                "async for event in (await template._get_default_module_async())._body_stream:"
            )
        self.indent()
        self.simple_write("event", frame)
        self.outdent()

        if node.ignore_missing:
            self.outdent()

    def _import_common(
        self, node: nodes.Import | nodes.FromImport, frame: Frame
    ) -> None:
        """Emit ``template = await environment.get_template_async(<expr>, <name>)``."""
        frame = t.cast(AsyncFrame, frame)
        self.writeline("template = await environment.get_template_async(", node)
        self.visit(node.template, frame)
        self.write(f", {self.name!r})")

    def visit_Filter(self, node: nodes.Filter, frame: Frame) -> None:
        """Emit a filter call, wrapped in ``await auto_await(...)`` when async."""
        frame = t.cast(AsyncFrame, frame)

        filter_ref = self._get_filter_reference(node)
        func = self.environment.filters.get(node.name)
        is_async = self.environment.is_async

        if is_async:
            self.write("(await auto_await(")

        self.write(f"{filter_ref}(")
        self._write_filter_special_params(func)
        self._write_filter_input(node, frame)
        self._write_filter_arguments(node, frame)
        self.write(")")

        if is_async:
            self.write("))")

    def _get_filter_reference(self, node: nodes.Filter) -> str:
        """Get the filter reference from dependencies or fallback to environment."""
        if node.name in self.filters:
            return self.filters[node.name]
        return f"environment.filters[{node.name!r}]"

    def _write_filter_special_params(self, func: t.Any) -> None:
        """Write the leading context/eval-context/environment argument, if any."""
        from jinja2.compiler import _PassArg

        pass_arg = None
        if func:
            pass_arg_type = _PassArg.from_obj(func)
            if pass_arg_type:
                pass_arg = {
                    _PassArg.context: "context",
                    _PassArg.eval_context: "context.eval_ctx",
                    _PassArg.environment: "environment",
                }.get(pass_arg_type)

        if pass_arg is not None:
            self.write(f"{pass_arg}, ")

    def _write_filter_input(self, node: nodes.Filter, frame: AsyncFrame) -> None:
        """Write the filter input: the filtered node, else the frame buffer."""
        if node.node is not None:
            self.visit(node.node, frame)
        elif frame.buffer is not None:
            self._write_buffer_content(frame)

    def _write_buffer_content(self, frame: AsyncFrame) -> None:
        """Write the joined frame buffer, as ``Markup`` where autoescaping applies."""
        if frame.eval_ctx.volatile:
            self.write(
                f"(Markup(concat({frame.buffer}))"
                f" if context.eval_ctx.autoescape else concat({frame.buffer}))"
            )
        elif frame.eval_ctx.autoescape:
            self.write(f"Markup(concat({frame.buffer}))")
        else:
            self.write(f"concat({frame.buffer})")

    def _write_filter_arguments(self, node: nodes.Filter, frame: AsyncFrame) -> None:
        """Write positional, keyword, ``*`` and ``**`` arguments of the filter call."""
        for arg in node.args:
            self.write(", ")
            self.visit(arg, frame)

        for kwarg in node.kwargs:
            self.write(", ")
            self.visit(kwarg, frame)

        if node.dyn_args:
            self.write(", *")
            self.visit(node.dyn_args, frame)

        if node.dyn_kwargs:
            self.write(", **")
            self.visit(node.dyn_kwargs, frame)

    def visit_Assign(self, node: nodes.Assign, frame: Frame) -> None:
        """Visit an assignment node ({% set %} statements)."""
        frame = t.cast(AsyncFrame, frame)
        self.push_assign_tracking()

        # Check for namespace assignments like `ns.var = value`; each distinct
        # namespace reference gets one isinstance guard.
        seen_refs: set[str] = set()
        for nsref in node.find_all(nodes.NSRef):
            if nsref.name in seen_refs:
                continue
            seen_refs.add(nsref.name)
            ref = frame.symbols.ref(nsref.name)
            self.writeline(f"if not isinstance({ref}, Namespace):")
            self.indent()
            self.writeline(
                "raise TemplateRuntimeError"
                '("cannot assign attribute on non-namespace object")'
            )
            self.outdent()

        # Generate the assignment code
        self.newline(node)
        self.visit(node.target, frame)
        self.write(" = ")
        self.visit(node.node, frame)
        self.pop_assign_tracking(frame)

    def push_assign_tracking(self) -> None:
        """Push a new layer for assignment tracking."""
        self._assign_stack.append(set())

    def pop_assign_tracking(self, frame: Frame) -> None:
        """Pop the topmost level for assignment tracking and update context variables.

        Names are written to ``_loop_vars``, ``_block_vars`` or
        ``context.vars`` depending on the frame kind; at top level, names not
        starting with an underscore are also added to ``context.exported_vars``.
        """
        frame = t.cast(AsyncFrame, frame)
        vars_set = self._assign_stack.pop()

        tracked_frame = frame.block_frame or frame.loop_frame or frame.toplevel
        if not tracked_frame or not vars_set:
            return

        public_names = [x for x in vars_set if x[:1] != "_"]

        if len(vars_set) == 1:
            name = next(iter(vars_set))
            ref = frame.symbols.ref(name)
            if frame.loop_frame:
                self.writeline(f"_loop_vars[{name!r}] = {ref}")
                return
            if frame.block_frame:
                self.writeline(f"_block_vars[{name!r}] = {ref}")
                return
            self.writeline(f"context.vars[{name!r}] = {ref}")
        else:
            if frame.loop_frame:
                self.writeline("_loop_vars.update({")
            elif frame.block_frame:
                self.writeline("_block_vars.update({")
            else:
                self.writeline("context.vars.update({")
            for idx, name in enumerate(sorted(vars_set)):
                if idx:
                    self.write(", ")
                ref = frame.symbols.ref(name)
                self.write(f"{name!r}: {ref}")
            self.write("})")

        if not frame.block_frame and not frame.loop_frame and public_names:
            if len(public_names) == 1:
                self.writeline(f"context.exported_vars.add({public_names[0]!r})")
            else:
                names_str = ", ".join(map(repr, sorted(public_names)))
                self.writeline(f"context.exported_vars.update(({names_str}))")