123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696 |
- import asyncio
- import builtins
- import functools
- import inspect
- import sys
- import unittest.mock
- import warnings
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import Dict
- from typing import Generator
- from typing import Iterable
- from typing import List
- from typing import Mapping
- from typing import Optional
- from typing import overload
- from typing import Tuple
- from typing import Type
- from typing import TypeVar
- from typing import Union
- import pytest
- from ._util import get_mock_module
- from ._util import parse_ini_boolean
# Generic placeholder used by the ``patch`` overloads to tie the returned
# object to the type of ``new`` / the result of ``new_callable``.
_T = TypeVar("_T")

# ``unittest.mock.AsyncMock`` is only referenced on Python 3.8 and newer;
# older interpreters get aliases that leave it out.
if sys.version_info < (3, 8):
    AsyncMockType = Any
    MockType = Union[unittest.mock.MagicMock, unittest.mock.NonCallableMagicMock]
else:
    AsyncMockType = unittest.mock.AsyncMock
    MockType = Union[
        unittest.mock.MagicMock,
        unittest.mock.AsyncMock,
        unittest.mock.NonCallableMagicMock,
    ]
class PytestMockWarning(UserWarning):
    """Common base for every warning category raised by pytest-mock."""
class MockerFixture:
    """
    Fixture that provides the same interface to functions in the mock module,
    ensuring that they are uninstalled at the end of each test.

    Every patcher started through ``patch`` and every mock built by
    ``create_autospec`` is recorded, so that ``stopall`` can undo the patches
    and ``resetall`` can reset the mocks.
    """

    def __init__(self, config: Any) -> None:
        """
        :param config: Passed to ``get_mock_module`` to obtain the mock module
            whose API this fixture mirrors.
        """
        # (patcher, mock) pairs in registration order. The patcher is ``None``
        # for mocks that were not produced by a patcher (see create_autospec).
        self._patches_and_mocks: List[Tuple[Any, unittest.mock.MagicMock]] = []
        self.mock_module = mock_module = get_mock_module(config)
        self.patch = self._Patcher(
            self._patches_and_mocks, mock_module
        )  # type: MockerFixture._Patcher
        # aliases for convenience
        self.Mock = mock_module.Mock
        self.MagicMock = mock_module.MagicMock
        self.NonCallableMock = mock_module.NonCallableMock
        self.NonCallableMagicMock = mock_module.NonCallableMagicMock
        self.PropertyMock = mock_module.PropertyMock
        # AsyncMock (and seal, below) are only aliased when the selected mock
        # module actually provides them.
        if hasattr(mock_module, "AsyncMock"):
            self.AsyncMock = mock_module.AsyncMock
        self.call = mock_module.call
        self.ANY = mock_module.ANY
        self.DEFAULT = mock_module.DEFAULT
        self.sentinel = mock_module.sentinel
        self.mock_open = mock_module.mock_open
        if hasattr(mock_module, "seal"):
            self.seal = mock_module.seal

    def create_autospec(
        self, spec: Any, spec_set: bool = False, instance: bool = False, **kwargs: Any
    ) -> MockType:
        """
        Call ``create_autospec`` of the mock module and register the result.

        The mock is registered without a patcher, so ``resetall`` will reset
        it while ``stopall`` has nothing to stop for it.

        All arguments are forwarded unchanged to the mock module.

        :return: The mock created by the mock module.
        """
        m: MockType = self.mock_module.create_autospec(
            spec, spec_set, instance, **kwargs
        )
        self._patches_and_mocks.append((None, m))
        return m

    def resetall(
        self, *, return_value: bool = False, side_effect: bool = False
    ) -> None:
        """
        Call reset_mock() on all patchers started by this fixture.

        :param bool return_value: Reset the return_value of mocks.
        :param bool side_effect: Reset the side_effect of mocks.
        """
        supports_reset_mock_with_args: Tuple[Type[Any], ...]
        if hasattr(self, "AsyncMock"):
            supports_reset_mock_with_args = (self.Mock, self.AsyncMock)
        else:
            supports_reset_mock_with_args = (self.Mock,)

        for _, m in self._patches_and_mocks:
            # See issue #237.
            # Registered objects are not necessarily mocks, so skip anything
            # that cannot be reset.
            if not hasattr(m, "reset_mock"):
                continue
            if isinstance(m, supports_reset_mock_with_args):
                m.reset_mock(return_value=return_value, side_effect=side_effect)
            else:
                # Other mock types are reset without the keyword arguments.
                m.reset_mock()

    def stopall(self) -> None:
        """
        Stop all patchers started by this fixture. Can be safely called multiple
        times.
        """
        # Stop in reverse registration order; entries without a patcher
        # (registered by create_autospec) are simply dropped.
        for p, _ in reversed(self._patches_and_mocks):
            if p is not None:
                p.stop()
        self._patches_and_mocks.clear()

    def stop(self, mock: unittest.mock.MagicMock) -> None:
        """
        Stops a previous patch or spy call by passing the ``MagicMock`` object
        returned by it.

        Mocks registered without a patcher (those returned by
        ``create_autospec``) are only removed from the registry.

        :param mock: The mock object returned by a previous call.
        :raises ValueError: If ``mock`` is not registered with this fixture.
        """
        for index, (p, m) in enumerate(self._patches_and_mocks):
            if mock is m:
                # create_autospec() registers ``(None, mock)``: there is no
                # patcher to stop in that case.
                if p is not None:
                    p.stop()
                del self._patches_and_mocks[index]
                break
        else:
            raise ValueError("This mock object is not registered")

    def spy(self, obj: object, name: str) -> MockType:
        """
        Create a spy of method. It will run method normally, but it is now
        possible to use `mock` call features with it, like call count.

        After each call the spy exposes ``spy_return`` (the value returned by
        the real method, or ``None``) and ``spy_exception`` (the exception it
        raised, or ``None``).

        :param obj: An object.
        :param name: A method in object.
        :return: Spy object.
        """
        method = getattr(obj, name)
        if inspect.isclass(obj) and isinstance(
            inspect.getattr_static(obj, name), (classmethod, staticmethod)
        ):
            # Can't use autospec classmethod or staticmethod objects before 3.7
            # see: https://bugs.python.org/issue23078
            autospec = False
        else:
            autospec = inspect.ismethod(method) or inspect.isfunction(method)

        # Both wrappers close over ``spy_obj``, which is bound further down,
        # before either wrapper can be invoked.
        def wrapper(*args, **kwargs):
            spy_obj.spy_return = None
            spy_obj.spy_exception = None
            try:
                r = method(*args, **kwargs)
            except BaseException as e:
                spy_obj.spy_exception = e
                raise
            else:
                spy_obj.spy_return = r
            return r

        async def async_wrapper(*args, **kwargs):
            spy_obj.spy_return = None
            spy_obj.spy_exception = None
            try:
                r = await method(*args, **kwargs)
            except BaseException as e:
                spy_obj.spy_exception = e
                raise
            else:
                spy_obj.spy_return = r
            return r

        if asyncio.iscoroutinefunction(method):
            wrapped = functools.update_wrapper(async_wrapper, method)
        else:
            wrapped = functools.update_wrapper(wrapper, method)

        spy_obj = self.patch.object(obj, name, side_effect=wrapped, autospec=autospec)
        spy_obj.spy_return = None
        spy_obj.spy_exception = None
        return spy_obj

    def stub(self, name: Optional[str] = None) -> unittest.mock.MagicMock:
        """
        Create a stub method. It accepts any arguments. Ideal to register to
        callbacks in tests.

        :param name: the constructed stub's name as used in repr
        :return: Stub object.
        """
        return cast(
            unittest.mock.MagicMock,
            self.mock_module.MagicMock(spec=lambda *args, **kwargs: None, name=name),
        )

    def async_stub(self, name: Optional[str] = None) -> AsyncMockType:
        """
        Create a async stub method. It accepts any arguments. Ideal to register to
        callbacks in tests.

        :param name: the constructed stub's name as used in repr
        :return: Stub object.
        """
        return cast(
            AsyncMockType,
            self.mock_module.AsyncMock(spec=lambda *args, **kwargs: None, name=name),
        )

    class _Patcher:
        """
        Object to provide the same interface as mock.patch, mock.patch.object,
        etc. We need this indirection to keep the same API of the mock package.
        """

        # Placeholder default for ``new``; it is swapped for the mock module's
        # own ``DEFAULT`` at call time.
        DEFAULT = object()

        def __init__(self, patches_and_mocks, mock_module):
            """
            :param patches_and_mocks: List that receives a ``(patcher, mock)``
                pair for every patch started through this object.
            :param mock_module: The mock module whose patchers are used.
            """
            self.__patches_and_mocks = patches_and_mocks
            self.mock_module = mock_module

        def _start_patch(
            self, mock_func: Any, warn_on_mock_enter: bool, *args: Any, **kwargs: Any
        ) -> MockType:
            """Patches something by calling the given function from the mock
            module, registering the patch to stop it later and returns the
            mock object resulting from the mock call.

            :param mock_func: Patcher factory from the mock module.
            :param warn_on_mock_enter: When true, entering the returned mock
                as a context manager emits a ``PytestMockWarning``.
            """
            p = mock_func(*args, **kwargs)
            mocked: MockType = p.start()
            self.__patches_and_mocks.append((p, mocked))
            if hasattr(mocked, "reset_mock"):
                # check if `mocked` is actually a mock object, as depending on autospec or target
                # parameters `mocked` can be anything
                if hasattr(mocked, "__enter__") and warn_on_mock_enter:
                    # NOTE(review): the stacklevel depends on the Python
                    # version, presumably because the number of internal mock
                    # frames differs -- confirm before changing.
                    if sys.version_info >= (3, 8):
                        depth = 5
                    else:
                        depth = 4
                    mocked.__enter__.side_effect = lambda: warnings.warn(
                        "Mocks returned by pytest-mock do not need to be used as context managers. "
                        "The mocker fixture automatically undoes mocking at the end of a test. "
                        "This warning can be ignored if it was triggered by mocking a context manager. "
                        "https://pytest-mock.readthedocs.io/en/latest/remarks.html#usage-as-context-manager",
                        PytestMockWarning,
                        stacklevel=depth,
                    )
            return mocked

        def object(
            self,
            target: object,
            attribute: str,
            new: object = DEFAULT,
            spec: Optional[object] = None,
            create: bool = False,
            spec_set: Optional[object] = None,
            autospec: Optional[object] = None,
            new_callable: object = None,
            **kwargs: Any
        ) -> MockType:
            """API to mock.patch.object"""
            if new is self.DEFAULT:
                new = self.mock_module.DEFAULT
            return self._start_patch(
                self.mock_module.patch.object,
                True,
                target,
                attribute,
                new=new,
                spec=spec,
                create=create,
                spec_set=spec_set,
                autospec=autospec,
                new_callable=new_callable,
                **kwargs
            )

        # From here on the builtin ``object`` is shadowed inside the class body
        # by the method above, hence the explicit ``builtins.object``.
        def context_manager(
            self,
            target: builtins.object,
            attribute: str,
            new: builtins.object = DEFAULT,
            spec: Optional[builtins.object] = None,
            create: bool = False,
            spec_set: Optional[builtins.object] = None,
            autospec: Optional[builtins.object] = None,
            new_callable: builtins.object = None,
            **kwargs: Any
        ) -> MockType:
            """This is equivalent to mock.patch.object except that the returned mock
            does not issue a warning when used as a context manager."""
            if new is self.DEFAULT:
                new = self.mock_module.DEFAULT
            return self._start_patch(
                self.mock_module.patch.object,
                False,
                target,
                attribute,
                new=new,
                spec=spec,
                create=create,
                spec_set=spec_set,
                autospec=autospec,
                new_callable=new_callable,
                **kwargs
            )

        def multiple(
            self,
            target: builtins.object,
            spec: Optional[builtins.object] = None,
            create: bool = False,
            spec_set: Optional[builtins.object] = None,
            autospec: Optional[builtins.object] = None,
            new_callable: Optional[builtins.object] = None,
            **kwargs: Any
        ) -> Dict[str, MockType]:
            """API to mock.patch.multiple"""
            return self._start_patch(
                self.mock_module.patch.multiple,
                True,
                target,
                spec=spec,
                create=create,
                spec_set=spec_set,
                autospec=autospec,
                new_callable=new_callable,
                **kwargs
            )

        def dict(
            self,
            in_dict: Union[Mapping[Any, Any], str],
            values: Union[Mapping[Any, Any], Iterable[Tuple[Any, Any]]] = (),
            clear: bool = False,
            **kwargs: Any
        ) -> Any:
            """API to mock.patch.dict"""
            return self._start_patch(
                self.mock_module.patch.dict,
                True,
                in_dict,
                values=values,
                clear=clear,
                **kwargs
            )

        # The overloads below only refine the static return type of
        # ``__call__``; the runtime behaviour lives in the final definition.
        @overload
        def __call__(
            self,
            target: str,
            new: None = ...,
            spec: Optional[builtins.object] = ...,
            create: bool = ...,
            spec_set: Optional[builtins.object] = ...,
            autospec: Optional[builtins.object] = ...,
            new_callable: None = ...,
            **kwargs: Any
        ) -> MockType:
            ...

        @overload
        def __call__(
            self,
            target: str,
            new: _T,
            spec: Optional[builtins.object] = ...,
            create: bool = ...,
            spec_set: Optional[builtins.object] = ...,
            autospec: Optional[builtins.object] = ...,
            new_callable: None = ...,
            **kwargs: Any
        ) -> _T:
            ...

        @overload
        def __call__(
            self,
            target: str,
            new: None,
            spec: Optional[builtins.object],
            create: bool,
            spec_set: Optional[builtins.object],
            autospec: Optional[builtins.object],
            new_callable: Callable[[], _T],
            **kwargs: Any
        ) -> _T:
            ...

        @overload
        def __call__(
            self,
            target: str,
            new: None = ...,
            spec: Optional[builtins.object] = ...,
            create: bool = ...,
            spec_set: Optional[builtins.object] = ...,
            autospec: Optional[builtins.object] = ...,
            *,
            new_callable: Callable[[], _T],
            **kwargs: Any
        ) -> _T:
            ...

        def __call__(
            self,
            target: str,
            new: builtins.object = DEFAULT,
            spec: Optional[builtins.object] = None,
            create: bool = False,
            spec_set: Optional[builtins.object] = None,
            autospec: Optional[builtins.object] = None,
            new_callable: Optional[Callable[[], Any]] = None,
            **kwargs: Any
        ) -> Any:
            """API to mock.patch"""
            if new is self.DEFAULT:
                new = self.mock_module.DEFAULT
            return self._start_patch(
                self.mock_module.patch,
                True,
                target,
                new=new,
                spec=spec,
                create=create,
                spec_set=spec_set,
                autospec=autospec,
                new_callable=new_callable,
                **kwargs
            )
def _mocker(pytestconfig: Any) -> Generator[MockerFixture, None, None]:
    """
    Yield a ``MockerFixture`` built from ``pytestconfig`` and, once the
    generator is resumed, stop every patch that was started through it.
    """
    fixture = MockerFixture(pytestconfig)
    yield fixture
    fixture.stopall()
# One fixture per pytest scope, all backed by the same generator function.
mocker = pytest.fixture()(_mocker)  # default scope is function
class_mocker = pytest.fixture(scope="class")(_mocker)
module_mocker = pytest.fixture(scope="module")(_mocker)
package_mocker = pytest.fixture(scope="package")(_mocker)
session_mocker = pytest.fixture(scope="session")(_mocker)

# Patchers started by wrap_assert_methods(), kept so they can be stopped later.
_mock_module_patches: List[Any] = []
# Original (unwrapped) assert methods, keyed by method name.
_mock_module_originals: Dict[str, Any] = {}
def assert_wrapper(
    __wrapped_mock_method__: Callable[..., Any], *args: Any, **kwargs: Any
) -> None:
    """
    Call a mock assert method and enrich its failure message.

    When the wrapped method raises ``AssertionError`` and the mock (taken from
    ``args[0]``) has recorded a call, the arguments of its last call are
    compared against ``args[1:]`` / ``kwargs`` with plain ``assert``
    statements; the text of any resulting failure is appended to the message.
    The error that is finally raised is flagged with
    ``_mock_introspection_applied`` so it is not enriched a second time.

    :raises AssertionError: When the wrapped assert method fails.
    """
    __tracebackhide__ = True
    try:
        __wrapped_mock_method__(*args, **kwargs)
    except AssertionError as failure:
        message = str(failure)
        if not getattr(failure, "_mock_introspection_applied", 0):
            mock_self = args[0]
            if mock_self.call_args is not None:
                last_args, last_kwargs = mock_self.call_args
                details = ""
                # These stay as ``assert`` statements on purpose: the text
                # of the AssertionError they raise is what gets reported.
                try:
                    assert last_args == args[1:]
                except AssertionError as args_failure:
                    details += "\nArgs:\n" + str(args_failure)
                try:
                    assert last_kwargs == kwargs
                except AssertionError as kwargs_failure:
                    details += "\nKwargs:\n" + str(kwargs_failure)
                if details:
                    message += "\n\npytest introspection follows:\n" + details
        enriched = AssertionError(message)
        enriched._mock_introspection_applied = True  # type:ignore[attr-defined]
        raise enriched
def assert_has_calls_wrapper(
    __wrapped_mock_method__: Callable[..., Any], *args: Any, **kwargs: Any
) -> None:
    """
    Call a mock's ``assert_has_calls`` and enrich its failure message.

    ``args`` is expected to be ``(mock, calls[, any_order])``; ``calls`` and
    ``any_order`` may also be passed by keyword. When the wrapped method
    raises ``AssertionError`` and ``any_order`` is false, the recorded calls
    (``mock.call_args_list``) are compared pairwise with the expected calls
    using plain ``assert`` statements, and the text of any resulting failure
    is appended to the message. No pairwise comparison is done when
    ``any_order`` is true, since position-by-position diffs would be
    misleading. The error that is finally raised is flagged with
    ``_mock_introspection_applied`` so it is not enriched a second time.

    :raises AssertionError: When the wrapped assert method fails.
    """
    __tracebackhide__ = True
    try:
        __wrapped_mock_method__(*args, **kwargs)
        return
    except AssertionError as e:
        # ``any_order`` can be given by keyword or as the third positional
        # argument (after the mock itself and the expected calls).
        if "any_order" in kwargs:
            any_order = kwargs["any_order"]
        elif len(args) > 2:
            any_order = args[2]
        else:
            any_order = False
        if getattr(e, "_mock_introspection_applied", 0) or any_order:
            msg = str(e)
        else:
            mock_self = args[0]
            msg = str(e)
            if mock_self.call_args_list is not None:
                actual_calls = list(mock_self.call_args_list)
                # The expected calls may have been passed as ``calls=...``;
                # indexing ``args`` blindly would replace the real assertion
                # failure with an IndexError.
                expect_calls = args[1] if len(args) > 1 else kwargs.get("calls", ())
                introspection = ""
                from itertools import zip_longest

                for actual_call, expect_call in zip_longest(actual_calls, expect_calls):
                    # zip_longest pads the shorter side with None; treat a
                    # missing call as one made without arguments.
                    if actual_call is not None:
                        actual_args, actual_kwargs = actual_call
                    else:
                        actual_args = tuple()
                        actual_kwargs = {}
                    if expect_call is not None:
                        # Expected calls unpack as (name, args, kwargs).
                        _, expect_args, expect_kwargs = expect_call
                    else:
                        expect_args = tuple()
                        expect_kwargs = {}
                    # These stay as ``assert`` statements on purpose: the
                    # text of the AssertionError they raise is reported.
                    try:
                        assert actual_args == expect_args
                    except AssertionError as e_args:
                        introspection += "\nArgs:\n" + str(e_args)
                    try:
                        assert actual_kwargs == expect_kwargs
                    except AssertionError as e_kwargs:
                        introspection += "\nKwargs:\n" + str(e_kwargs)
                if introspection:
                    msg += "\n\npytest introspection follows:\n" + introspection
        e = AssertionError(msg)
        e._mock_introspection_applied = True  # type:ignore[attr-defined]
        raise e
def wrap_assert_not_called(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_not_called`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_not_called"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_called_with(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_called_with`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_called_with"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_called_once(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_called_once`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_called_once"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_called_once_with(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_called_once_with`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_called_once_with"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_has_calls(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_has_calls`` through ``assert_has_calls_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_has_calls"]
    assert_has_calls_wrapper(original, *args, **kwargs)
def wrap_assert_any_call(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_any_call`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_any_call"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_called(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_called`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_called"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_not_awaited(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_not_awaited`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_not_awaited"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_awaited_with(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_awaited_with`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_awaited_with"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_awaited_once(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_awaited_once`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_awaited_once"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_awaited_once_with(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_awaited_once_with`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_awaited_once_with"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_has_awaits(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_has_awaits`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_has_awaits"]
    # NOTE(review): unlike wrap_assert_has_calls, this goes through the
    # generic assert_wrapper rather than a calls-list-aware wrapper.
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_any_await(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_any_await`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_any_await"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_awaited(*args: Any, **kwargs: Any) -> None:
    """Route ``assert_awaited`` through ``assert_wrapper`` using the saved original."""
    __tracebackhide__ = True
    original = _mock_module_originals["assert_awaited"]
    assert_wrapper(original, *args, **kwargs)
def wrap_assert_methods(config: Any) -> None:
    """
    Patch the assert methods of the mock module with the ``wrap_assert_*``
    functions, so their traceback can be hidden and introspection details
    added to failed argument asserts.

    The original methods are saved in ``_mock_module_originals`` and the
    started patchers in ``_mock_module_patches``; ``unwrap_assert_methods`` is
    registered on ``config`` as cleanup. Does nothing when the originals
    registry is already populated.

    :param config: Passed to ``get_mock_module`` and used to register cleanup.
    """
    # Make sure we only do this once
    if _mock_module_originals:
        return

    mock_module = get_mock_module(config)

    # (class to patch, {method name: replacement}) pairs, patched in order.
    targets = [
        (
            mock_module.NonCallableMock,
            {
                "assert_called": wrap_assert_called,
                "assert_called_once": wrap_assert_called_once,
                "assert_called_with": wrap_assert_called_with,
                "assert_called_once_with": wrap_assert_called_once_with,
                "assert_any_call": wrap_assert_any_call,
                "assert_has_calls": wrap_assert_has_calls,
                "assert_not_called": wrap_assert_not_called,
            },
        )
    ]
    # The awaited-style asserts are only wrapped when the mock module
    # provides AsyncMock.
    if hasattr(mock_module, "AsyncMock"):
        targets.append(
            (
                mock_module.AsyncMock,
                {
                    "assert_awaited": wrap_assert_awaited,
                    "assert_awaited_once": wrap_assert_awaited_once,
                    "assert_awaited_with": wrap_assert_awaited_with,
                    "assert_awaited_once_with": wrap_assert_awaited_once_with,
                    "assert_any_await": wrap_assert_any_await,
                    "assert_has_awaits": wrap_assert_has_awaits,
                    "assert_not_awaited": wrap_assert_not_awaited,
                },
            )
        )

    for owner, replacements in targets:
        for method_name, replacement in replacements.items():
            try:
                original = getattr(owner, method_name)
            except AttributeError:  # pragma: no cover
                # This mock module does not have the method: nothing to wrap.
                continue
            _mock_module_originals[method_name] = original
            active_patcher = mock_module.patch.object(owner, method_name, replacement)
            active_patcher.start()
            _mock_module_patches.append(active_patcher)

    config.add_cleanup(unwrap_assert_methods)
def unwrap_assert_methods() -> None:
    """
    Stop every patcher recorded in ``_mock_module_patches`` and empty both
    module-level registries.

    :raises RuntimeError: Any ``RuntimeError`` from ``stop()`` other than the
        "unstarted patcher" one, which is deliberately ignored.
    """
    for active_patcher in _mock_module_patches:
        try:
            active_patcher.stop()
        except RuntimeError as error:
            # a patcher might have been stopped by user code (#137)
            # so we need to catch this error here and ignore it;
            # unfortunately there's no public API to check if a patch
            # has been started, so catching the error it is
            if str(error) != "stop called on unstarted patcher":
                raise
    del _mock_module_patches[:]
    _mock_module_originals.clear()
def pytest_addoption(parser: Any) -> None:
    """
    Register the two ini options of this plugin on ``parser``.

    :param parser: Object exposing ``addini(name, help, default=...)``.
    """
    ini_options = (
        (
            "mock_traceback_monkeypatch",
            "Monkeypatch the mock library to improve reporting of the "
            "assert_called_... methods",
            True,
        ),
        (
            "mock_use_standalone_module",
            'Use standalone "mock" (from PyPI) instead of builtin "unittest.mock" '
            "on Python 3",
            False,
        ),
    )
    for ini_name, description, default_value in ini_options:
        parser.addini(ini_name, description, default=default_value)
def pytest_configure(config: Any) -> None:
    """
    Wrap the mock assert methods unless the ``mock_traceback_monkeypatch`` ini
    option is off or the ``--tb`` option is ``native``.

    :param config: Object providing ``getoption`` and ``getini``.
    """
    traceback_style = config.getoption("--tb", default="auto")
    monkeypatch_enabled = parse_ini_boolean(
        config.getini("mock_traceback_monkeypatch")
    )
    if not monkeypatch_enabled or traceback_style == "native":
        return
    wrap_assert_methods(config)
|