123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- """Basic collect and runtest protocol implementations."""
- import bdb
- import dataclasses
- import os
- import sys
- from typing import Callable
- from typing import cast
- from typing import Dict
- from typing import Generic
- from typing import List
- from typing import Optional
- from typing import Tuple
- from typing import Type
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- from .reports import BaseReport
- from .reports import CollectErrorRepr
- from .reports import CollectReport
- from .reports import TestReport
- from _pytest import timing
- from _pytest._code.code import ExceptionChainRepr
- from _pytest._code.code import ExceptionInfo
- from _pytest._code.code import TerminalRepr
- from _pytest.compat import final
- from _pytest.config.argparsing import Parser
- from _pytest.deprecated import check_ispytest
- from _pytest.nodes import Collector
- from _pytest.nodes import Item
- from _pytest.nodes import Node
- from _pytest.outcomes import Exit
- from _pytest.outcomes import OutcomeException
- from _pytest.outcomes import Skipped
- from _pytest.outcomes import TEST_OUTCOME
- if sys.version_info[:2] < (3, 11):
- from exceptiongroup import BaseExceptionGroup
- if TYPE_CHECKING:
- from typing_extensions import Literal
- from _pytest.main import Session
- from _pytest.terminal import TerminalReporter
- #
- # pytest plugin hooks.
- def pytest_addoption(parser: Parser) -> None:
- group = parser.getgroup("terminal reporting", "Reporting", after="general")
- group.addoption(
- "--durations",
- action="store",
- type=int,
- default=None,
- metavar="N",
- help="Show N slowest setup/test durations (N=0 for all)",
- )
- group.addoption(
- "--durations-min",
- action="store",
- type=float,
- default=0.005,
- metavar="N",
- help="Minimal duration in seconds for inclusion in slowest list. "
- "Default: 0.005.",
- )
- def pytest_terminal_summary(terminalreporter: "TerminalReporter") -> None:
- durations = terminalreporter.config.option.durations
- durations_min = terminalreporter.config.option.durations_min
- verbose = terminalreporter.config.getvalue("verbose")
- if durations is None:
- return
- tr = terminalreporter
- dlist = []
- for replist in tr.stats.values():
- for rep in replist:
- if hasattr(rep, "duration"):
- dlist.append(rep)
- if not dlist:
- return
- dlist.sort(key=lambda x: x.duration, reverse=True) # type: ignore[no-any-return]
- if not durations:
- tr.write_sep("=", "slowest durations")
- else:
- tr.write_sep("=", "slowest %s durations" % durations)
- dlist = dlist[:durations]
- for i, rep in enumerate(dlist):
- if verbose < 2 and rep.duration < durations_min:
- tr.write_line("")
- tr.write_line(
- "(%s durations < %gs hidden. Use -vv to show these durations.)"
- % (len(dlist) - i, durations_min)
- )
- break
- tr.write_line(f"{rep.duration:02.2f}s {rep.when:<8} {rep.nodeid}")
- def pytest_sessionstart(session: "Session") -> None:
- session._setupstate = SetupState()
- def pytest_sessionfinish(session: "Session") -> None:
- session._setupstate.teardown_exact(None)
- def pytest_runtest_protocol(item: Item, nextitem: Optional[Item]) -> bool:
- ihook = item.ihook
- ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
- runtestprotocol(item, nextitem=nextitem)
- ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
- return True
- def runtestprotocol(
- item: Item, log: bool = True, nextitem: Optional[Item] = None
- ) -> List[TestReport]:
- hasrequest = hasattr(item, "_request")
- if hasrequest and not item._request: # type: ignore[attr-defined]
- # This only happens if the item is re-run, as is done by
- # pytest-rerunfailures.
- item._initrequest() # type: ignore[attr-defined]
- rep = call_and_report(item, "setup", log)
- reports = [rep]
- if rep.passed:
- if item.config.getoption("setupshow", False):
- show_test_item(item)
- if not item.config.getoption("setuponly", False):
- reports.append(call_and_report(item, "call", log))
- reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
- # After all teardown hooks have been called
- # want funcargs and request info to go away.
- if hasrequest:
- item._request = False # type: ignore[attr-defined]
- item.funcargs = None # type: ignore[attr-defined]
- return reports
- def show_test_item(item: Item) -> None:
- """Show test function, parameters and the fixtures of the test item."""
- tw = item.config.get_terminal_writer()
- tw.line()
- tw.write(" " * 8)
- tw.write(item.nodeid)
- used_fixtures = sorted(getattr(item, "fixturenames", []))
- if used_fixtures:
- tw.write(" (fixtures used: {})".format(", ".join(used_fixtures)))
- tw.flush()
- def pytest_runtest_setup(item: Item) -> None:
- _update_current_test_var(item, "setup")
- item.session._setupstate.setup(item)
- def pytest_runtest_call(item: Item) -> None:
- _update_current_test_var(item, "call")
- try:
- del sys.last_type
- del sys.last_value
- del sys.last_traceback
- except AttributeError:
- pass
- try:
- item.runtest()
- except Exception as e:
- # Store trace info to allow postmortem debugging
- sys.last_type = type(e)
- sys.last_value = e
- assert e.__traceback__ is not None
- # Skip *this* frame
- sys.last_traceback = e.__traceback__.tb_next
- raise e
- def pytest_runtest_teardown(item: Item, nextitem: Optional[Item]) -> None:
- _update_current_test_var(item, "teardown")
- item.session._setupstate.teardown_exact(nextitem)
- _update_current_test_var(item, None)
- def _update_current_test_var(
- item: Item, when: Optional["Literal['setup', 'call', 'teardown']"]
- ) -> None:
- """Update :envvar:`PYTEST_CURRENT_TEST` to reflect the current item and stage.
- If ``when`` is None, delete ``PYTEST_CURRENT_TEST`` from the environment.
- """
- var_name = "PYTEST_CURRENT_TEST"
- if when:
- value = f"{item.nodeid} ({when})"
- # don't allow null bytes on environment variables (see #2644, #2957)
- value = value.replace("\x00", "(null)")
- os.environ[var_name] = value
- else:
- os.environ.pop(var_name)
- def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str]]:
- if report.when in ("setup", "teardown"):
- if report.failed:
- # category, shortletter, verbose-word
- return "error", "E", "ERROR"
- elif report.skipped:
- return "skipped", "s", "SKIPPED"
- else:
- return "", "", ""
- return None
- #
- # Implementation
- def call_and_report(
- item: Item, when: "Literal['setup', 'call', 'teardown']", log: bool = True, **kwds
- ) -> TestReport:
- call = call_runtest_hook(item, when, **kwds)
- hook = item.ihook
- report: TestReport = hook.pytest_runtest_makereport(item=item, call=call)
- if log:
- hook.pytest_runtest_logreport(report=report)
- if check_interactive_exception(call, report):
- hook.pytest_exception_interact(node=item, call=call, report=report)
- return report
- def check_interactive_exception(call: "CallInfo[object]", report: BaseReport) -> bool:
- """Check whether the call raised an exception that should be reported as
- interactive."""
- if call.excinfo is None:
- # Didn't raise.
- return False
- if hasattr(report, "wasxfail"):
- # Exception was expected.
- return False
- if isinstance(call.excinfo.value, (Skipped, bdb.BdbQuit)):
- # Special control flow exception.
- return False
- return True
- def call_runtest_hook(
- item: Item, when: "Literal['setup', 'call', 'teardown']", **kwds
- ) -> "CallInfo[None]":
- if when == "setup":
- ihook: Callable[..., None] = item.ihook.pytest_runtest_setup
- elif when == "call":
- ihook = item.ihook.pytest_runtest_call
- elif when == "teardown":
- ihook = item.ihook.pytest_runtest_teardown
- else:
- assert False, f"Unhandled runtest hook case: {when}"
- reraise: Tuple[Type[BaseException], ...] = (Exit,)
- if not item.config.getoption("usepdb", False):
- reraise += (KeyboardInterrupt,)
- return CallInfo.from_call(
- lambda: ihook(item=item, **kwds), when=when, reraise=reraise
- )
- TResult = TypeVar("TResult", covariant=True)
- @final
- @dataclasses.dataclass
- class CallInfo(Generic[TResult]):
- """Result/Exception info of a function invocation."""
- _result: Optional[TResult]
- #: The captured exception of the call, if it raised.
- excinfo: Optional[ExceptionInfo[BaseException]]
- #: The system time when the call started, in seconds since the epoch.
- start: float
- #: The system time when the call ended, in seconds since the epoch.
- stop: float
- #: The call duration, in seconds.
- duration: float
- #: The context of invocation: "collect", "setup", "call" or "teardown".
- when: "Literal['collect', 'setup', 'call', 'teardown']"
- def __init__(
- self,
- result: Optional[TResult],
- excinfo: Optional[ExceptionInfo[BaseException]],
- start: float,
- stop: float,
- duration: float,
- when: "Literal['collect', 'setup', 'call', 'teardown']",
- *,
- _ispytest: bool = False,
- ) -> None:
- check_ispytest(_ispytest)
- self._result = result
- self.excinfo = excinfo
- self.start = start
- self.stop = stop
- self.duration = duration
- self.when = when
- @property
- def result(self) -> TResult:
- """The return value of the call, if it didn't raise.
- Can only be accessed if excinfo is None.
- """
- if self.excinfo is not None:
- raise AttributeError(f"{self!r} has no valid result")
- # The cast is safe because an exception wasn't raised, hence
- # _result has the expected function return type (which may be
- # None, that's why a cast and not an assert).
- return cast(TResult, self._result)
- @classmethod
- def from_call(
- cls,
- func: "Callable[[], TResult]",
- when: "Literal['collect', 'setup', 'call', 'teardown']",
- reraise: Optional[
- Union[Type[BaseException], Tuple[Type[BaseException], ...]]
- ] = None,
- ) -> "CallInfo[TResult]":
- """Call func, wrapping the result in a CallInfo.
- :param func:
- The function to call. Called without arguments.
- :param when:
- The phase in which the function is called.
- :param reraise:
- Exception or exceptions that shall propagate if raised by the
- function, instead of being wrapped in the CallInfo.
- """
- excinfo = None
- start = timing.time()
- precise_start = timing.perf_counter()
- try:
- result: Optional[TResult] = func()
- except BaseException:
- excinfo = ExceptionInfo.from_current()
- if reraise is not None and isinstance(excinfo.value, reraise):
- raise
- result = None
- # use the perf counter
- precise_stop = timing.perf_counter()
- duration = precise_stop - precise_start
- stop = timing.time()
- return cls(
- start=start,
- stop=stop,
- duration=duration,
- when=when,
- result=result,
- excinfo=excinfo,
- _ispytest=True,
- )
- def __repr__(self) -> str:
- if self.excinfo is None:
- return f"<CallInfo when={self.when!r} result: {self._result!r}>"
- return f"<CallInfo when={self.when!r} excinfo={self.excinfo!r}>"
- def pytest_runtest_makereport(item: Item, call: CallInfo[None]) -> TestReport:
- return TestReport.from_item_and_call(item, call)
- def pytest_make_collect_report(collector: Collector) -> CollectReport:
- call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
- longrepr: Union[None, Tuple[str, int, str], str, TerminalRepr] = None
- if not call.excinfo:
- outcome: Literal["passed", "skipped", "failed"] = "passed"
- else:
- skip_exceptions = [Skipped]
- unittest = sys.modules.get("unittest")
- if unittest is not None:
- # Type ignored because unittest is loaded dynamically.
- skip_exceptions.append(unittest.SkipTest) # type: ignore
- if isinstance(call.excinfo.value, tuple(skip_exceptions)):
- outcome = "skipped"
- r_ = collector._repr_failure_py(call.excinfo, "line")
- assert isinstance(r_, ExceptionChainRepr), repr(r_)
- r = r_.reprcrash
- assert r
- longrepr = (str(r.path), r.lineno, r.message)
- else:
- outcome = "failed"
- errorinfo = collector.repr_failure(call.excinfo)
- if not hasattr(errorinfo, "toterminal"):
- assert isinstance(errorinfo, str)
- errorinfo = CollectErrorRepr(errorinfo)
- longrepr = errorinfo
- result = call.result if not call.excinfo else None
- rep = CollectReport(collector.nodeid, outcome, longrepr, result)
- rep.call = call # type: ignore # see collect_one_node
- return rep
- class SetupState:
- """Shared state for setting up/tearing down test items or collectors
- in a session.
- Suppose we have a collection tree as follows:
- <Session session>
- <Module mod1>
- <Function item1>
- <Module mod2>
- <Function item2>
- The SetupState maintains a stack. The stack starts out empty:
- []
- During the setup phase of item1, setup(item1) is called. What it does
- is:
- push session to stack, run session.setup()
- push mod1 to stack, run mod1.setup()
- push item1 to stack, run item1.setup()
- The stack is:
- [session, mod1, item1]
- While the stack is in this shape, it is allowed to add finalizers to
- each of session, mod1, item1 using addfinalizer().
- During the teardown phase of item1, teardown_exact(item2) is called,
- where item2 is the next item to item1. What it does is:
- pop item1 from stack, run its teardowns
- pop mod1 from stack, run its teardowns
- mod1 was popped because it ended its purpose with item1. The stack is:
- [session]
- During the setup phase of item2, setup(item2) is called. What it does
- is:
- push mod2 to stack, run mod2.setup()
- push item2 to stack, run item2.setup()
- Stack:
- [session, mod2, item2]
- During the teardown phase of item2, teardown_exact(None) is called,
- because item2 is the last item. What it does is:
- pop item2 from stack, run its teardowns
- pop mod2 from stack, run its teardowns
- pop session from stack, run its teardowns
- Stack:
- []
- The end!
- """
- def __init__(self) -> None:
- # The stack is in the dict insertion order.
- self.stack: Dict[
- Node,
- Tuple[
- # Node's finalizers.
- List[Callable[[], object]],
- # Node's exception, if its setup raised.
- Optional[Union[OutcomeException, Exception]],
- ],
- ] = {}
- def setup(self, item: Item) -> None:
- """Setup objects along the collector chain to the item."""
- needed_collectors = item.listchain()
- # If a collector fails its setup, fail its entire subtree of items.
- # The setup is not retried for each item - the same exception is used.
- for col, (finalizers, exc) in self.stack.items():
- assert col in needed_collectors, "previous item was not torn down properly"
- if exc:
- raise exc
- for col in needed_collectors[len(self.stack) :]:
- assert col not in self.stack
- # Push onto the stack.
- self.stack[col] = ([col.teardown], None)
- try:
- col.setup()
- except TEST_OUTCOME as exc:
- self.stack[col] = (self.stack[col][0], exc)
- raise exc
- def addfinalizer(self, finalizer: Callable[[], object], node: Node) -> None:
- """Attach a finalizer to the given node.
- The node must be currently active in the stack.
- """
- assert node and not isinstance(node, tuple)
- assert callable(finalizer)
- assert node in self.stack, (node, self.stack)
- self.stack[node][0].append(finalizer)
- def teardown_exact(self, nextitem: Optional[Item]) -> None:
- """Teardown the current stack up until reaching nodes that nextitem
- also descends from.
- When nextitem is None (meaning we're at the last item), the entire
- stack is torn down.
- """
- needed_collectors = nextitem and nextitem.listchain() or []
- exceptions: List[BaseException] = []
- while self.stack:
- if list(self.stack.keys()) == needed_collectors[: len(self.stack)]:
- break
- node, (finalizers, _) = self.stack.popitem()
- these_exceptions = []
- while finalizers:
- fin = finalizers.pop()
- try:
- fin()
- except TEST_OUTCOME as e:
- these_exceptions.append(e)
- if len(these_exceptions) == 1:
- exceptions.extend(these_exceptions)
- elif these_exceptions:
- msg = f"errors while tearing down {node!r}"
- exceptions.append(BaseExceptionGroup(msg, these_exceptions[::-1]))
- if len(exceptions) == 1:
- raise exceptions[0]
- elif exceptions:
- raise BaseExceptionGroup("errors during test teardown", exceptions[::-1])
- if nextitem is None:
- assert not self.stack
- def collect_one_node(collector: Collector) -> CollectReport:
- ihook = collector.ihook
- ihook.pytest_collectstart(collector=collector)
- rep: CollectReport = ihook.pytest_make_collect_report(collector=collector)
- call = rep.__dict__.pop("call", None)
- if call and check_interactive_exception(call, rep):
- ihook.pytest_exception_interact(node=collector, call=call, report=rep)
- return rep
|