runner.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. # -*- coding: utf-8 -*-
  2. """ basic collect and runtest protocol implementations """
  3. from __future__ import absolute_import
  4. from __future__ import division
  5. from __future__ import print_function
  6. import bdb
  7. import os
  8. import sys
  9. from time import time
  10. import attr
  11. import six
  12. from .reports import CollectErrorRepr
  13. from .reports import CollectReport
  14. from .reports import TestReport
  15. from _pytest._code.code import ExceptionInfo
  16. from _pytest.compat import safe_str
  17. from _pytest.outcomes import Exit
  18. from _pytest.outcomes import Skipped
  19. from _pytest.outcomes import TEST_OUTCOME
  20. #
  21. # pytest plugin hooks
  22. def pytest_addoption(parser):
  23. group = parser.getgroup("terminal reporting", "reporting", after="general")
  24. group.addoption(
  25. "--durations",
  26. action="store",
  27. type=int,
  28. default=None,
  29. metavar="N",
  30. help="show N slowest setup/test durations (N=0 for all).",
  31. ),
  32. def pytest_terminal_summary(terminalreporter):
  33. durations = terminalreporter.config.option.durations
  34. verbose = terminalreporter.config.getvalue("verbose")
  35. if durations is None:
  36. return
  37. tr = terminalreporter
  38. dlist = []
  39. for replist in tr.stats.values():
  40. for rep in replist:
  41. if hasattr(rep, "duration"):
  42. dlist.append(rep)
  43. if not dlist:
  44. return
  45. dlist.sort(key=lambda x: x.duration)
  46. dlist.reverse()
  47. if not durations:
  48. tr.write_sep("=", "slowest test durations")
  49. else:
  50. tr.write_sep("=", "slowest %s test durations" % durations)
  51. dlist = dlist[:durations]
  52. for rep in dlist:
  53. if verbose < 2 and rep.duration < 0.005:
  54. tr.write_line("")
  55. tr.write_line("(0.00 durations hidden. Use -vv to show these durations.)")
  56. break
  57. tr.write_line("%02.2fs %-8s %s" % (rep.duration, rep.when, rep.nodeid))
  58. def pytest_sessionstart(session):
  59. session._setupstate = SetupState()
  60. def pytest_sessionfinish(session):
  61. session._setupstate.teardown_all()
  62. def pytest_runtest_protocol(item, nextitem):
  63. item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
  64. runtestprotocol(item, nextitem=nextitem)
  65. item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
  66. return True
  67. def runtestprotocol(item, log=True, nextitem=None):
  68. hasrequest = hasattr(item, "_request")
  69. if hasrequest and not item._request:
  70. item._initrequest()
  71. rep = call_and_report(item, "setup", log)
  72. reports = [rep]
  73. if rep.passed:
  74. if item.config.getoption("setupshow", False):
  75. show_test_item(item)
  76. if not item.config.getoption("setuponly", False):
  77. reports.append(call_and_report(item, "call", log))
  78. reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
  79. # after all teardown hooks have been called
  80. # want funcargs and request info to go away
  81. if hasrequest:
  82. item._request = False
  83. item.funcargs = None
  84. return reports
  85. def show_test_item(item):
  86. """Show test function, parameters and the fixtures of the test item."""
  87. tw = item.config.get_terminal_writer()
  88. tw.line()
  89. tw.write(" " * 8)
  90. tw.write(item._nodeid)
  91. used_fixtures = sorted(item._fixtureinfo.name2fixturedefs.keys())
  92. if used_fixtures:
  93. tw.write(" (fixtures used: {})".format(", ".join(used_fixtures)))
  94. def pytest_runtest_setup(item):
  95. _update_current_test_var(item, "setup")
  96. item.session._setupstate.prepare(item)
  97. def pytest_runtest_call(item):
  98. _update_current_test_var(item, "call")
  99. sys.last_type, sys.last_value, sys.last_traceback = (None, None, None)
  100. try:
  101. item.runtest()
  102. except Exception:
  103. # Store trace info to allow postmortem debugging
  104. type, value, tb = sys.exc_info()
  105. tb = tb.tb_next # Skip *this* frame
  106. sys.last_type = type
  107. sys.last_value = value
  108. sys.last_traceback = tb
  109. del type, value, tb # Get rid of these in this frame
  110. raise
  111. def pytest_runtest_teardown(item, nextitem):
  112. _update_current_test_var(item, "teardown")
  113. item.session._setupstate.teardown_exact(item, nextitem)
  114. _update_current_test_var(item, None)
  115. def _update_current_test_var(item, when):
  116. """
  117. Update PYTEST_CURRENT_TEST to reflect the current item and stage.
  118. If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment.
  119. """
  120. var_name = "PYTEST_CURRENT_TEST"
  121. if when:
  122. value = "{} ({})".format(item.nodeid, when)
  123. # don't allow null bytes on environment variables (see #2644, #2957)
  124. value = value.replace("\x00", "(null)")
  125. os.environ[var_name] = value
  126. else:
  127. os.environ.pop(var_name)
  128. def pytest_report_teststatus(report):
  129. if report.when in ("setup", "teardown"):
  130. if report.failed:
  131. # category, shortletter, verbose-word
  132. return "error", "E", "ERROR"
  133. elif report.skipped:
  134. return "skipped", "s", "SKIPPED"
  135. else:
  136. return "", "", ""
  137. #
  138. # Implementation
  139. def call_and_report(item, when, log=True, **kwds):
  140. call = call_runtest_hook(item, when, **kwds)
  141. hook = item.ihook
  142. report = hook.pytest_runtest_makereport(item=item, call=call)
  143. if log:
  144. hook.pytest_runtest_logreport(report=report)
  145. if check_interactive_exception(call, report):
  146. hook.pytest_exception_interact(node=item, call=call, report=report)
  147. return report
  148. def check_interactive_exception(call, report):
  149. return call.excinfo and not (
  150. hasattr(report, "wasxfail")
  151. or call.excinfo.errisinstance(Skipped)
  152. or call.excinfo.errisinstance(bdb.BdbQuit)
  153. )
  154. def call_runtest_hook(item, when, **kwds):
  155. hookname = "pytest_runtest_" + when
  156. ihook = getattr(item.ihook, hookname)
  157. reraise = (Exit,)
  158. if not item.config.getoption("usepdb", False):
  159. reraise += (KeyboardInterrupt,)
  160. return CallInfo.from_call(
  161. lambda: ihook(item=item, **kwds), when=when, reraise=reraise
  162. )
  163. @attr.s(repr=False)
  164. class CallInfo(object):
  165. """ Result/Exception info a function invocation. """
  166. _result = attr.ib()
  167. # Optional[ExceptionInfo]
  168. excinfo = attr.ib()
  169. start = attr.ib()
  170. stop = attr.ib()
  171. when = attr.ib()
  172. @property
  173. def result(self):
  174. if self.excinfo is not None:
  175. raise AttributeError("{!r} has no valid result".format(self))
  176. return self._result
  177. @classmethod
  178. def from_call(cls, func, when, reraise=None):
  179. #: context of invocation: one of "setup", "call",
  180. #: "teardown", "memocollect"
  181. start = time()
  182. excinfo = None
  183. try:
  184. result = func()
  185. except: # noqa
  186. excinfo = ExceptionInfo.from_current()
  187. if reraise is not None and excinfo.errisinstance(reraise):
  188. raise
  189. result = None
  190. stop = time()
  191. return cls(start=start, stop=stop, when=when, result=result, excinfo=excinfo)
  192. def __repr__(self):
  193. if self.excinfo is not None:
  194. status = "exception"
  195. value = self.excinfo.value
  196. else:
  197. # TODO: investigate unification
  198. value = repr(self._result)
  199. status = "result"
  200. return "<CallInfo when={when!r} {status}: {value}>".format(
  201. when=self.when, value=safe_str(value), status=status
  202. )
  203. def pytest_runtest_makereport(item, call):
  204. return TestReport.from_item_and_call(item, call)
  205. def pytest_make_collect_report(collector):
  206. call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
  207. longrepr = None
  208. if not call.excinfo:
  209. outcome = "passed"
  210. else:
  211. from _pytest import nose
  212. skip_exceptions = (Skipped,) + nose.get_skip_exceptions()
  213. if call.excinfo.errisinstance(skip_exceptions):
  214. outcome = "skipped"
  215. r = collector._repr_failure_py(call.excinfo, "line").reprcrash
  216. longrepr = (str(r.path), r.lineno, r.message)
  217. else:
  218. outcome = "failed"
  219. errorinfo = collector.repr_failure(call.excinfo)
  220. if not hasattr(errorinfo, "toterminal"):
  221. errorinfo = CollectErrorRepr(errorinfo)
  222. longrepr = errorinfo
  223. rep = CollectReport(
  224. collector.nodeid, outcome, longrepr, getattr(call, "result", None)
  225. )
  226. rep.call = call # see collect_one_node
  227. return rep
  228. class SetupState(object):
  229. """ shared state for setting up/tearing down test items or collectors. """
  230. def __init__(self):
  231. self.stack = []
  232. self._finalizers = {}
  233. def addfinalizer(self, finalizer, colitem):
  234. """ attach a finalizer to the given colitem.
  235. if colitem is None, this will add a finalizer that
  236. is called at the end of teardown_all().
  237. """
  238. assert colitem and not isinstance(colitem, tuple)
  239. assert callable(finalizer)
  240. # assert colitem in self.stack # some unit tests don't setup stack :/
  241. self._finalizers.setdefault(colitem, []).append(finalizer)
  242. def _pop_and_teardown(self):
  243. colitem = self.stack.pop()
  244. self._teardown_with_finalization(colitem)
  245. def _callfinalizers(self, colitem):
  246. finalizers = self._finalizers.pop(colitem, None)
  247. exc = None
  248. while finalizers:
  249. fin = finalizers.pop()
  250. try:
  251. fin()
  252. except TEST_OUTCOME:
  253. # XXX Only first exception will be seen by user,
  254. # ideally all should be reported.
  255. if exc is None:
  256. exc = sys.exc_info()
  257. if exc:
  258. six.reraise(*exc)
  259. def _teardown_with_finalization(self, colitem):
  260. self._callfinalizers(colitem)
  261. if hasattr(colitem, "teardown"):
  262. colitem.teardown()
  263. for colitem in self._finalizers:
  264. assert (
  265. colitem is None or colitem in self.stack or isinstance(colitem, tuple)
  266. )
  267. def teardown_all(self):
  268. while self.stack:
  269. self._pop_and_teardown()
  270. for key in list(self._finalizers):
  271. self._teardown_with_finalization(key)
  272. assert not self._finalizers
  273. def teardown_exact(self, item, nextitem):
  274. needed_collectors = nextitem and nextitem.listchain() or []
  275. self._teardown_towards(needed_collectors)
  276. def _teardown_towards(self, needed_collectors):
  277. exc = None
  278. while self.stack:
  279. if self.stack == needed_collectors[: len(self.stack)]:
  280. break
  281. try:
  282. self._pop_and_teardown()
  283. except TEST_OUTCOME:
  284. # XXX Only first exception will be seen by user,
  285. # ideally all should be reported.
  286. if exc is None:
  287. exc = sys.exc_info()
  288. if exc:
  289. six.reraise(*exc)
  290. def prepare(self, colitem):
  291. """ setup objects along the collector chain to the test-method
  292. and teardown previously setup objects."""
  293. needed_collectors = colitem.listchain()
  294. self._teardown_towards(needed_collectors)
  295. # check if the last collection node has raised an error
  296. for col in self.stack:
  297. if hasattr(col, "_prepare_exc"):
  298. six.reraise(*col._prepare_exc)
  299. for col in needed_collectors[len(self.stack) :]:
  300. self.stack.append(col)
  301. try:
  302. col.setup()
  303. except TEST_OUTCOME:
  304. col._prepare_exc = sys.exc_info()
  305. raise
  306. def collect_one_node(collector):
  307. ihook = collector.ihook
  308. ihook.pytest_collectstart(collector=collector)
  309. rep = ihook.pytest_make_collect_report(collector=collector)
  310. call = rep.__dict__.pop("call", None)
  311. if call and check_interactive_exception(call, rep):
  312. ihook.pytest_exception_interact(node=collector, call=call, report=rep)
  313. return rep