123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660 |
- import asyncio
- import pytest
- from jinja2 import ChainableUndefined
- from jinja2 import DictLoader
- from jinja2 import Environment
- from jinja2 import Template
- from jinja2.async_utils import auto_aiter
- from jinja2.exceptions import TemplateNotFound
- from jinja2.exceptions import TemplatesNotFound
- from jinja2.exceptions import UndefinedError
- from jinja2.nativetypes import NativeEnvironment
- def test_basic_async():
- t = Template(
- "{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}", enable_async=True
- )
- async def func():
- return await t.render_async()
- rv = asyncio.run(func())
- assert rv == "[1][2][3]"
- def test_await_on_calls():
- t = Template("{{ async_func() + normal_func() }}", enable_async=True)
- async def async_func():
- return 42
- def normal_func():
- return 23
- async def func():
- return await t.render_async(async_func=async_func, normal_func=normal_func)
- rv = asyncio.run(func())
- assert rv == "65"
- def test_await_on_calls_normal_render():
- t = Template("{{ async_func() + normal_func() }}", enable_async=True)
- async def async_func():
- return 42
- def normal_func():
- return 23
- rv = t.render(async_func=async_func, normal_func=normal_func)
- assert rv == "65"
- def test_await_and_macros():
- t = Template(
- "{% macro foo(x) %}[{{ x }}][{{ async_func() }}]{% endmacro %}{{ foo(42) }}",
- enable_async=True,
- )
- async def async_func():
- return 42
- async def func():
- return await t.render_async(async_func=async_func)
- rv = asyncio.run(func())
- assert rv == "[42][42]"
- def test_async_blocks():
- t = Template(
- "{% block foo %}<Test>{% endblock %}{{ self.foo() }}",
- enable_async=True,
- autoescape=True,
- )
- async def func():
- return await t.render_async()
- rv = asyncio.run(func())
- assert rv == "<Test><Test>"
- def test_async_generate():
- t = Template("{% for x in [1, 2, 3] %}{{ x }}{% endfor %}", enable_async=True)
- rv = list(t.generate())
- assert rv == ["1", "2", "3"]
- def test_async_iteration_in_templates():
- t = Template("{% for x in rng %}{{ x }}{% endfor %}", enable_async=True)
- async def async_iterator():
- for item in [1, 2, 3]:
- yield item
- rv = list(t.generate(rng=async_iterator()))
- assert rv == ["1", "2", "3"]
- def test_async_iteration_in_templates_extended():
- t = Template(
- "{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}", enable_async=True
- )
- stream = t.generate(rng=auto_aiter(range(1, 4)))
- assert next(stream) == "0"
- assert "".join(stream) == "/11/22/3"
- @pytest.fixture
- def test_env_async():
- env = Environment(
- loader=DictLoader(
- dict(
- module="{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}",
- header="[{{ foo }}|{{ 23 }}]",
- o_printer="({{ o }})",
- )
- ),
- enable_async=True,
- )
- env.globals["bar"] = 23
- return env
- class TestAsyncImports:
- def test_context_imports(self, test_env_async):
- t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
- assert t.render(foo=42) == "[|23]"
- t = test_env_async.from_string(
- '{% import "module" as m without context %}{{ m.test() }}'
- )
- assert t.render(foo=42) == "[|23]"
- t = test_env_async.from_string(
- '{% import "module" as m with context %}{{ m.test() }}'
- )
- assert t.render(foo=42) == "[42|23]"
- t = test_env_async.from_string('{% from "module" import test %}{{ test() }}')
- assert t.render(foo=42) == "[|23]"
- t = test_env_async.from_string(
- '{% from "module" import test without context %}{{ test() }}'
- )
- assert t.render(foo=42) == "[|23]"
- t = test_env_async.from_string(
- '{% from "module" import test with context %}{{ test() }}'
- )
- assert t.render(foo=42) == "[42|23]"
- def test_trailing_comma(self, test_env_async):
- test_env_async.from_string('{% from "foo" import bar, baz with context %}')
- test_env_async.from_string('{% from "foo" import bar, baz, with context %}')
- test_env_async.from_string('{% from "foo" import bar, with context %}')
- test_env_async.from_string('{% from "foo" import bar, with, context %}')
- test_env_async.from_string('{% from "foo" import bar, with with context %}')
- def test_exports(self, test_env_async):
- coro = test_env_async.from_string(
- """
- {% macro toplevel() %}...{% endmacro %}
- {% macro __private() %}...{% endmacro %}
- {% set variable = 42 %}
- {% for item in [1] %}
- {% macro notthere() %}{% endmacro %}
- {% endfor %}
- """
- )._get_default_module_async()
- m = asyncio.run(coro)
- assert asyncio.run(m.toplevel()) == "..."
- assert not hasattr(m, "__missing")
- assert m.variable == 42
- assert not hasattr(m, "notthere")
- def test_import_with_globals(self, test_env_async):
- t = test_env_async.from_string(
- '{% import "module" as m %}{{ m.test() }}', globals={"foo": 42}
- )
- assert t.render() == "[42|23]"
- t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
- assert t.render() == "[|23]"
- def test_import_with_globals_override(self, test_env_async):
- t = test_env_async.from_string(
- '{% set foo = 41 %}{% import "module" as m %}{{ m.test() }}',
- globals={"foo": 42},
- )
- assert t.render() == "[42|23]"
- def test_from_import_with_globals(self, test_env_async):
- t = test_env_async.from_string(
- '{% from "module" import test %}{{ test() }}',
- globals={"foo": 42},
- )
- assert t.render() == "[42|23]"
- class TestAsyncIncludes:
- def test_context_include(self, test_env_async):
- t = test_env_async.from_string('{% include "header" %}')
- assert t.render(foo=42) == "[42|23]"
- t = test_env_async.from_string('{% include "header" with context %}')
- assert t.render(foo=42) == "[42|23]"
- t = test_env_async.from_string('{% include "header" without context %}')
- assert t.render(foo=42) == "[|23]"
- def test_choice_includes(self, test_env_async):
- t = test_env_async.from_string('{% include ["missing", "header"] %}')
- assert t.render(foo=42) == "[42|23]"
- t = test_env_async.from_string(
- '{% include ["missing", "missing2"] ignore missing %}'
- )
- assert t.render(foo=42) == ""
- t = test_env_async.from_string('{% include ["missing", "missing2"] %}')
- pytest.raises(TemplateNotFound, t.render)
- with pytest.raises(TemplatesNotFound) as e:
- t.render()
- assert e.value.templates == ["missing", "missing2"]
- assert e.value.name == "missing2"
- def test_includes(t, **ctx):
- ctx["foo"] = 42
- assert t.render(ctx) == "[42|23]"
- t = test_env_async.from_string('{% include ["missing", "header"] %}')
- test_includes(t)
- t = test_env_async.from_string("{% include x %}")
- test_includes(t, x=["missing", "header"])
- t = test_env_async.from_string('{% include [x, "header"] %}')
- test_includes(t, x="missing")
- t = test_env_async.from_string("{% include x %}")
- test_includes(t, x="header")
- t = test_env_async.from_string("{% include x %}")
- test_includes(t, x="header")
- t = test_env_async.from_string("{% include [x] %}")
- test_includes(t, x="header")
- def test_include_ignoring_missing(self, test_env_async):
- t = test_env_async.from_string('{% include "missing" %}')
- pytest.raises(TemplateNotFound, t.render)
- for extra in "", "with context", "without context":
- t = test_env_async.from_string(
- '{% include "missing" ignore missing ' + extra + " %}"
- )
- assert t.render() == ""
- def test_context_include_with_overrides(self, test_env_async):
- env = Environment(
- loader=DictLoader(
- dict(
- main="{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
- item="{{ item }}",
- )
- )
- )
- assert env.get_template("main").render() == "123"
- def test_unoptimized_scopes(self, test_env_async):
- t = test_env_async.from_string(
- """
- {% macro outer(o) %}
- {% macro inner() %}
- {% include "o_printer" %}
- {% endmacro %}
- {{ inner() }}
- {% endmacro %}
- {{ outer("FOO") }}
- """
- )
- assert t.render().strip() == "(FOO)"
- def test_unoptimized_scopes_autoescape(self):
- env = Environment(
- loader=DictLoader({"o_printer": "({{ o }})"}),
- autoescape=True,
- enable_async=True,
- )
- t = env.from_string(
- """
- {% macro outer(o) %}
- {% macro inner() %}
- {% include "o_printer" %}
- {% endmacro %}
- {{ inner() }}
- {% endmacro %}
- {{ outer("FOO") }}
- """
- )
- assert t.render().strip() == "(FOO)"
- class TestAsyncForLoop:
- def test_simple(self, test_env_async):
- tmpl = test_env_async.from_string("{% for item in seq %}{{ item }}{% endfor %}")
- assert tmpl.render(seq=list(range(10))) == "0123456789"
- def test_else(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for item in seq %}XXX{% else %}...{% endfor %}"
- )
- assert tmpl.render() == "..."
- def test_empty_blocks(self, test_env_async):
- tmpl = test_env_async.from_string(
- "<{% for item in seq %}{% else %}{% endfor %}>"
- )
- assert tmpl.render() == "<>"
- @pytest.mark.parametrize(
- "transform", [lambda x: x, iter, reversed, lambda x: (i for i in x), auto_aiter]
- )
- def test_context_vars(self, test_env_async, transform):
- t = test_env_async.from_string(
- "{% for item in seq %}{{ loop.index }}|{{ loop.index0 }}"
- "|{{ loop.revindex }}|{{ loop.revindex0 }}|{{ loop.first }}"
- "|{{ loop.last }}|{{ loop.length }}\n{% endfor %}"
- )
- out = t.render(seq=transform([42, 24]))
- assert out == "1|0|2|1|True|False|2\n2|1|1|0|False|True|2\n"
- def test_cycling(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in seq %}{{
- loop.cycle('<1>', '<2>') }}{% endfor %}{%
- for item in seq %}{{ loop.cycle(*through) }}{% endfor %}"""
- )
- output = tmpl.render(seq=list(range(4)), through=("<1>", "<2>"))
- assert output == "<1><2>" * 4
- def test_lookaround(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in seq -%}
- {{ loop.previtem|default('x') }}-{{ item }}-{{
- loop.nextitem|default('x') }}|
- {%- endfor %}"""
- )
- output = tmpl.render(seq=list(range(4)))
- assert output == "x-0-1|0-1-2|1-2-3|2-3-x|"
- def test_changed(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in seq -%}
- {{ loop.changed(item) }},
- {%- endfor %}"""
- )
- output = tmpl.render(seq=[None, None, 1, 2, 2, 3, 4, 4, 4])
- assert output == "True,False,True,True,False,True,True,False,False,"
- def test_scope(self, test_env_async):
- tmpl = test_env_async.from_string("{% for item in seq %}{% endfor %}{{ item }}")
- output = tmpl.render(seq=list(range(10)))
- assert not output
- def test_varlen(self, test_env_async):
- def inner():
- yield from range(5)
- tmpl = test_env_async.from_string(
- "{% for item in iter %}{{ item }}{% endfor %}"
- )
- output = tmpl.render(iter=inner())
- assert output == "01234"
- def test_noniter(self, test_env_async):
- tmpl = test_env_async.from_string("{% for item in none %}...{% endfor %}")
- pytest.raises(TypeError, tmpl.render)
- def test_recursive(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in seq recursive -%}
- [{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
- {%- endfor %}"""
- )
- assert (
- tmpl.render(
- seq=[
- dict(a=1, b=[dict(a=1), dict(a=2)]),
- dict(a=2, b=[dict(a=1), dict(a=2)]),
- dict(a=3, b=[dict(a="a")]),
- ]
- )
- == "[1<[1][2]>][2<[1][2]>][3<[a]>]"
- )
- def test_recursive_lookaround(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in seq recursive -%}
- [{{ loop.previtem.a if loop.previtem is defined else 'x' }}.{{
- item.a }}.{{ loop.nextitem.a if loop.nextitem is defined else 'x'
- }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
- {%- endfor %}"""
- )
- assert (
- tmpl.render(
- seq=[
- dict(a=1, b=[dict(a=1), dict(a=2)]),
- dict(a=2, b=[dict(a=1), dict(a=2)]),
- dict(a=3, b=[dict(a="a")]),
- ]
- )
- == "[x.1.2<[x.1.2][1.2.x]>][1.2.3<[x.1.2][1.2.x]>][2.3.x<[x.a.x]>]"
- )
- def test_recursive_depth0(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for item in seq recursive %}[{{ loop.depth0 }}:{{ item.a }}"
- "{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
- )
- assert (
- tmpl.render(
- seq=[
- dict(a=1, b=[dict(a=1), dict(a=2)]),
- dict(a=2, b=[dict(a=1), dict(a=2)]),
- dict(a=3, b=[dict(a="a")]),
- ]
- )
- == "[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]"
- )
- def test_recursive_depth(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for item in seq recursive %}[{{ loop.depth }}:{{ item.a }}"
- "{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
- )
- assert (
- tmpl.render(
- seq=[
- dict(a=1, b=[dict(a=1), dict(a=2)]),
- dict(a=2, b=[dict(a=1), dict(a=2)]),
- dict(a=3, b=[dict(a="a")]),
- ]
- )
- == "[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]"
- )
- def test_looploop(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for row in table %}
- {%- set rowloop = loop -%}
- {% for cell in row -%}
- [{{ rowloop.index }}|{{ loop.index }}]
- {%- endfor %}
- {%- endfor %}"""
- )
- assert tmpl.render(table=["ab", "cd"]) == "[1|1][1|2][2|1][2|2]"
- def test_reversed_bug(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for i in items %}{{ i }}"
- "{% if not loop.last %}"
- ",{% endif %}{% endfor %}"
- )
- assert tmpl.render(items=reversed([3, 2, 1])) == "1,2,3"
- def test_loop_errors(self, test_env_async):
- tmpl = test_env_async.from_string(
- """{% for item in [1] if loop.index
- == 0 %}...{% endfor %}"""
- )
- pytest.raises(UndefinedError, tmpl.render)
- tmpl = test_env_async.from_string(
- """{% for item in [] %}...{% else
- %}{{ loop }}{% endfor %}"""
- )
- assert tmpl.render() == ""
- def test_loop_filter(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for item in range(10) if item is even %}[{{ item }}]{% endfor %}"
- )
- assert tmpl.render() == "[0][2][4][6][8]"
- tmpl = test_env_async.from_string(
- """
- {%- for item in range(10) if item is even %}[{{
- loop.index }}:{{ item }}]{% endfor %}"""
- )
- assert tmpl.render() == "[1:0][2:2][3:4][4:6][5:8]"
- def test_scoped_special_var(self, test_env_async):
- t = test_env_async.from_string(
- "{% for s in seq %}[{{ loop.first }}{% for c in s %}"
- "|{{ loop.first }}{% endfor %}]{% endfor %}"
- )
- assert t.render(seq=("ab", "cd")) == "[True|True|False][False|True|False]"
- def test_scoped_loop_var(self, test_env_async):
- t = test_env_async.from_string(
- "{% for x in seq %}{{ loop.first }}"
- "{% for y in seq %}{% endfor %}{% endfor %}"
- )
- assert t.render(seq="ab") == "TrueFalse"
- t = test_env_async.from_string(
- "{% for x in seq %}{% for y in seq %}"
- "{{ loop.first }}{% endfor %}{% endfor %}"
- )
- assert t.render(seq="ab") == "TrueFalseTrueFalse"
- def test_recursive_empty_loop_iter(self, test_env_async):
- t = test_env_async.from_string(
- """
- {%- for item in foo recursive -%}{%- endfor -%}
- """
- )
- assert t.render(dict(foo=[])) == ""
- def test_call_in_loop(self, test_env_async):
- t = test_env_async.from_string(
- """
- {%- macro do_something() -%}
- [{{ caller() }}]
- {%- endmacro %}
- {%- for i in [1, 2, 3] %}
- {%- call do_something() -%}
- {{ i }}
- {%- endcall %}
- {%- endfor -%}
- """
- )
- assert t.render() == "[1][2][3]"
- def test_scoping_bug(self, test_env_async):
- t = test_env_async.from_string(
- """
- {%- for item in foo %}...{{ item }}...{% endfor %}
- {%- macro item(a) %}...{{ a }}...{% endmacro %}
- {{- item(2) -}}
- """
- )
- assert t.render(foo=(1,)) == "...1......2..."
- def test_unpacking(self, test_env_async):
- tmpl = test_env_async.from_string(
- "{% for a, b, c in [[1, 2, 3]] %}{{ a }}|{{ b }}|{{ c }}{% endfor %}"
- )
- assert tmpl.render() == "1|2|3"
- def test_recursive_loop_filter(self, test_env_async):
- t = test_env_async.from_string(
- """
- <?xml version="1.0" encoding="UTF-8"?>
- <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
- {%- for page in [site.root] if page.url != this recursive %}
- <url><loc>{{ page.url }}</loc></url>
- {{- loop(page.children) }}
- {%- endfor %}
- </urlset>
- """
- )
- sm = t.render(
- this="/foo",
- site={"root": {"url": "/", "children": [{"url": "/foo"}, {"url": "/bar"}]}},
- )
- lines = [x.strip() for x in sm.splitlines() if x.strip()]
- assert lines == [
- '<?xml version="1.0" encoding="UTF-8"?>',
- '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
- "<url><loc>/</loc></url>",
- "<url><loc>/bar</loc></url>",
- "</urlset>",
- ]
- def test_nonrecursive_loop_filter(self, test_env_async):
- t = test_env_async.from_string(
- """
- <?xml version="1.0" encoding="UTF-8"?>
- <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
- {%- for page in items if page.url != this %}
- <url><loc>{{ page.url }}</loc></url>
- {%- endfor %}
- </urlset>
- """
- )
- sm = t.render(
- this="/foo", items=[{"url": "/"}, {"url": "/foo"}, {"url": "/bar"}]
- )
- lines = [x.strip() for x in sm.splitlines() if x.strip()]
- assert lines == [
- '<?xml version="1.0" encoding="UTF-8"?>',
- '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
- "<url><loc>/</loc></url>",
- "<url><loc>/bar</loc></url>",
- "</urlset>",
- ]
- def test_bare_async(self, test_env_async):
- t = test_env_async.from_string('{% extends "header" %}')
- assert t.render(foo=42) == "[42|23]"
- def test_awaitable_property_slicing(self, test_env_async):
- t = test_env_async.from_string("{% for x in a.b[:1] %}{{ x }}{% endfor %}")
- assert t.render(a=dict(b=[1, 2, 3])) == "1"
- def test_namespace_awaitable(test_env_async):
- async def _test():
- t = test_env_async.from_string(
- '{% set ns = namespace(foo="Bar") %}{{ ns.foo }}'
- )
- actual = await t.render_async()
- assert actual == "Bar"
- asyncio.run(_test())
- def test_chainable_undefined_aiter():
- async def _test():
- t = Template(
- "{% for x in a['b']['c'] %}{{ x }}{% endfor %}",
- enable_async=True,
- undefined=ChainableUndefined,
- )
- rv = await t.render_async(a={})
- assert rv == ""
- asyncio.run(_test())
- @pytest.fixture
- def async_native_env():
- return NativeEnvironment(enable_async=True)
- def test_native_async(async_native_env):
- async def _test():
- t = async_native_env.from_string("{{ x }}")
- rv = await t.render_async(x=23)
- assert rv == 23
- asyncio.run(_test())
- def test_native_list_async(async_native_env):
- async def _test():
- t = async_native_env.from_string("{{ x }}")
- rv = await t.render_async(x=list(range(3)))
- assert rv == [0, 1, 2]
- asyncio.run(_test())
- def test_getitem_after_filter():
- env = Environment(enable_async=True)
- env.filters["add_each"] = lambda v, x: [i + x for i in v]
- t = env.from_string("{{ (a|add_each(2))[1:] }}")
- out = t.render(a=range(3))
- assert out == "[3, 4]"
- def test_getitem_after_call():
- env = Environment(enable_async=True)
- env.globals["add_each"] = lambda v, x: [i + x for i in v]
- t = env.from_string("{{ add_each(a, 2)[1:] }}")
- out = t.render(a=range(3))
- assert out == "[3, 4]"
|