1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099 |
- import codecs
- import io
- import mimetypes
- import os
- import pkgutil
- import re
- import sys
- import typing as t
- import unicodedata
- import warnings
- from datetime import datetime
- from html.entities import name2codepoint
- from time import time
- from zlib import adler32
- from ._internal import _DictAccessorProperty
- from ._internal import _missing
- from ._internal import _parse_signature
- from ._internal import _TAccessorValue
- from .datastructures import Headers
- from .exceptions import NotFound
- from .exceptions import RequestedRangeNotSatisfiable
- from .security import safe_join
- from .urls import url_quote
- from .wsgi import wrap_file
- if t.TYPE_CHECKING:
- from _typeshed.wsgi import WSGIEnvironment
- from .wrappers.request import Request
- from .wrappers.response import Response
- _T = t.TypeVar("_T")
- _entity_re = re.compile(r"&([^;]+);")
- _filename_ascii_strip_re = re.compile(r"[^A-Za-z0-9_.-]")
- _windows_device_files = (
- "CON",
- "AUX",
- "COM1",
- "COM2",
- "COM3",
- "COM4",
- "LPT1",
- "LPT2",
- "LPT3",
- "PRN",
- "NUL",
- )
- class cached_property(property, t.Generic[_T]):
- """A :func:`property` that is only evaluated once. Subsequent access
- returns the cached value. Setting the property sets the cached
- value. Deleting the property clears the cached value, accessing it
- again will evaluate it again.
- .. code-block:: python
- class Example:
- @cached_property
- def value(self):
- # calculate something important here
- return 42
- e = Example()
- e.value # evaluates
- e.value # uses cache
- e.value = 16 # sets cache
- del e.value # clears cache
- The class must have a ``__dict__`` for this to work.
- .. versionchanged:: 2.0
- ``del obj.name`` clears the cached value.
- """
- def __init__(
- self,
- fget: t.Callable[[t.Any], _T],
- name: t.Optional[str] = None,
- doc: t.Optional[str] = None,
- ) -> None:
- super().__init__(fget, doc=doc)
- self.__name__ = name or fget.__name__
- self.__module__ = fget.__module__
- def __set__(self, obj: object, value: _T) -> None:
- obj.__dict__[self.__name__] = value
- def __get__(self, obj: object, type: type = None) -> _T: # type: ignore
- if obj is None:
- return self # type: ignore
- value: _T = obj.__dict__.get(self.__name__, _missing)
- if value is _missing:
- value = self.fget(obj) # type: ignore
- obj.__dict__[self.__name__] = value
- return value
- def __delete__(self, obj: object) -> None:
- del obj.__dict__[self.__name__]
- def invalidate_cached_property(obj: object, name: str) -> None:
- """Invalidates the cache for a :class:`cached_property`:
- >>> class Test(object):
- ... @cached_property
- ... def magic_number(self):
- ... print("recalculating...")
- ... return 42
- ...
- >>> var = Test()
- >>> var.magic_number
- recalculating...
- 42
- >>> var.magic_number
- 42
- >>> invalidate_cached_property(var, "magic_number")
- >>> var.magic_number
- recalculating...
- 42
- You must pass the name of the cached property as the second argument.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use ``del obj.name`` instead.
- """
- warnings.warn(
- "'invalidate_cached_property' is deprecated and will be removed"
- " in Werkzeug 2.1. Use 'del obj.name' instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- delattr(obj, name)
- class environ_property(_DictAccessorProperty[_TAccessorValue]):
- """Maps request attributes to environment variables. This works not only
- for the Werkzeug request object, but also any other class with an
- environ attribute:
- >>> class Test(object):
- ... environ = {'key': 'value'}
- ... test = environ_property('key')
- >>> var = Test()
- >>> var.test
- 'value'
- If you pass it a second value it's used as default if the key does not
- exist, the third one can be a converter that takes a value and converts
- it. If it raises :exc:`ValueError` or :exc:`TypeError` the default value
- is used. If no default value is provided `None` is used.
- Per default the property is read only. You have to explicitly enable it
- by passing ``read_only=False`` to the constructor.
- """
- read_only = True
- def lookup(self, obj: "Request") -> "WSGIEnvironment":
- return obj.environ
- class header_property(_DictAccessorProperty[_TAccessorValue]):
- """Like `environ_property` but for headers."""
- def lookup(self, obj: t.Union["Request", "Response"]) -> Headers:
- return obj.headers
- class HTMLBuilder:
- """Helper object for HTML generation.
- Per default there are two instances of that class. The `html` one, and
- the `xhtml` one for those two dialects. The class uses keyword parameters
- and positional parameters to generate small snippets of HTML.
- Keyword parameters are converted to XML/SGML attributes, positional
- arguments are used as children. Because Python accepts positional
- arguments before keyword arguments it's a good idea to use a list with the
- star-syntax for some children:
- >>> html.p(class_='foo', *[html.a('foo', href='foo.html'), ' ',
- ... html.a('bar', href='bar.html')])
- '<p class="foo"><a href="foo.html">foo</a> <a href="bar.html">bar</a></p>'
- This class works around some browser limitations and can not be used for
- arbitrary SGML/XML generation. For that purpose lxml and similar
- libraries exist.
- Calling the builder escapes the string passed:
- >>> html.p(html("<foo>"))
- '<p><foo></p>'
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1.
- """
- _entity_re = re.compile(r"&([^;]+);")
- _entities = name2codepoint.copy()
- _entities["apos"] = 39
- _empty_elements = {
- "area",
- "base",
- "basefont",
- "br",
- "col",
- "command",
- "embed",
- "frame",
- "hr",
- "img",
- "input",
- "keygen",
- "isindex",
- "link",
- "meta",
- "param",
- "source",
- "wbr",
- }
- _boolean_attributes = {
- "selected",
- "checked",
- "compact",
- "declare",
- "defer",
- "disabled",
- "ismap",
- "multiple",
- "nohref",
- "noresize",
- "noshade",
- "nowrap",
- }
- _plaintext_elements = {"textarea"}
- _c_like_cdata = {"script", "style"}
- def __init__(self, dialect): # type: ignore
- self._dialect = dialect
- def __call__(self, s): # type: ignore
- import html
- warnings.warn(
- "'utils.HTMLBuilder' is deprecated and will be removed in Werkzeug 2.1.",
- DeprecationWarning,
- stacklevel=2,
- )
- return html.escape(s)
- def __getattr__(self, tag): # type: ignore
- import html
- warnings.warn(
- "'utils.HTMLBuilder' is deprecated and will be removed in Werkzeug 2.1.",
- DeprecationWarning,
- stacklevel=2,
- )
- if tag[:2] == "__":
- raise AttributeError(tag)
- def proxy(*children, **arguments): # type: ignore
- buffer = f"<{tag}"
- for key, value in arguments.items():
- if value is None:
- continue
- if key[-1] == "_":
- key = key[:-1]
- if key in self._boolean_attributes:
- if not value:
- continue
- if self._dialect == "xhtml":
- value = f'="{key}"'
- else:
- value = ""
- else:
- value = f'="{html.escape(value)}"'
- buffer += f" {key}{value}"
- if not children and tag in self._empty_elements:
- if self._dialect == "xhtml":
- buffer += " />"
- else:
- buffer += ">"
- return buffer
- buffer += ">"
- children_as_string = "".join([str(x) for x in children if x is not None])
- if children_as_string:
- if tag in self._plaintext_elements:
- children_as_string = html.escape(children_as_string)
- elif tag in self._c_like_cdata and self._dialect == "xhtml":
- children_as_string = f"/*<![CDATA[*/{children_as_string}/*]]>*/"
- buffer += children_as_string + f"</{tag}>"
- return buffer
- return proxy
- def __repr__(self) -> str:
- return f"<{type(self).__name__} for {self._dialect!r}>"
- html = HTMLBuilder("html")
- xhtml = HTMLBuilder("xhtml")
- # https://cgit.freedesktop.org/xdg/shared-mime-info/tree/freedesktop.org.xml.in
- # https://www.iana.org/assignments/media-types/media-types.xhtml
- # Types listed in the XDG mime info that have a charset in the IANA registration.
- _charset_mimetypes = {
- "application/ecmascript",
- "application/javascript",
- "application/sql",
- "application/xml",
- "application/xml-dtd",
- "application/xml-external-parsed-entity",
- }
- def get_content_type(mimetype: str, charset: str) -> str:
- """Returns the full content type string with charset for a mimetype.
- If the mimetype represents text, the charset parameter will be
- appended, otherwise the mimetype is returned unchanged.
- :param mimetype: The mimetype to be used as content type.
- :param charset: The charset to be appended for text mimetypes.
- :return: The content type.
- .. versionchanged:: 0.15
- Any type that ends with ``+xml`` gets a charset, not just those
- that start with ``application/``. Known text types such as
- ``application/javascript`` are also given charsets.
- """
- if (
- mimetype.startswith("text/")
- or mimetype in _charset_mimetypes
- or mimetype.endswith("+xml")
- ):
- mimetype += f"; charset={charset}"
- return mimetype
- def detect_utf_encoding(data: bytes) -> str:
- """Detect which UTF encoding was used to encode the given bytes.
- The latest JSON standard (:rfc:`8259`) suggests that only UTF-8 is
- accepted. Older documents allowed 8, 16, or 32. 16 and 32 can be big
- or little endian. Some editors or libraries may prepend a BOM.
- :internal:
- :param data: Bytes in unknown UTF encoding.
- :return: UTF encoding name
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. This is built in to
- :func:`json.loads`.
- .. versionadded:: 0.15
- """
- warnings.warn(
- "'detect_utf_encoding' is deprecated and will be removed in"
- " Werkzeug 2.1. This is built in to 'json.loads'.",
- DeprecationWarning,
- stacklevel=2,
- )
- head = data[:4]
- if head[:3] == codecs.BOM_UTF8:
- return "utf-8-sig"
- if b"\x00" not in head:
- return "utf-8"
- if head in (codecs.BOM_UTF32_BE, codecs.BOM_UTF32_LE):
- return "utf-32"
- if head[:2] in (codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE):
- return "utf-16"
- if len(head) == 4:
- if head[:3] == b"\x00\x00\x00":
- return "utf-32-be"
- if head[::2] == b"\x00\x00":
- return "utf-16-be"
- if head[1:] == b"\x00\x00\x00":
- return "utf-32-le"
- if head[1::2] == b"\x00\x00":
- return "utf-16-le"
- if len(head) == 2:
- return "utf-16-be" if head.startswith(b"\x00") else "utf-16-le"
- return "utf-8"
- def format_string(string: str, context: t.Mapping[str, t.Any]) -> str:
- """String-template format a string:
- >>> format_string('$foo and ${foo}s', dict(foo=42))
- '42 and 42s'
- This does not do any attribute lookup.
- :param string: the format string.
- :param context: a dict with the variables to insert.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use :class:`string.Template`
- instead.
- """
- from string import Template
- warnings.warn(
- "'utils.format_string' is deprecated and will be removed in"
- " Werkzeug 2.1. Use 'string.Template' instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- return Template(string).substitute(context)
- def secure_filename(filename: str) -> str:
- r"""Pass it a filename and it will return a secure version of it. This
- filename can then safely be stored on a regular file system and passed
- to :func:`os.path.join`. The filename returned is an ASCII only string
- for maximum portability.
- On windows systems the function also makes sure that the file is not
- named after one of the special device files.
- >>> secure_filename("My cool movie.mov")
- 'My_cool_movie.mov'
- >>> secure_filename("../../../etc/passwd")
- 'etc_passwd'
- >>> secure_filename('i contain cool \xfcml\xe4uts.txt')
- 'i_contain_cool_umlauts.txt'
- The function might return an empty filename. It's your responsibility
- to ensure that the filename is unique and that you abort or
- generate a random filename if the function returned an empty one.
- .. versionadded:: 0.5
- :param filename: the filename to secure
- """
- filename = unicodedata.normalize("NFKD", filename)
- filename = filename.encode("ascii", "ignore").decode("ascii")
- for sep in os.path.sep, os.path.altsep:
- if sep:
- filename = filename.replace(sep, " ")
- filename = str(_filename_ascii_strip_re.sub("", "_".join(filename.split()))).strip(
- "._"
- )
- # on nt a couple of special files are present in each folder. We
- # have to ensure that the target file is not such a filename. In
- # this case we prepend an underline
- if (
- os.name == "nt"
- and filename
- and filename.split(".")[0].upper() in _windows_device_files
- ):
- filename = f"_{filename}"
- return filename
- def escape(s: t.Any) -> str:
- """Replace ``&``, ``<``, ``>``, ``"``, and ``'`` with HTML-safe
- sequences.
- ``None`` is escaped to an empty string.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use MarkupSafe instead.
- """
- import html
- warnings.warn(
- "'utils.escape' is deprecated and will be removed in Werkzeug"
- " 2.1. Use MarkupSafe instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- if s is None:
- return ""
- if hasattr(s, "__html__"):
- return s.__html__() # type: ignore
- if not isinstance(s, str):
- s = str(s)
- return html.escape(s, quote=True) # type: ignore
- def unescape(s: str) -> str:
- """The reverse of :func:`escape`. This unescapes all the HTML
- entities, not only those inserted by ``escape``.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use MarkupSafe instead.
- """
- import html
- warnings.warn(
- "'utils.unescape' is deprecated and will be removed in Werkzueg"
- " 2.1. Use MarkupSafe instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- return html.unescape(s)
- def redirect(
- location: str, code: int = 302, Response: t.Optional[t.Type["Response"]] = None
- ) -> "Response":
- """Returns a response object (a WSGI application) that, if called,
- redirects the client to the target location. Supported codes are
- 301, 302, 303, 305, 307, and 308. 300 is not supported because
- it's not a real redirect and 304 because it's the answer for a
- request with a request with defined If-Modified-Since headers.
- .. versionadded:: 0.6
- The location can now be a unicode string that is encoded using
- the :func:`iri_to_uri` function.
- .. versionadded:: 0.10
- The class used for the Response object can now be passed in.
- :param location: the location the response should redirect to.
- :param code: the redirect status code. defaults to 302.
- :param class Response: a Response class to use when instantiating a
- response. The default is :class:`werkzeug.wrappers.Response` if
- unspecified.
- """
- import html
- if Response is None:
- from .wrappers import Response # type: ignore
- display_location = html.escape(location)
- if isinstance(location, str):
- # Safe conversion is necessary here as we might redirect
- # to a broken URI scheme (for instance itms-services).
- from .urls import iri_to_uri
- location = iri_to_uri(location, safe_conversion=True)
- response = Response( # type: ignore
- '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
- "<title>Redirecting...</title>\n"
- "<h1>Redirecting...</h1>\n"
- "<p>You should be redirected automatically to target URL: "
- f'<a href="{html.escape(location)}">{display_location}</a>. If'
- " not click the link.",
- code,
- mimetype="text/html",
- )
- response.headers["Location"] = location
- return response
- def append_slash_redirect(environ: "WSGIEnvironment", code: int = 301) -> "Response":
- """Redirects to the same URL but with a slash appended. The behavior
- of this function is undefined if the path ends with a slash already.
- :param environ: the WSGI environment for the request that triggers
- the redirect.
- :param code: the status code for the redirect.
- """
- new_path = environ["PATH_INFO"].strip("/") + "/"
- query_string = environ.get("QUERY_STRING")
- if query_string:
- new_path += f"?{query_string}"
- return redirect(new_path, code)
- def send_file(
- path_or_file: t.Union[os.PathLike, str, t.IO[bytes]],
- environ: "WSGIEnvironment",
- mimetype: t.Optional[str] = None,
- as_attachment: bool = False,
- download_name: t.Optional[str] = None,
- conditional: bool = True,
- etag: t.Union[bool, str] = True,
- last_modified: t.Optional[t.Union[datetime, int, float]] = None,
- max_age: t.Optional[
- t.Union[int, t.Callable[[t.Optional[str]], t.Optional[int]]]
- ] = None,
- use_x_sendfile: bool = False,
- response_class: t.Optional[t.Type["Response"]] = None,
- _root_path: t.Optional[t.Union[os.PathLike, str]] = None,
- ) -> "Response":
- """Send the contents of a file to the client.
- The first argument can be a file path or a file-like object. Paths
- are preferred in most cases because Werkzeug can manage the file and
- get extra information from the path. Passing a file-like object
- requires that the file is opened in binary mode, and is mostly
- useful when building a file in memory with :class:`io.BytesIO`.
- Never pass file paths provided by a user. The path is assumed to be
- trusted, so a user could craft a path to access a file you didn't
- intend.
- If the WSGI server sets a ``file_wrapper`` in ``environ``, it is
- used, otherwise Werkzeug's built-in wrapper is used. Alternatively,
- if the HTTP server supports ``X-Sendfile``, ``use_x_sendfile=True``
- will tell the server to send the given path, which is much more
- efficient than reading it in Python.
- :param path_or_file: The path to the file to send, relative to the
- current working directory if a relative path is given.
- Alternatively, a file-like object opened in binary mode. Make
- sure the file pointer is seeked to the start of the data.
- :param environ: The WSGI environ for the current request.
- :param mimetype: The MIME type to send for the file. If not
- provided, it will try to detect it from the file name.
- :param as_attachment: Indicate to a browser that it should offer to
- save the file instead of displaying it.
- :param download_name: The default name browsers will use when saving
- the file. Defaults to the passed file name.
- :param conditional: Enable conditional and range responses based on
- request headers. Requires passing a file path and ``environ``.
- :param etag: Calculate an ETag for the file, which requires passing
- a file path. Can also be a string to use instead.
- :param last_modified: The last modified time to send for the file,
- in seconds. If not provided, it will try to detect it from the
- file path.
- :param max_age: How long the client should cache the file, in
- seconds. If set, ``Cache-Control`` will be ``public``, otherwise
- it will be ``no-cache`` to prefer conditional caching.
- :param use_x_sendfile: Set the ``X-Sendfile`` header to let the
- server to efficiently send the file. Requires support from the
- HTTP server. Requires passing a file path.
- :param response_class: Build the response using this class. Defaults
- to :class:`~werkzeug.wrappers.Response`.
- :param _root_path: Do not use. For internal use only. Use
- :func:`send_from_directory` to safely send files under a path.
- .. versionchanged:: 2.0.2
- ``send_file`` only sets a detected ``Content-Encoding`` if
- ``as_attachment`` is disabled.
- .. versionadded:: 2.0
- Adapted from Flask's implementation.
- .. versionchanged:: 2.0
- ``download_name`` replaces Flask's ``attachment_filename``
- parameter. If ``as_attachment=False``, it is passed with
- ``Content-Disposition: inline`` instead.
- .. versionchanged:: 2.0
- ``max_age`` replaces Flask's ``cache_timeout`` parameter.
- ``conditional`` is enabled and ``max_age`` is not set by
- default.
- .. versionchanged:: 2.0
- ``etag`` replaces Flask's ``add_etags`` parameter. It can be a
- string to use instead of generating one.
- .. versionchanged:: 2.0
- If an encoding is returned when guessing ``mimetype`` from
- ``download_name``, set the ``Content-Encoding`` header.
- """
- if response_class is None:
- from .wrappers import Response
- response_class = Response
- path: t.Optional[str] = None
- file: t.Optional[t.IO[bytes]] = None
- size: t.Optional[int] = None
- mtime: t.Optional[float] = None
- headers = Headers()
- if isinstance(path_or_file, (os.PathLike, str)) or hasattr(
- path_or_file, "__fspath__"
- ):
- path_or_file = t.cast(t.Union[os.PathLike, str], path_or_file)
- # Flask will pass app.root_path, allowing its send_file wrapper
- # to not have to deal with paths.
- if _root_path is not None:
- path = os.path.join(_root_path, path_or_file)
- else:
- path = os.path.abspath(path_or_file)
- stat = os.stat(path)
- size = stat.st_size
- mtime = stat.st_mtime
- else:
- file = path_or_file
- if download_name is None and path is not None:
- download_name = os.path.basename(path)
- if mimetype is None:
- if download_name is None:
- raise TypeError(
- "Unable to detect the MIME type because a file name is"
- " not available. Either set 'download_name', pass a"
- " path instead of a file, or set 'mimetype'."
- )
- mimetype, encoding = mimetypes.guess_type(download_name)
- if mimetype is None:
- mimetype = "application/octet-stream"
- # Don't send encoding for attachments, it causes browsers to
- # save decompress tar.gz files.
- if encoding is not None and not as_attachment:
- headers.set("Content-Encoding", encoding)
- if download_name is not None:
- try:
- download_name.encode("ascii")
- except UnicodeEncodeError:
- simple = unicodedata.normalize("NFKD", download_name)
- simple = simple.encode("ascii", "ignore").decode("ascii")
- quoted = url_quote(download_name, safe="")
- names = {"filename": simple, "filename*": f"UTF-8''{quoted}"}
- else:
- names = {"filename": download_name}
- value = "attachment" if as_attachment else "inline"
- headers.set("Content-Disposition", value, **names)
- elif as_attachment:
- raise TypeError(
- "No name provided for attachment. Either set"
- " 'download_name' or pass a path instead of a file."
- )
- if use_x_sendfile and path is not None:
- headers["X-Sendfile"] = path
- data = None
- else:
- if file is None:
- file = open(path, "rb") # type: ignore
- elif isinstance(file, io.BytesIO):
- size = file.getbuffer().nbytes
- elif isinstance(file, io.TextIOBase):
- raise ValueError("Files must be opened in binary mode or use BytesIO.")
- data = wrap_file(environ, file)
- rv = response_class(
- data, mimetype=mimetype, headers=headers, direct_passthrough=True
- )
- if size is not None:
- rv.content_length = size
- if last_modified is not None:
- rv.last_modified = last_modified # type: ignore
- elif mtime is not None:
- rv.last_modified = mtime # type: ignore
- rv.cache_control.no_cache = True
- # Flask will pass app.get_send_file_max_age, allowing its send_file
- # wrapper to not have to deal with paths.
- if callable(max_age):
- max_age = max_age(path)
- if max_age is not None:
- if max_age > 0:
- rv.cache_control.no_cache = None
- rv.cache_control.public = True
- rv.cache_control.max_age = max_age
- rv.expires = int(time() + max_age) # type: ignore
- if isinstance(etag, str):
- rv.set_etag(etag)
- elif etag and path is not None:
- check = adler32(path.encode("utf-8")) & 0xFFFFFFFF
- rv.set_etag(f"{mtime}-{size}-{check}")
- if conditional:
- try:
- rv = rv.make_conditional(environ, accept_ranges=True, complete_length=size)
- except RequestedRangeNotSatisfiable:
- if file is not None:
- file.close()
- raise
- # Some x-sendfile implementations incorrectly ignore the 304
- # status code and send the file anyway.
- if rv.status_code == 304:
- rv.headers.pop("x-sendfile", None)
- return rv
- def send_from_directory(
- directory: t.Union[os.PathLike, str],
- path: t.Union[os.PathLike, str],
- environ: "WSGIEnvironment",
- **kwargs: t.Any,
- ) -> "Response":
- """Send a file from within a directory using :func:`send_file`.
- This is a secure way to serve files from a folder, such as static
- files or uploads. Uses :func:`~werkzeug.security.safe_join` to
- ensure the path coming from the client is not maliciously crafted to
- point outside the specified directory.
- If the final path does not point to an existing regular file,
- returns a 404 :exc:`~werkzeug.exceptions.NotFound` error.
- :param directory: The directory that ``path`` must be located under.
- :param path: The path to the file to send, relative to
- ``directory``.
- :param environ: The WSGI environ for the current request.
- :param kwargs: Arguments to pass to :func:`send_file`.
- .. versionadded:: 2.0
- Adapted from Flask's implementation.
- """
- path = safe_join(os.fspath(directory), os.fspath(path))
- if path is None:
- raise NotFound()
- # Flask will pass app.root_path, allowing its send_from_directory
- # wrapper to not have to deal with paths.
- if "_root_path" in kwargs:
- path = os.path.join(kwargs["_root_path"], path)
- try:
- if not os.path.isfile(path):
- raise NotFound()
- except ValueError:
- # path contains null byte on Python < 3.8
- raise NotFound() from None
- return send_file(path, environ, **kwargs)
- def import_string(import_name: str, silent: bool = False) -> t.Any:
- """Imports an object based on a string. This is useful if you want to
- use import paths as endpoints or something similar. An import path can
- be specified either in dotted notation (``xml.sax.saxutils.escape``)
- or with a colon as object delimiter (``xml.sax.saxutils:escape``).
- If `silent` is True the return value will be `None` if the import fails.
- :param import_name: the dotted name for the object to import.
- :param silent: if set to `True` import errors are ignored and
- `None` is returned instead.
- :return: imported object
- """
- import_name = import_name.replace(":", ".")
- try:
- try:
- __import__(import_name)
- except ImportError:
- if "." not in import_name:
- raise
- else:
- return sys.modules[import_name]
- module_name, obj_name = import_name.rsplit(".", 1)
- module = __import__(module_name, globals(), locals(), [obj_name])
- try:
- return getattr(module, obj_name)
- except AttributeError as e:
- raise ImportError(e) from None
- except ImportError as e:
- if not silent:
- raise ImportStringError(import_name, e).with_traceback(
- sys.exc_info()[2]
- ) from None
- return None
- def find_modules(
- import_path: str, include_packages: bool = False, recursive: bool = False
- ) -> t.Iterator[str]:
- """Finds all the modules below a package. This can be useful to
- automatically import all views / controllers so that their metaclasses /
- function decorators have a chance to register themselves on the
- application.
- Packages are not returned unless `include_packages` is `True`. This can
- also recursively list modules but in that case it will import all the
- packages to get the correct load path of that module.
- :param import_path: the dotted name for the package to find child modules.
- :param include_packages: set to `True` if packages should be returned, too.
- :param recursive: set to `True` if recursion should happen.
- :return: generator
- """
- module = import_string(import_path)
- path = getattr(module, "__path__", None)
- if path is None:
- raise ValueError(f"{import_path!r} is not a package")
- basename = f"{module.__name__}."
- for _importer, modname, ispkg in pkgutil.iter_modules(path):
- modname = basename + modname
- if ispkg:
- if include_packages:
- yield modname
- if recursive:
- yield from find_modules(modname, include_packages, True)
- else:
- yield modname
- def validate_arguments(func, args, kwargs, drop_extra=True): # type: ignore
- """Checks if the function accepts the arguments and keyword arguments.
- Returns a new ``(args, kwargs)`` tuple that can safely be passed to
- the function without causing a `TypeError` because the function signature
- is incompatible. If `drop_extra` is set to `True` (which is the default)
- any extra positional or keyword arguments are dropped automatically.
- The exception raised provides three attributes:
- `missing`
- A set of argument names that the function expected but where
- missing.
- `extra`
- A dict of keyword arguments that the function can not handle but
- where provided.
- `extra_positional`
- A list of values that where given by positional argument but the
- function cannot accept.
- This can be useful for decorators that forward user submitted data to
- a view function::
- from werkzeug.utils import ArgumentValidationError, validate_arguments
- def sanitize(f):
- def proxy(request):
- data = request.values.to_dict()
- try:
- args, kwargs = validate_arguments(f, (request,), data)
- except ArgumentValidationError:
- raise BadRequest('The browser failed to transmit all '
- 'the data expected.')
- return f(*args, **kwargs)
- return proxy
- :param func: the function the validation is performed against.
- :param args: a tuple of positional arguments.
- :param kwargs: a dict of keyword arguments.
- :param drop_extra: set to `False` if you don't want extra arguments
- to be silently dropped.
- :return: tuple in the form ``(args, kwargs)``.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use :func:`inspect.signature`
- instead.
- """
- warnings.warn(
- "'utils.validate_arguments' is deprecated and will be removed"
- " in Werkzeug 2.1. Use 'inspect.signature' instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- parser = _parse_signature(func)
- args, kwargs, missing, extra, extra_positional = parser(args, kwargs)[:5]
- if missing:
- raise ArgumentValidationError(tuple(missing))
- elif (extra or extra_positional) and not drop_extra:
- raise ArgumentValidationError(None, extra, extra_positional)
- return tuple(args), kwargs
- def bind_arguments(func, args, kwargs): # type: ignore
- """Bind the arguments provided into a dict. When passed a function,
- a tuple of arguments and a dict of keyword arguments `bind_arguments`
- returns a dict of names as the function would see it. This can be useful
- to implement a cache decorator that uses the function arguments to build
- the cache key based on the values of the arguments.
- :param func: the function the arguments should be bound for.
- :param args: tuple of positional arguments.
- :param kwargs: a dict of keyword arguments.
- :return: a :class:`dict` of bound keyword arguments.
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1. Use :meth:`Signature.bind`
- instead.
- """
- warnings.warn(
- "'utils.bind_arguments' is deprecated and will be removed in"
- " Werkzeug 2.1. Use 'Signature.bind' instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- (
- args,
- kwargs,
- missing,
- extra,
- extra_positional,
- arg_spec,
- vararg_var,
- kwarg_var,
- ) = _parse_signature(func)(args, kwargs)
- values = {}
- for (name, _has_default, _default), value in zip(arg_spec, args):
- values[name] = value
- if vararg_var is not None:
- values[vararg_var] = tuple(extra_positional)
- elif extra_positional:
- raise TypeError("too many positional arguments")
- if kwarg_var is not None:
- multikw = set(extra) & {x[0] for x in arg_spec}
- if multikw:
- raise TypeError(
- f"got multiple values for keyword argument {next(iter(multikw))!r}"
- )
- values[kwarg_var] = extra
- elif extra:
- raise TypeError(f"got unexpected keyword argument {next(iter(extra))!r}")
- return values
- class ArgumentValidationError(ValueError):
- """Raised if :func:`validate_arguments` fails to validate
- .. deprecated:: 2.0
- Will be removed in Werkzeug 2.1 along with ``utils.bind`` and
- ``validate_arguments``.
- """
- def __init__(self, missing=None, extra=None, extra_positional=None): # type: ignore
- self.missing = set(missing or ())
- self.extra = extra or {}
- self.extra_positional = extra_positional or []
- super().__init__(
- "function arguments invalid."
- f" ({len(self.missing)} missing,"
- f" {len(self.extra) + len(self.extra_positional)} additional)"
- )
- class ImportStringError(ImportError):
- """Provides information about a failed :func:`import_string` attempt."""
- #: String in dotted notation that failed to be imported.
- import_name: str
- #: Wrapped exception.
- exception: BaseException
- def __init__(self, import_name: str, exception: BaseException) -> None:
- self.import_name = import_name
- self.exception = exception
- msg = import_name
- name = ""
- tracked = []
- for part in import_name.replace(":", ".").split("."):
- name = f"{name}.{part}" if name else part
- imported = import_string(name, silent=True)
- if imported:
- tracked.append((name, getattr(imported, "__file__", None)))
- else:
- track = [f"- {n!r} found in {i!r}." for n, i in tracked]
- track.append(f"- {name!r} not found.")
- track_str = "\n".join(track)
- msg = (
- f"import_string() failed for {import_name!r}. Possible reasons"
- f" are:\n\n"
- "- missing __init__.py in a package;\n"
- "- package or module path not included in sys.path;\n"
- "- duplicated package or module name taking precedence in"
- " sys.path;\n"
- "- missing module, class, function or variable;\n\n"
- f"Debugged import:\n\n{track_str}\n\n"
- f"Original exception:\n\n{type(exception).__name__}: {exception}"
- )
- break
- super().__init__(msg)
- def __repr__(self) -> str:
- return f"<{type(self).__name__}({self.import_name!r}, {self.exception!r})>"
|