123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- # -*- test-case-name: twisted.web.test.test_flatten,twisted.web.test.test_template -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Context-free flattener/serializer for rendering Python objects, possibly
- complex or arbitrarily nested, as strings.
- """
- from __future__ import annotations
- from inspect import iscoroutine
- from io import BytesIO
- from sys import exc_info
- from traceback import extract_tb
- from types import GeneratorType
- from typing import (
- Any,
- Callable,
- Coroutine,
- Generator,
- List,
- Mapping,
- Optional,
- Sequence,
- Tuple,
- TypeVar,
- Union,
- cast,
- )
- from twisted.internet.defer import Deferred, ensureDeferred
- from twisted.python.compat import nativeString
- from twisted.python.failure import Failure
- from twisted.web._stan import CDATA, CharRef, Comment, Tag, slot, voidElements
- from twisted.web.error import FlattenerError, UnfilledSlot, UnsupportedType
- from twisted.web.iweb import IRenderable, IRequest
T = TypeVar("T")

FlattenableRecursive = Any
"""
Stand-in used wherever L{Flattenable} would have to refer to itself.  Read it
as L{Flattenable}; because mypy does not support recursive type definitions
(yet?), the actual definition is simply L{Any}.
"""

Flattenable = Union[
    bytes,
    str,
    slot,
    CDATA,
    Comment,
    Tag,
    Tuple[FlattenableRecursive, ...],
    List[FlattenableRecursive],
    Generator[FlattenableRecursive, None, None],
    CharRef,
    Deferred[FlattenableRecursive],
    Coroutine[Deferred[FlattenableRecursive], object, FlattenableRecursive],
    IRenderable,
]
"""
Type alias covering every type of object that L{flatten()} is able to flatten.
"""

# Upper bound on the number of bytes the flattener accumulates synchronously
# in its buffer before it hands them on to the real writer.
BUFFER_SIZE = 1 << 16
def escapeForContent(data: Union[bytes, str]) -> bytes:
    """
    Escape some character or UTF-8 byte data for inclusion in an HTML or XML
    document, by replacing metacharacters (C{&<>}) with their entity
    equivalents (C{&amp;&lt;&gt;}).

    This is used as an input to L{_flattenElement}'s C{dataEscaper} parameter.

    @param data: The string to escape.

    @return: The quoted form of C{data}.  If C{data} is L{str}, return a utf-8
        encoded string.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")
    # The ampersand has to be handled first: replacing it later would
    # re-escape the "&" that the "&lt;" and "&gt;" replacements introduce.
    return (
        data.replace(b"&", b"&amp;").replace(b"<", b"&lt;").replace(b">", b"&gt;")
    )
def attributeEscapingDoneOutside(data: Union[bytes, str]) -> bytes:
    """
    Pass-through "escaper" for data that appears at the top level of an
    attribute value.

    No quoting is performed here because L{writeWithAttributeEscaping} quotes
    the text of an attribute outside of the generator returned by
    L{_flattenElement}.  This function is handed to that L{_flattenElement}
    call as its C{dataEscaper} so that the generator does not apply a second,
    redundant round of escaping to its text output.

    @param data: The string to escape.

    @return: C{data} itself when it is already L{bytes}; otherwise C{data}
        encoded as UTF-8.
    """
    return data.encode("utf-8") if isinstance(data, str) else data
def writeWithAttributeEscaping(
    write: Callable[[bytes], object]
) -> Callable[[bytes], None]:
    """
    Decorate a C{write} callable so that all output written is properly quoted
    for inclusion within an XML attribute value.

    If a L{Tag <twisted.web.template.Tag>} C{x} is flattened within the context
    of the contents of another L{Tag <twisted.web.template.Tag>} C{y}, the
    metacharacters (C{<>&"}) delimiting C{x} should be passed through
    unchanged, but the textual content of C{x} should still be quoted, as
    usual.  For example: C{<y><x>&amp;</x></y>}.  That is the default behavior
    of L{_flattenElement} when L{escapeForContent} is passed as the
    C{dataEscaper}.

    However, when a L{Tag <twisted.web.template.Tag>} C{x} is flattened within
    the context of an I{attribute} of another L{Tag <twisted.web.template.Tag>}
    C{y}, then the metacharacters delimiting C{x} should be quoted so that it
    can be parsed from the attribute's value.  In the DOM itself, this is not a
    valid thing to do, but given that renderers and slots may be freely moved
    around in a L{twisted.web.template} template, it is a condition which may
    arise in a document and must be handled in a way which produces valid
    output.  So, for example, you should be able to get C{<y attr="&lt;x /&gt;"
    />}.  This should also be true for other XML/HTML meta-constructs such as
    comments and CDATA, so if you were to serialize a L{comment
    <twisted.web.template.Comment>} in an attribute you should get C{<y
    attr="&lt;-- comment --&gt;" />}.  Therefore in order to capture these
    meta-characters, flattening is done with C{write} callable that is wrapped
    with L{writeWithAttributeEscaping}.

    The final case, and hopefully the much more common one as compared to
    serializing L{Tag <twisted.web.template.Tag>} and arbitrary L{IRenderable}
    objects within an attribute, is to serialize a simple string, and those
    should be passed through for L{writeWithAttributeEscaping} to quote
    without applying a second, redundant level of quoting.

    @param write: A callable which will be invoked with the escaped L{bytes}.

    @return: A callable that writes data with escaping.
    """

    def _write(data: bytes) -> None:
        # Content escaping takes care of C{&<>}; the double quote must be
        # escaped as well because it delimits the attribute value.
        write(escapeForContent(data).replace(b'"', b"&quot;"))

    return _write
def escapedCDATA(data: Union[bytes, str]) -> bytes:
    """
    Make some data safe to place inside a CDATA section.

    Every occurrence of the section terminator C{]]>} is split across two
    adjacent CDATA sections so that it cannot close the section early.

    @param data: The string to escape.

    @return: The quoted form of C{data}.  If C{data} is L{str}, it is encoded
        as UTF-8 first.
    """
    raw = data.encode("utf-8") if isinstance(data, str) else data
    return raw.replace(b"]]>", b"]]]]><![CDATA[>")
def escapedComment(data: Union[bytes, str]) -> bytes:
    """
    Within comments the sequence C{-->} can be mistaken as the end of the
    comment.  To ensure consistent parsing and valid output the sequence is
    replaced with C{--&gt;}.  Furthermore, whitespace is added when a comment
    ends in a dash.  This is done to break the connection of the ending C{-}
    with the closing C{-->}.

    @param data: The string to escape.

    @return: The quoted form of C{data}.  If C{data} is L{str}, return a utf-8
        encoded string.
    """
    if isinstance(data, str):
        data = data.encode("utf-8")
    # Neutralise anything that would terminate the comment prematurely.
    data = data.replace(b"-->", b"--&gt;")
    # A trailing dash would fuse with the "-->" the caller writes afterwards.
    if data.endswith(b"-"):
        data += b" "
    return data
- def _getSlotValue(
- name: str,
- slotData: Sequence[Optional[Mapping[str, Flattenable]]],
- default: Optional[Flattenable] = None,
- ) -> Flattenable:
- """
- Find the value of the named slot in the given stack of slot data.
- """
- for slotFrame in reversed(slotData):
- if slotFrame is not None and name in slotFrame:
- return slotFrame[name]
- else:
- if default is not None:
- return default
- raise UnfilledSlot(name)
def _fork(d: Deferred[T]) -> Deferred[T]:
    """
    Build a second L{Deferred} that mirrors the outcome of C{d}.

    The returned L{Deferred} fires with the same result, or fails with the
    same failure, as C{d}.  The handlers added to C{d} hand back exactly what
    they received, so the value seen by C{d}'s later callbacks is unchanged.
    Cancelling the returned L{Deferred} cancels C{d}.

    @param d: The L{Deferred} to observe.

    @return: A new L{Deferred} that follows C{d}'s result or error.
    """
    forked: Deferred[T] = Deferred(lambda _: d.cancel())

    def relayResult(value: T) -> T:
        forked.callback(value)
        return value

    def relayFailure(reason: Failure) -> Failure:
        forked.errback(reason)
        return reason

    d.addCallbacks(relayResult, relayFailure)
    return forked
- def _flattenElement(
- request: Optional[IRequest],
- root: Flattenable,
- write: Callable[[bytes], object],
- slotData: List[Optional[Mapping[str, Flattenable]]],
- renderFactory: Optional[IRenderable],
- dataEscaper: Callable[[Union[bytes, str]], bytes],
- # This is annotated as Generator[T, None, None] instead of Iterator[T]
- # because mypy does not consider an Iterator to be an instance of
- # GeneratorType.
- ) -> Generator[Union[Generator[Any, Any, Any], Deferred[Flattenable]], None, None]:
- """
- Make C{root} slightly more flat by yielding all its immediate contents as
- strings, deferreds or generators that are recursive calls to itself.
- @param request: A request object which will be passed to
- L{IRenderable.render}.
- @param root: An object to be made flatter. This may be of type C{unicode},
- L{str}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple}, L{list},
- L{types.GeneratorType}, L{Deferred}, or an object that implements
- L{IRenderable}.
- @param write: A callable which will be invoked with each L{bytes} produced
- by flattening C{root}.
- @param slotData: A L{list} of L{dict} mapping L{str} slot names to data
- with which those slots will be replaced.
- @param renderFactory: If not L{None}, an object that provides
- L{IRenderable}.
- @param dataEscaper: A 1-argument callable which takes L{bytes} or
- L{unicode} and returns L{bytes}, quoted as appropriate for the
- rendering context. This is really only one of two values:
- L{attributeEscapingDoneOutside} or L{escapeForContent}, depending on
- whether the rendering context is within an attribute or not. See the
- explanation in L{writeWithAttributeEscaping}.
- @return: An iterator that eventually writes L{bytes} to C{write}.
- It can yield other iterators or L{Deferred}s; if it yields another
- iterator, the caller will iterate it; if it yields a L{Deferred},
- the result of that L{Deferred} will be another generator, in which
- case it is iterated. See L{_flattenTree} for the trampoline that
- consumes said values.
- """
- def keepGoing(
- newRoot: Flattenable,
- dataEscaper: Callable[[Union[bytes, str]], bytes] = dataEscaper,
- renderFactory: Optional[IRenderable] = renderFactory,
- write: Callable[[bytes], object] = write,
- ) -> Generator[Union[Flattenable, Deferred[Flattenable]], None, None]:
- return _flattenElement(
- request, newRoot, write, slotData, renderFactory, dataEscaper
- )
- def keepGoingAsync(result: Deferred[Flattenable]) -> Deferred[Flattenable]:
- return result.addCallback(keepGoing)
- if isinstance(root, (bytes, str)):
- write(dataEscaper(root))
- elif isinstance(root, slot):
- slotValue = _getSlotValue(root.name, slotData, root.default)
- yield keepGoing(slotValue)
- elif isinstance(root, CDATA):
- write(b"<![CDATA[")
- write(escapedCDATA(root.data))
- write(b"]]>")
- elif isinstance(root, Comment):
- write(b"<!--")
- write(escapedComment(root.data))
- write(b"-->")
- elif isinstance(root, Tag):
- slotData.append(root.slotData)
- rendererName = root.render
- if rendererName is not None:
- if renderFactory is None:
- raise ValueError(
- f'Tag wants to be rendered by method "{rendererName}" '
- f"but is not contained in any IRenderable"
- )
- rootClone = root.clone(False)
- rootClone.render = None
- renderMethod = renderFactory.lookupRenderMethod(rendererName)
- result = renderMethod(request, rootClone)
- yield keepGoing(result)
- slotData.pop()
- return
- if not root.tagName:
- yield keepGoing(root.children)
- return
- write(b"<")
- if isinstance(root.tagName, str):
- tagName = root.tagName.encode("ascii")
- else:
- tagName = root.tagName
- write(tagName)
- for k, v in root.attributes.items():
- if isinstance(k, str):
- k = k.encode("ascii")
- write(b" " + k + b'="')
- # Serialize the contents of the attribute, wrapping the results of
- # that serialization so that _everything_ is quoted.
- yield keepGoing(
- v, attributeEscapingDoneOutside, write=writeWithAttributeEscaping(write)
- )
- write(b'"')
- if root.children or nativeString(tagName) not in voidElements:
- write(b">")
- # Regardless of whether we're in an attribute or not, switch back
- # to the escapeForContent dataEscaper. The contents of a tag must
- # be quoted no matter what; in the top-level document, just so
- # they're valid, and if they're within an attribute, they have to
- # be quoted so that after applying the *un*-quoting required to re-
- # parse the tag within the attribute, all the quoting is still
- # correct.
- yield keepGoing(root.children, escapeForContent)
- write(b"</" + tagName + b">")
- else:
- write(b" />")
- elif isinstance(root, (tuple, list, GeneratorType)):
- for element in root:
- yield keepGoing(element)
- elif isinstance(root, CharRef):
- escaped = "&#%d;" % (root.ordinal,)
- write(escaped.encode("ascii"))
- elif isinstance(root, Deferred):
- yield keepGoingAsync(_fork(root))
- elif iscoroutine(root):
- yield keepGoingAsync(
- Deferred.fromCoroutine(
- cast(Coroutine[Deferred[Flattenable], object, Flattenable], root)
- )
- )
- elif IRenderable.providedBy(root):
- result = root.render(request)
- yield keepGoing(result, renderFactory=root)
- else:
- raise UnsupportedType(root)
async def _flattenTree(
    request: Optional[IRequest], root: Flattenable, write: Callable[[bytes], object]
) -> None:
    """
    Flatten C{root} completely by driving the generators produced by
    L{_flattenElement} in a depth-first traversal, awaiting any L{Deferred}
    encountered along the way.

    @param request: A request object which will be passed to
        L{IRenderable.render}.

    @param root: An object to be made flatter.  This may be of type L{str},
        L{bytes}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple},
        L{list}, L{types.GeneratorType}, L{Deferred}, or something providing
        L{IRenderable}.

    @param write: A callable which will be invoked with each L{bytes} produced
        by flattening C{root}.  Output is batched, so it is called with
        larger joined chunks rather than once per fragment.

    @return: C{None}, once everything has been written.

    @raise FlattenerError: If flattening any element raises an exception; the
        error carries the original exception, the chain of roots being
        flattened and the extracted traceback.
    """
    pending: List[bytes] = []
    pendingSize = 0

    # Hand everything accumulated so far to the upstream writer as a single
    # string.  This runs when the buffer has grown "big enough", right before
    # execution is suspended to wait for an asynchronous value, and once more
    # at the very end for whatever is left over.
    def flushBuffer() -> None:
        nonlocal pendingSize
        if pendingSize > 0:
            write(b"".join(pending))
            pending.clear()
            pendingSize = 0

    # Collect output up to the buffer size so that the upstream writer is not
    # bothered with a million tiny strings.
    def bufferedWrite(chunk: bytes) -> None:
        nonlocal pendingSize
        pending.append(chunk)
        pendingSize += len(chunk)
        if pendingSize >= BUFFER_SIZE:
            flushBuffer()

    stack: List[Generator[Any, Any, Any]] = [
        _flattenElement(request, root, bufferedWrite, [], None, escapeForContent)
    ]

    while stack:
        current = stack[-1]
        try:
            # Grab the frame before advancing the generator, so that its
            # locals are still reachable for error reporting if next() raises.
            frame = current.gi_frame
            element = next(current)
            if isinstance(element, Deferred):
                # We may be suspended for an unknown amount of time, so
                # deliver what has been collected so far first.
                flushBuffer()
                element = await element
        except StopIteration:
            stack.pop()
        except Exception as e:
            stack.pop()
            # Reconstruct the path of roots from the outermost generator down
            # to the one that failed.
            roots = [outer.gi_frame.f_locals["root"] for outer in stack]
            roots.append(frame.f_locals["root"])
            raise FlattenerError(e, roots, extract_tb(exc_info()[2]))
        else:
            stack.append(element)

    # Deliver anything still sitting in the buffer before finishing.
    flushBuffer()
def flatten(
    request: Optional[IRequest], root: Flattenable, write: Callable[[bytes], object]
) -> Deferred[None]:
    """
    Write a string representation of C{root} out incrementally through
    C{write}.

    To build that representation, C{root} is broken down into simpler objects,
    which are broken down in turn, until only strings or objects that are
    easily converted to strings remain.

    @param request: A request object which will be passed to the C{render}
        method of any L{IRenderable} provider which is encountered.

    @param root: An object to be made flatter.  This may be of type L{str},
        L{bytes}, L{slot}, L{Tag <twisted.web.template.Tag>}, L{tuple},
        L{list}, L{types.GeneratorType}, L{Deferred}, or something that
        provides L{IRenderable}.

    @param write: A callable which will be invoked with each L{bytes} produced
        by flattening C{root}.

    @return: A L{Deferred} which will be called back with C{None} when C{root}
        has been completely flattened into C{write} or which will be errbacked
        if an unexpected exception occurs.
    """
    flattening = _flattenTree(request, root, write)
    return ensureDeferred(flattening)
def flattenString(request: Optional[IRequest], root: Flattenable) -> Deferred[bytes]:
    """
    Flatten C{root} and collect the output into one single string.

    This simply points L{flatten} at an L{io.BytesIO} and hands back what was
    accumulated there.  See L{flatten} for the exact meanings of C{request}
    and C{root}.

    @return: A L{Deferred} which will be called back with a single UTF-8
        encoded string as its result when C{root} has been completely
        flattened or which will be errbacked if an unexpected exception
        occurs.
    """
    sink = BytesIO()

    def collected(_: object) -> bytes:
        return sink.getvalue()

    done = flatten(request, root, sink.write)
    done.addCallback(collected)
    return cast(Deferred[bytes], done)
|