|
@@ -1,5 +1,8 @@
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import collections.abc as cabc
|
|
|
import json
|
|
|
-import typing as _t
|
|
|
+import typing as t
|
|
|
|
|
|
from .encoding import want_bytes
|
|
|
from .exc import BadPayload
|
|
@@ -7,22 +10,36 @@ from .exc import BadSignature
|
|
|
from .signer import _make_keys_list
|
|
|
from .signer import Signer
|
|
|
|
|
|
-_t_str_bytes = _t.Union[str, bytes]
|
|
|
-_t_opt_str_bytes = _t.Optional[_t_str_bytes]
|
|
|
-_t_kwargs = _t.Dict[str, _t.Any]
|
|
|
-_t_opt_kwargs = _t.Optional[_t_kwargs]
|
|
|
-_t_signer = _t.Type[Signer]
|
|
|
-_t_fallbacks = _t.List[_t.Union[_t_kwargs, _t.Tuple[_t_signer, _t_kwargs], _t_signer]]
|
|
|
-_t_load_unsafe = _t.Tuple[bool, _t.Any]
|
|
|
-_t_secret_key = _t.Union[_t.Iterable[_t_str_bytes], _t_str_bytes]
|
|
|
+if t.TYPE_CHECKING:
|
|
|
+ import typing_extensions as te
|
|
|
+
|
|
|
+ # This should be either be str or bytes. To avoid having to specify the
|
|
|
+ # bound type, it falls back to a union if structural matching fails.
|
|
|
+ _TSerialized = te.TypeVar(
|
|
|
+ "_TSerialized", bound=t.Union[str, bytes], default=t.Union[str, bytes]
|
|
|
+ )
|
|
|
+else:
|
|
|
+ # Still available at runtime on Python < 3.13, but without the default.
|
|
|
+ _TSerialized = t.TypeVar("_TSerialized", bound=t.Union[str, bytes])
|
|
|
+
|
|
|
|
|
|
+class _PDataSerializer(t.Protocol[_TSerialized]):
|
|
|
+ def loads(self, payload: _TSerialized, /) -> t.Any: ...
|
|
|
+ # A signature with additional arguments is not handled correctly by type
|
|
|
+ # checkers right now, so an overload is used below for serializers that
|
|
|
+ # don't match this strict protocol.
|
|
|
+ def dumps(self, obj: t.Any, /) -> _TSerialized: ...
|
|
|
|
|
|
-def is_text_serializer(serializer: _t.Any) -> bool:
|
|
|
+
|
|
|
+# Use TypeIs once it's available in typing_extensions or 3.13.
|
|
|
+def is_text_serializer(
|
|
|
+ serializer: _PDataSerializer[t.Any],
|
|
|
+) -> te.TypeGuard[_PDataSerializer[str]]:
|
|
|
"""Checks whether a serializer generates text or binary."""
|
|
|
return isinstance(serializer.dumps({}), str)
|
|
|
|
|
|
|
|
|
-class Serializer:
|
|
|
+class Serializer(t.Generic[_TSerialized]):
|
|
|
"""A serializer wraps a :class:`~itsdangerous.signer.Signer` to
|
|
|
enable serializing and securely signing data other than bytes. It
|
|
|
can unsign to verify that the data hasn't been changed.
|
|
@@ -77,31 +94,120 @@ class Serializer:
|
|
|
#: The default serialization module to use to serialize data to a
|
|
|
#: string internally. The default is :mod:`json`, but can be changed
|
|
|
#: to any object that provides ``dumps`` and ``loads`` methods.
|
|
|
- default_serializer: _t.Any = json
|
|
|
+ default_serializer: _PDataSerializer[t.Any] = json
|
|
|
|
|
|
#: The default ``Signer`` class to instantiate when signing data.
|
|
|
#: The default is :class:`itsdangerous.signer.Signer`.
|
|
|
- default_signer: _t_signer = Signer
|
|
|
+ default_signer: type[Signer] = Signer
|
|
|
|
|
|
#: The default fallback signers to try when unsigning fails.
|
|
|
- default_fallback_signers: _t_fallbacks = []
|
|
|
+ default_fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ] = []
|
|
|
+
|
|
|
+ # Serializer[str] if no data serializer is provided, or if it returns str.
|
|
|
+ @t.overload
|
|
|
+ def __init__(
|
|
|
+ self: Serializer[str],
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None = b"itsdangerous",
|
|
|
+ serializer: None | _PDataSerializer[str] = None,
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
+ ): ...
|
|
|
+
|
|
|
+ # Serializer[bytes] with a bytes data serializer positional argument.
|
|
|
+ @t.overload
|
|
|
+ def __init__(
|
|
|
+ self: Serializer[bytes],
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None,
|
|
|
+ serializer: _PDataSerializer[bytes],
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
+ ): ...
|
|
|
+
|
|
|
+ # Serializer[bytes] with a bytes data serializer keyword argument.
|
|
|
+ @t.overload
|
|
|
+ def __init__(
|
|
|
+ self: Serializer[bytes],
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None = b"itsdangerous",
|
|
|
+ *,
|
|
|
+ serializer: _PDataSerializer[bytes],
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
+ ): ...
|
|
|
+
|
|
|
+ # Fall back with a positional argument. If the strict signature of
|
|
|
+ # _PDataSerializer doesn't match, fall back to a union, requiring the user
|
|
|
+ # to specify the type.
|
|
|
+ @t.overload
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None,
|
|
|
+ serializer: t.Any,
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
+ ): ...
|
|
|
+
|
|
|
+ # Fall back with a keyword argument.
|
|
|
+ @t.overload
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None = b"itsdangerous",
|
|
|
+ *,
|
|
|
+ serializer: t.Any,
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
+ ): ...
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
- secret_key: _t_secret_key,
|
|
|
- salt: _t_opt_str_bytes = b"itsdangerous",
|
|
|
- serializer: _t.Any = None,
|
|
|
- serializer_kwargs: _t_opt_kwargs = None,
|
|
|
- signer: _t.Optional[_t_signer] = None,
|
|
|
- signer_kwargs: _t_opt_kwargs = None,
|
|
|
- fallback_signers: _t.Optional[_t_fallbacks] = None,
|
|
|
+ secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
|
|
|
+ salt: str | bytes | None = b"itsdangerous",
|
|
|
+ serializer: t.Any | None = None,
|
|
|
+ serializer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ signer: type[Signer] | None = None,
|
|
|
+ signer_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ]
|
|
|
+ | None = None,
|
|
|
):
|
|
|
#: The list of secret keys to try for verifying signatures, from
|
|
|
#: oldest to newest. The newest (last) key is used for signing.
|
|
|
#:
|
|
|
#: This allows a key rotation system to keep a list of allowed
|
|
|
#: keys and remove expired ones.
|
|
|
- self.secret_keys: _t.List[bytes] = _make_keys_list(secret_key)
|
|
|
+ self.secret_keys: list[bytes] = _make_keys_list(secret_key)
|
|
|
|
|
|
if salt is not None:
|
|
|
salt = want_bytes(salt)
|
|
@@ -112,20 +218,22 @@ class Serializer:
|
|
|
if serializer is None:
|
|
|
serializer = self.default_serializer
|
|
|
|
|
|
- self.serializer: _t.Any = serializer
|
|
|
+ self.serializer: _PDataSerializer[_TSerialized] = serializer
|
|
|
self.is_text_serializer: bool = is_text_serializer(serializer)
|
|
|
|
|
|
if signer is None:
|
|
|
signer = self.default_signer
|
|
|
|
|
|
- self.signer: _t_signer = signer
|
|
|
- self.signer_kwargs: _t_kwargs = signer_kwargs or {}
|
|
|
+ self.signer: type[Signer] = signer
|
|
|
+ self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {}
|
|
|
|
|
|
if fallback_signers is None:
|
|
|
- fallback_signers = list(self.default_fallback_signers or ())
|
|
|
+ fallback_signers = list(self.default_fallback_signers)
|
|
|
|
|
|
- self.fallback_signers: _t_fallbacks = fallback_signers
|
|
|
- self.serializer_kwargs: _t_kwargs = serializer_kwargs or {}
|
|
|
+ self.fallback_signers: list[
|
|
|
+ dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
|
|
|
+ ] = fallback_signers
|
|
|
+ self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {}
|
|
|
|
|
|
@property
|
|
|
def secret_key(self) -> bytes:
|
|
@@ -135,8 +243,8 @@ class Serializer:
|
|
|
return self.secret_keys[-1]
|
|
|
|
|
|
def load_payload(
|
|
|
- self, payload: bytes, serializer: _t.Optional[_t.Any] = None
|
|
|
- ) -> _t.Any:
|
|
|
+ self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None
|
|
|
+ ) -> t.Any:
|
|
|
"""Loads the encoded object. This function raises
|
|
|
:class:`.BadPayload` if the payload is not valid. The
|
|
|
``serializer`` parameter can be used to override the serializer
|
|
@@ -144,16 +252,17 @@ class Serializer:
|
|
|
bytes.
|
|
|
"""
|
|
|
if serializer is None:
|
|
|
- serializer = self.serializer
|
|
|
+ use_serializer = self.serializer
|
|
|
is_text = self.is_text_serializer
|
|
|
else:
|
|
|
+ use_serializer = serializer
|
|
|
is_text = is_text_serializer(serializer)
|
|
|
|
|
|
try:
|
|
|
if is_text:
|
|
|
- return serializer.loads(payload.decode("utf-8"))
|
|
|
+ return use_serializer.loads(payload.decode("utf-8")) # type: ignore[arg-type]
|
|
|
|
|
|
- return serializer.loads(payload)
|
|
|
+ return use_serializer.loads(payload) # type: ignore[arg-type]
|
|
|
except Exception as e:
|
|
|
raise BadPayload(
|
|
|
"Could not load the payload because an exception"
|
|
@@ -161,14 +270,14 @@ class Serializer:
|
|
|
original_error=e,
|
|
|
) from e
|
|
|
|
|
|
- def dump_payload(self, obj: _t.Any) -> bytes:
|
|
|
+ def dump_payload(self, obj: t.Any) -> bytes:
|
|
|
"""Dumps the encoded object. The return value is always bytes.
|
|
|
If the internal serializer returns text, the value will be
|
|
|
encoded as UTF-8.
|
|
|
"""
|
|
|
return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs))
|
|
|
|
|
|
- def make_signer(self, salt: _t_opt_str_bytes = None) -> Signer:
|
|
|
+ def make_signer(self, salt: str | bytes | None = None) -> Signer:
|
|
|
"""Creates a new instance of the signer to be used. The default
|
|
|
implementation uses the :class:`.Signer` base class.
|
|
|
"""
|
|
@@ -177,7 +286,7 @@ class Serializer:
|
|
|
|
|
|
return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs)
|
|
|
|
|
|
- def iter_unsigners(self, salt: _t_opt_str_bytes = None) -> _t.Iterator[Signer]:
|
|
|
+ def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]:
|
|
|
"""Iterates over all signers to be tried for unsigning. Starts
|
|
|
with the configured signer, then constructs each signer
|
|
|
specified in ``fallback_signers``.
|
|
@@ -199,7 +308,7 @@ class Serializer:
|
|
|
for secret_key in self.secret_keys:
|
|
|
yield fallback(secret_key, salt=salt, **kwargs)
|
|
|
|
|
|
- def dumps(self, obj: _t.Any, salt: _t_opt_str_bytes = None) -> _t_str_bytes:
|
|
|
+ def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized:
|
|
|
"""Returns a signed string serialized with the internal
|
|
|
serializer. The return value can be either a byte or unicode
|
|
|
string depending on the format of the internal serializer.
|
|
@@ -208,19 +317,19 @@ class Serializer:
|
|
|
rv = self.make_signer(salt).sign(payload)
|
|
|
|
|
|
if self.is_text_serializer:
|
|
|
- return rv.decode("utf-8")
|
|
|
+ return rv.decode("utf-8") # type: ignore[return-value]
|
|
|
|
|
|
- return rv
|
|
|
+ return rv # type: ignore[return-value]
|
|
|
|
|
|
- def dump(self, obj: _t.Any, f: _t.IO, salt: _t_opt_str_bytes = None) -> None:
|
|
|
+ def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None:
|
|
|
"""Like :meth:`dumps` but dumps into a file. The file handle has
|
|
|
to be compatible with what the internal serializer expects.
|
|
|
"""
|
|
|
f.write(self.dumps(obj, salt))
|
|
|
|
|
|
def loads(
|
|
|
- self, s: _t_str_bytes, salt: _t_opt_str_bytes = None, **kwargs: _t.Any
|
|
|
- ) -> _t.Any:
|
|
|
+ self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any
|
|
|
+ ) -> t.Any:
|
|
|
"""Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the
|
|
|
signature validation fails.
|
|
|
"""
|
|
@@ -233,15 +342,15 @@ class Serializer:
|
|
|
except BadSignature as err:
|
|
|
last_exception = err
|
|
|
|
|
|
- raise _t.cast(BadSignature, last_exception)
|
|
|
+ raise t.cast(BadSignature, last_exception)
|
|
|
|
|
|
- def load(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t.Any:
|
|
|
+ def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any:
|
|
|
"""Like :meth:`loads` but loads from a file."""
|
|
|
return self.loads(f.read(), salt)
|
|
|
|
|
|
def loads_unsafe(
|
|
|
- self, s: _t_str_bytes, salt: _t_opt_str_bytes = None
|
|
|
- ) -> _t_load_unsafe:
|
|
|
+ self, s: str | bytes, salt: str | bytes | None = None
|
|
|
+ ) -> tuple[bool, t.Any]:
|
|
|
"""Like :meth:`loads` but without verifying the signature. This
|
|
|
is potentially very dangerous to use depending on how your
|
|
|
serializer works. The return value is ``(signature_valid,
|
|
@@ -259,11 +368,11 @@ class Serializer:
|
|
|
|
|
|
def _loads_unsafe_impl(
|
|
|
self,
|
|
|
- s: _t_str_bytes,
|
|
|
- salt: _t_opt_str_bytes,
|
|
|
- load_kwargs: _t_opt_kwargs = None,
|
|
|
- load_payload_kwargs: _t_opt_kwargs = None,
|
|
|
- ) -> _t_load_unsafe:
|
|
|
+ s: str | bytes,
|
|
|
+ salt: str | bytes | None,
|
|
|
+ load_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ load_payload_kwargs: dict[str, t.Any] | None = None,
|
|
|
+ ) -> tuple[bool, t.Any]:
|
|
|
"""Low level helper function to implement :meth:`loads_unsafe`
|
|
|
in serializer subclasses.
|
|
|
"""
|
|
@@ -287,7 +396,9 @@ class Serializer:
|
|
|
except BadPayload:
|
|
|
return False, None
|
|
|
|
|
|
- def load_unsafe(self, f: _t.IO, salt: _t_opt_str_bytes = None) -> _t_load_unsafe:
|
|
|
+ def load_unsafe(
|
|
|
+ self, f: t.IO[t.Any], salt: str | bytes | None = None
|
|
|
+ ) -> tuple[bool, t.Any]:
|
|
|
"""Like :meth:`loads_unsafe` but loads from a file.
|
|
|
|
|
|
.. versionadded:: 0.15
|