123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- # -*- coding: utf-8 -*-
- """ basic collect and runtest protocol implementations """
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import bdb
- import os
- import sys
- from time import time
- import attr
- import six
- from .reports import CollectErrorRepr
- from .reports import CollectReport
- from .reports import TestReport
- from _pytest._code.code import ExceptionInfo
- from _pytest.compat import safe_str
- from _pytest.outcomes import Exit
- from _pytest.outcomes import Skipped
- from _pytest.outcomes import TEST_OUTCOME
- #
- # pytest plugin hooks
def pytest_addoption(parser):
    """Register the ``--durations`` command line option.

    The option is added to the group obtained from
    ``parser.getgroup("terminal reporting", "reporting", after="general")``.
    It stores an integer ``N`` and defaults to ``None``.

    :param parser: option parser object providing ``getgroup()``.
    """
    group = parser.getgroup("terminal reporting", "reporting", after="general")
    # NOTE: this is a plain call statement; a trailing comma after the closing
    # parenthesis would wrap the result in a throw-away one-element tuple.
    group.addoption(
        "--durations",
        action="store",
        type=int,
        default=None,
        metavar="N",
        help="show N slowest setup/test durations (N=0 for all).",
    )
def pytest_terminal_summary(terminalreporter):
    """Write the "slowest test durations" section of the terminal summary.

    Nothing is written when the ``durations`` option is ``None`` or when no
    collected report carries a ``duration`` attribute.  Otherwise the reports
    are listed slowest first; a non-zero option value limits the listing to
    that many entries.  Unless ``verbose`` is at least 2, the listing stops at
    the first report faster than 0.005s and a hint line is written instead.

    :param terminalreporter: reporter providing ``config``, ``stats``,
        ``write_sep()`` and ``write_line()``.
    """
    limit = terminalreporter.config.option.durations
    verbosity = terminalreporter.config.getvalue("verbose")
    if limit is None:
        return

    timed_reports = [
        report
        for report_group in terminalreporter.stats.values()
        for report in report_group
        if hasattr(report, "duration")
    ]
    if not timed_reports:
        return

    # Ascending stable sort followed by a reversal (rather than a descending
    # sort) so that reports with equal durations keep their reversed order.
    timed_reports.sort(key=lambda report: report.duration)
    timed_reports.reverse()

    if limit:
        terminalreporter.write_sep("=", "slowest %s test durations" % limit)
        timed_reports = timed_reports[:limit]
    else:
        terminalreporter.write_sep("=", "slowest test durations")

    for report in timed_reports:
        too_fast_to_show = verbosity < 2 and report.duration < 0.005
        if too_fast_to_show:
            terminalreporter.write_line("")
            terminalreporter.write_line(
                "(0.00 durations hidden. Use -vv to show these durations.)"
            )
            break
        terminalreporter.write_line(
            "%02.2fs %-8s %s" % (report.duration, report.when, report.nodeid)
        )
def pytest_sessionstart(session):
    """Attach a fresh ``SetupState`` to ``session`` as ``_setupstate``."""
    setup_state = SetupState()
    session._setupstate = setup_state
def pytest_sessionfinish(session):
    """Tear down everything still tracked by ``session._setupstate``."""
    setup_state = session._setupstate
    setup_state.teardown_all()
def pytest_runtest_protocol(item, nextitem):
    """Run the test protocol for ``item``, bracketed by the log hooks.

    ``pytest_runtest_logstart`` is fired first, then ``runtestprotocol`` is
    executed for the item, and finally ``pytest_runtest_logfinish`` is fired.

    :param item: the test item to run.
    :param nextitem: the item scheduled after ``item``; forwarded to
        ``runtestprotocol``.
    :return: always ``True``.
    """
    hooks = item.ihook
    hooks.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
    runtestprotocol(item, nextitem=nextitem)
    hooks.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
    return True
def runtestprotocol(item, log=True, nextitem=None):
    """Run the setup, call and teardown phases of ``item``.

    The call phase only runs when setup passed and the ``setuponly`` option
    is not set; teardown always runs.  When setup passed and ``setupshow`` is
    set, the item is printed via ``show_test_item``.

    :param item: the test item to run.
    :param log: forwarded to ``call_and_report`` for every phase.
    :param nextitem: forwarded to the teardown phase.
    :return: list with one report per executed phase, in execution order.
    """
    has_request = hasattr(item, "_request")
    if has_request and not item._request:
        item._initrequest()

    setup_report = call_and_report(item, "setup", log)
    collected = [setup_report]
    if setup_report.passed:
        config = item.config
        if config.getoption("setupshow", False):
            show_test_item(item)
        if not config.getoption("setuponly", False):
            call_report = call_and_report(item, "call", log)
            collected.append(call_report)

    teardown_report = call_and_report(item, "teardown", log, nextitem=nextitem)
    collected.append(teardown_report)

    # Once every teardown hook has run, drop the request and funcargs
    # references held by the item.
    if has_request:
        item._request = False
        item.funcargs = None
    return collected
def show_test_item(item):
    """Write the item's node id and the sorted names of its fixtures.

    Output goes to the terminal writer obtained from the item's config: an
    empty line, an eight-space indent, the node id and - only if the item
    uses any fixtures - a "(fixtures used: ...)" suffix.
    """
    writer = item.config.get_terminal_writer()
    indent = " " * 8
    writer.line()
    writer.write(indent)
    writer.write(item._nodeid)

    fixture_names = sorted(item._fixtureinfo.name2fixturedefs.keys())
    if not fixture_names:
        return
    joined_names = ", ".join(fixture_names)
    writer.write(" (fixtures used: {})".format(joined_names))
def pytest_runtest_setup(item):
    """Mark the "setup" stage in PYTEST_CURRENT_TEST and prepare ``item``.

    Preparation is delegated to the session's setup state.
    """
    _update_current_test_var(item, "setup")
    setup_state = item.session._setupstate
    setup_state.prepare(item)
def pytest_runtest_call(item):
    """Mark the "call" stage in PYTEST_CURRENT_TEST and run ``item.runtest()``.

    ``sys.last_type``, ``sys.last_value`` and ``sys.last_traceback`` are
    reset to ``None`` before the test runs.  If the test raises an
    ``Exception``, they are filled in from the active exception and the
    exception is re-raised unchanged.
    """
    _update_current_test_var(item, "call")
    sys.last_type = sys.last_value = sys.last_traceback = None
    try:
        item.runtest()
    except Exception:
        # Record the failure so that post-mortem debugging can pick it up.
        exc_type, exc_value, exc_tb = sys.exc_info()
        sys.last_type = exc_type
        sys.last_value = exc_value
        # Start the stored traceback one entry further on, skipping the
        # entry that belongs to this function's own frame.
        sys.last_traceback = exc_tb.tb_next
        # Do not keep the exception objects referenced from this frame.
        del exc_type, exc_value, exc_tb
        raise
def pytest_runtest_teardown(item, nextitem):
    """Mark the "teardown" stage, tear ``item`` down, then clear the marker.

    The teardown is delegated to the session's setup state, which also
    receives ``nextitem``.  PYTEST_CURRENT_TEST is removed afterwards.
    """
    _update_current_test_var(item, "teardown")
    setup_state = item.session._setupstate
    setup_state.teardown_exact(item, nextitem)
    _update_current_test_var(item, None)
- def _update_current_test_var(item, when):
- """
- Update PYTEST_CURRENT_TEST to reflect the current item and stage.
- If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment.
- """
- var_name = "PYTEST_CURRENT_TEST"
- if when:
- value = "{} ({})".format(item.nodeid, when)
- # don't allow null bytes on environment variables (see #2644, #2957)
- value = value.replace("\x00", "(null)")
- os.environ[var_name] = value
- else:
- os.environ.pop(var_name)
def pytest_report_teststatus(report):
    """Return the status triple for setup and teardown reports.

    The triple is ``(category, shortletter, verbose-word)``.  Reports for
    any other stage are not handled here and yield ``None``.
    """
    if report.when not in ("setup", "teardown"):
        return None
    if report.failed:
        return "error", "E", "ERROR"
    if report.skipped:
        return "skipped", "s", "SKIPPED"
    return "", "", ""
- #
- # Implementation
def call_and_report(item, when, log=True, **kwds):
    """Run one phase hook for ``item`` and build its report.

    :param item: the test item.
    :param when: phase name, forwarded to ``call_runtest_hook``.
    :param log: when true, the report is passed to
        ``pytest_runtest_logreport``.
    :param kwds: extra keyword arguments forwarded to the phase hook.
    :return: the report produced by ``pytest_runtest_makereport``.
    """
    callinfo = call_runtest_hook(item, when, **kwds)
    hooks = item.ihook
    rep = hooks.pytest_runtest_makereport(item=item, call=callinfo)
    if log:
        hooks.pytest_runtest_logreport(report=rep)
    wants_interaction = check_interactive_exception(callinfo, rep)
    if wants_interaction:
        hooks.pytest_exception_interact(node=item, call=callinfo, report=rep)
    return rep
def check_interactive_exception(call, report):
    """Tell whether the exception recorded in ``call`` qualifies for interaction.

    The result is falsy when there is no exception info (the falsy
    ``excinfo`` value itself is returned), when ``report`` has a ``wasxfail``
    attribute, or when the exception is a ``Skipped`` or ``bdb.BdbQuit``.
    Otherwise it is ``True``.
    """
    excinfo = call.excinfo
    if not excinfo:
        return excinfo
    if hasattr(report, "wasxfail"):
        return False
    if excinfo.errisinstance(Skipped):
        return False
    return not excinfo.errisinstance(bdb.BdbQuit)
def call_runtest_hook(item, when, **kwds):
    """Invoke the ``pytest_runtest_<when>`` hook and capture its outcome.

    ``Exit`` is always allowed to propagate; ``KeyboardInterrupt``
    propagates as well unless the ``usepdb`` option is set.

    :param item: the test item passed to the hook.
    :param when: phase name used to build the hook name.
    :param kwds: extra keyword arguments passed to the hook.
    :return: the ``CallInfo`` describing the hook invocation.
    """
    hook_caller = getattr(item.ihook, "pytest_runtest_" + when)
    if item.config.getoption("usepdb", False):
        propagate = (Exit,)
    else:
        propagate = (Exit, KeyboardInterrupt)

    def invoke_hook():
        return hook_caller(item=item, **kwds)

    return CallInfo.from_call(invoke_hook, when=when, reraise=propagate)
@attr.s(repr=False)
class CallInfo(object):
    """Outcome of a single function invocation: its result or its exception."""

    # Value returned by the call; set to None when the call raised.
    _result = attr.ib()
    # Optional[ExceptionInfo]: set only when the call raised.
    excinfo = attr.ib()
    # time() value taken right before the call.
    start = attr.ib()
    # time() value taken right after the call.
    stop = attr.ib()
    # Context of invocation as given by the caller, e.g. "setup", "call",
    # "teardown" or "collect".
    when = attr.ib()

    @property
    def result(self):
        """Return the call's result.

        :raises AttributeError: if the call raised; this lets
            ``getattr(call, "result", None)`` fall back to its default.
        """
        if self.excinfo is None:
            return self._result
        raise AttributeError("{!r} has no valid result".format(self))

    @classmethod
    def from_call(cls, func, when, reraise=None):
        """Call ``func()`` and wrap its outcome in a ``CallInfo``.

        :param func: zero-argument callable to invoke.
        :param when: context of invocation, stored on the instance.
        :param reraise: exception type (or tuple of types) that must
            propagate to the caller instead of being captured.
        """
        outcome = None
        failure = None
        started = time()
        try:
            outcome = func()
        except:  # noqa
            # Deliberately bare: every exception is captured here; only the
            # types listed in ``reraise`` are allowed to propagate.
            failure = ExceptionInfo.from_current()
            if reraise is not None and failure.errisinstance(reraise):
                raise
        finished = time()
        return cls(
            result=outcome, excinfo=failure, start=started, stop=finished, when=when
        )

    def __repr__(self):
        if self.excinfo is None:
            # TODO: investigate unification
            status, value = "result", repr(self._result)
        else:
            status, value = "exception", self.excinfo.value
        template = "<CallInfo when={when!r} {status}: {value}>"
        return template.format(when=self.when, value=safe_str(value), status=status)
def pytest_runtest_makereport(item, call):
    """Build the report for ``item`` from the outcome recorded in ``call``.

    Delegates to ``TestReport.from_item_and_call``.
    """
    report = TestReport.from_item_and_call(item, call)
    return report
def pytest_make_collect_report(collector):
    """Collect ``collector`` and describe the outcome in a ``CollectReport``.

    The outcome is "passed" when collection raised nothing, "skipped" when
    it raised ``Skipped`` or one of the exceptions returned by
    ``nose.get_skip_exceptions()``, and "failed" otherwise.  The
    ``CallInfo`` of the collection is attached to the report as ``call``.
    """
    call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
    excinfo = call.excinfo
    if not excinfo:
        outcome, longrepr = "passed", None
    else:
        from _pytest import nose

        skip_types = (Skipped,) + nose.get_skip_exceptions()
        if excinfo.errisinstance(skip_types):
            outcome = "skipped"
            crash = collector._repr_failure_py(excinfo, "line").reprcrash
            longrepr = (str(crash.path), crash.lineno, crash.message)
        else:
            outcome = "failed"
            longrepr = collector.repr_failure(excinfo)
            # Wrap representations that lack a ``toterminal`` attribute.
            if not hasattr(longrepr, "toterminal"):
                longrepr = CollectErrorRepr(longrepr)

    collected = getattr(call, "result", None)
    report = CollectReport(collector.nodeid, outcome, longrepr, collected)
    report.call = call  # see collect_one_node
    return report
class SetupState(object):
    """Shared state for setting up and tearing down test items or collectors."""

    def __init__(self):
        # Nodes that are currently set up, outermost first.
        self.stack = []
        # Maps a node to the list of finalizers registered for it.
        self._finalizers = {}

    def addfinalizer(self, finalizer, colitem):
        """Register ``finalizer`` to be called when ``colitem`` is torn down.

        ``colitem`` must be truthy and must not be a tuple, and
        ``finalizer`` must be callable; both are checked with assertions.
        """
        assert colitem and not isinstance(colitem, tuple)
        assert callable(finalizer)
        # assert colitem in self.stack # some unit tests don't setup stack :/
        registered = self._finalizers.setdefault(colitem, [])
        registered.append(finalizer)

    def _pop_and_teardown(self):
        """Remove the innermost node from the stack and tear it down."""
        innermost = self.stack.pop()
        self._teardown_with_finalization(innermost)

    def _callfinalizers(self, colitem):
        """Run the finalizers of ``colitem``, most recently added first.

        Every finalizer runs even if an earlier one raised; afterwards the
        first captured exception is re-raised.
        """
        pending = self._finalizers.pop(colitem, None)
        first_error = None
        while pending:
            finalizer = pending.pop()
            try:
                finalizer()
            except TEST_OUTCOME:
                # XXX Only first exception will be seen by user,
                # ideally all should be reported.
                if first_error is None:
                    first_error = sys.exc_info()
        if first_error:
            six.reraise(*first_error)

    def _teardown_with_finalization(self, colitem):
        """Run ``colitem``'s finalizers, then its ``teardown()`` if it has one."""
        self._callfinalizers(colitem)
        if hasattr(colitem, "teardown"):
            colitem.teardown()
        # Sanity check on the finalizers that are still registered.
        for owner in self._finalizers:
            assert owner is None or owner in self.stack or isinstance(owner, tuple)

    def teardown_all(self):
        """Tear down the whole stack and run every remaining finalizer."""
        while self.stack:
            self._pop_and_teardown()
        leftover_keys = list(self._finalizers)
        for key in leftover_keys:
            self._teardown_with_finalization(key)
        assert not self._finalizers

    def teardown_exact(self, item, nextitem):
        """Tear down the nodes that ``nextitem`` does not need.

        With a falsy ``nextitem`` nothing is needed any more, so the whole
        stack is torn down.
        """
        chain = nextitem.listchain() if nextitem else None
        self._teardown_towards(chain or [])

    def _teardown_towards(self, needed_collectors):
        """Pop nodes until the stack is a prefix of ``needed_collectors``.

        Teardown continues past failures; the first captured exception is
        re-raised once the loop is done.
        """
        first_error = None
        while self.stack:
            depth = len(self.stack)
            if self.stack == needed_collectors[:depth]:
                break
            try:
                self._pop_and_teardown()
            except TEST_OUTCOME:
                # XXX Only first exception will be seen by user,
                # ideally all should be reported.
                if first_error is None:
                    first_error = sys.exc_info()
        if first_error:
            six.reraise(*first_error)

    def prepare(self, colitem):
        """Set up the nodes along ``colitem``'s chain.

        Nodes on the stack that are not part of the chain are torn down
        first.  If a node that is still on the stack failed its setup
        earlier, that stored exception is re-raised.
        """
        chain = colitem.listchain()
        self._teardown_towards(chain)

        # check if the last collection node has raised an error
        for active in self.stack:
            if hasattr(active, "_prepare_exc"):
                six.reraise(*active._prepare_exc)

        missing = chain[len(self.stack) :]
        for node in missing:
            self.stack.append(node)
            try:
                node.setup()
            except TEST_OUTCOME:
                # Remember the failure on the node for later prepare() calls.
                node._prepare_exc = sys.exc_info()
                raise
def collect_one_node(collector):
    """Collect a single node and return its collect report.

    Fires ``pytest_collectstart``, builds the report through
    ``pytest_make_collect_report`` and removes the ``call`` entry from the
    report's ``__dict__``.  If that call holds an exception that qualifies
    for interaction, ``pytest_exception_interact`` is fired as well.
    """
    hooks = collector.ihook
    hooks.pytest_collectstart(collector=collector)
    report = hooks.pytest_make_collect_report(collector=collector)
    callinfo = report.__dict__.pop("call", None)
    if callinfo:
        if check_interactive_exception(callinfo, report):
            hooks.pytest_exception_interact(
                node=collector, call=callinfo, report=report
            )
    return report
|