_asynctest.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. # -*- test-case-name: twisted.trial.test -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Things likely to be used by writers of unit tests.
  6. Maintainer: Jonathan Lange
  7. """
  8. import inspect
  9. import warnings
  10. from typing import Callable, List
  11. from zope.interface import implementer
  12. from typing_extensions import ParamSpec
  13. # We can't import reactor at module-level because this code runs before trial
  14. # installs a user-specified reactor, installing the default reactor and
  15. # breaking reactor installation. See also #6047.
  16. from twisted.internet import defer, utils
  17. from twisted.python import failure
  18. from twisted.trial import itrial, util
  19. from twisted.trial._synctest import FailTest, SkipTest, SynchronousTestCase
# Parameter specification used by TestCase.addCleanup to tie the extra
# positional/keyword arguments to the signature of the cleanup callable.
_P = ParamSpec("_P")

# Default value of the ``running`` argument of TestCase._wait.  _wait appends
# a marker to this list while it is executing and pops it on exit, so a
# non-empty list means a _wait call is already in progress.
_wait_is_running: List[None] = []
@implementer(itrial.ITestCase)
class TestCase(SynchronousTestCase):
    """
    A unit test. The atom of the unit testing universe.

    This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
    from the standard library. The main feature is the ability to return
    C{Deferred}s from tests and fixture methods and to have the suite wait for
    those C{Deferred}s to fire.  Also provides new assertions such as
    L{assertFailure}.

    @ivar timeout: A real number of seconds. If set, the test will
        raise an error if it takes longer than C{timeout} seconds.
        If not set, util.DEFAULT_TIMEOUT_DURATION is used.
    """
    def __init__(self, methodName: str = "runTest") -> None:
        """
        Construct an asynchronous test case for C{methodName}.

        @param methodName: The name of a method on C{self}. This method should
            be a unit test. That is, it should be a short method that calls
            some of the assert* methods. If C{methodName} is unspecified,
            L{SynchronousTestCase.runTest} will be used as the test method.
            This is mostly useful for testing Trial.
        """
        # All construction work is delegated to the base class.
        super().__init__(methodName)
  45. def assertFailure(self, deferred, *expectedFailures):
  46. """
  47. Fail if C{deferred} does not errback with one of C{expectedFailures}.
  48. Returns the original Deferred with callbacks added. You will need
  49. to return this Deferred from your test case.
  50. """
  51. def _cb(ignore):
  52. raise self.failureException(
  53. f"did not catch an error, instead got {ignore!r}"
  54. )
  55. def _eb(failure):
  56. if failure.check(*expectedFailures):
  57. return failure.value
  58. else:
  59. output = "\nExpected: {!r}\nGot:\n{}".format(
  60. expectedFailures, str(failure)
  61. )
  62. raise self.failureException(output)
  63. return deferred.addCallbacks(_cb, _eb)
  64. failUnlessFailure = assertFailure
  65. def _run(self, methodName, result):
  66. from twisted.internet import reactor
  67. timeout = self.getTimeout()
  68. def onTimeout(d):
  69. e = defer.TimeoutError(
  70. f"{self!r} ({methodName}) still running at {timeout} secs"
  71. )
  72. f = failure.Failure(e)
  73. # try to errback the deferred that the test returns (for no gorram
  74. # reason) (see issue1005 and test_errorPropagation in
  75. # test_deferred)
  76. try:
  77. d.errback(f)
  78. except defer.AlreadyCalledError:
  79. # if the deferred has been called already but the *back chain
  80. # is still unfinished, crash the reactor and report timeout
  81. # error ourself.
  82. reactor.crash()
  83. self._timedOut = True # see self._wait
  84. todo = self.getTodo()
  85. if todo is not None and todo.expected(f):
  86. result.addExpectedFailure(self, f, todo)
  87. else:
  88. result.addError(self, f)
  89. onTimeout = utils.suppressWarnings(
  90. onTimeout, util.suppress(category=DeprecationWarning)
  91. )
  92. method = getattr(self, methodName)
  93. if inspect.isgeneratorfunction(method):
  94. exc = TypeError(
  95. "{!r} is a generator function and therefore will never run".format(
  96. method
  97. )
  98. )
  99. return defer.fail(exc)
  100. d = defer.maybeDeferred(
  101. utils.runWithWarningsSuppressed, self._getSuppress(), method
  102. )
  103. call = reactor.callLater(timeout, onTimeout, d)
  104. d.addBoth(lambda x: call.active() and call.cancel() or x)
  105. return d
  106. def __call__(self, *args, **kwargs):
  107. return self.run(*args, **kwargs)
  108. def deferSetUp(self, ignored, result):
  109. d = self._run("setUp", result)
  110. d.addCallbacks(
  111. self.deferTestMethod,
  112. self._ebDeferSetUp,
  113. callbackArgs=(result,),
  114. errbackArgs=(result,),
  115. )
  116. return d
  117. def _ebDeferSetUp(self, failure, result):
  118. if failure.check(SkipTest):
  119. result.addSkip(self, self._getSkipReason(self.setUp, failure.value))
  120. else:
  121. result.addError(self, failure)
  122. if failure.check(KeyboardInterrupt):
  123. result.stop()
  124. return self.deferRunCleanups(None, result)
  125. def deferTestMethod(self, ignored, result):
  126. d = self._run(self._testMethodName, result)
  127. d.addCallbacks(
  128. self._cbDeferTestMethod,
  129. self._ebDeferTestMethod,
  130. callbackArgs=(result,),
  131. errbackArgs=(result,),
  132. )
  133. d.addBoth(self.deferRunCleanups, result)
  134. d.addBoth(self.deferTearDown, result)
  135. return d
  136. def _cbDeferTestMethod(self, ignored, result):
  137. if self.getTodo() is not None:
  138. result.addUnexpectedSuccess(self, self.getTodo())
  139. else:
  140. self._passed = True
  141. return ignored
  142. def _ebDeferTestMethod(self, f, result):
  143. todo = self.getTodo()
  144. if todo is not None and todo.expected(f):
  145. result.addExpectedFailure(self, f, todo)
  146. elif f.check(self.failureException, FailTest):
  147. result.addFailure(self, f)
  148. elif f.check(KeyboardInterrupt):
  149. result.addError(self, f)
  150. result.stop()
  151. elif f.check(SkipTest):
  152. result.addSkip(
  153. self, self._getSkipReason(getattr(self, self._testMethodName), f.value)
  154. )
  155. else:
  156. result.addError(self, f)
  157. def deferTearDown(self, ignored, result):
  158. d = self._run("tearDown", result)
  159. d.addErrback(self._ebDeferTearDown, result)
  160. return d
  161. def _ebDeferTearDown(self, failure, result):
  162. result.addError(self, failure)
  163. if failure.check(KeyboardInterrupt):
  164. result.stop()
  165. self._passed = False
  166. @defer.inlineCallbacks
  167. def deferRunCleanups(self, ignored, result):
  168. """
  169. Run any scheduled cleanups and report errors (if any) to the result.
  170. object.
  171. """
  172. failures = []
  173. while len(self._cleanups) > 0:
  174. func, args, kwargs = self._cleanups.pop()
  175. try:
  176. yield func(*args, **kwargs)
  177. except Exception:
  178. failures.append(failure.Failure())
  179. for f in failures:
  180. result.addError(self, f)
  181. self._passed = False
  182. def _cleanUp(self, result):
  183. try:
  184. clean = util._Janitor(self, result).postCaseCleanup()
  185. if not clean:
  186. self._passed = False
  187. except BaseException:
  188. result.addError(self, failure.Failure())
  189. self._passed = False
  190. for error in self._observer.getErrors():
  191. result.addError(self, error)
  192. self._passed = False
  193. self.flushLoggedErrors()
  194. self._removeObserver()
  195. if self._passed:
  196. result.addSuccess(self)
  197. def _classCleanUp(self, result):
  198. try:
  199. util._Janitor(self, result).postClassCleanup()
  200. except BaseException:
  201. result.addError(self, failure.Failure())
  202. def _makeReactorMethod(self, name):
  203. """
  204. Create a method which wraps the reactor method C{name}. The new
  205. method issues a deprecation warning and calls the original.
  206. """
  207. def _(*a, **kw):
  208. warnings.warn(
  209. "reactor.%s cannot be used inside unit tests. "
  210. "In the future, using %s will fail the test and may "
  211. "crash or hang the test run." % (name, name),
  212. stacklevel=2,
  213. category=DeprecationWarning,
  214. )
  215. return self._reactorMethods[name](*a, **kw)
  216. return _
  217. def _deprecateReactor(self, reactor):
  218. """
  219. Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
  220. each method is wrapped in a function that issues a deprecation
  221. warning, then calls the original.
  222. @param reactor: The Twisted reactor.
  223. """
  224. self._reactorMethods = {}
  225. for name in ["crash", "iterate", "stop"]:
  226. self._reactorMethods[name] = getattr(reactor, name)
  227. setattr(reactor, name, self._makeReactorMethod(name))
  228. def _undeprecateReactor(self, reactor):
  229. """
  230. Restore the deprecated reactor methods. Undoes what
  231. L{_deprecateReactor} did.
  232. @param reactor: The Twisted reactor.
  233. """
  234. for name, method in self._reactorMethods.items():
  235. setattr(reactor, name, method)
  236. self._reactorMethods = {}
    def _runFixturesAndTest(self, result):
        """
        Really run C{setUp}, the test method, and C{tearDown}.  Any of these may
        return L{defer.Deferred}s. After they complete, do some reactor cleanup.

        @param result: A L{TestResult} object.
        """
        # The reactor is imported here, not at module level, so that importing
        # this module does not install the default reactor.
        from twisted.internet import reactor

        # While the test runs, reactor.crash/iterate/stop warn when called.
        self._deprecateReactor(reactor)
        # Set to True by the timeout handler in _run; read by _wait.
        self._timedOut = False
        try:
            d = self.deferSetUp(None, result)
            try:
                # Block until the whole setUp/test/tearDown chain has fired.
                self._wait(d)
            finally:
                # Per-test and per-class cleanup happen even if _wait raised.
                self._cleanUp(result)
                self._classCleanUp(result)
        finally:
            # Always restore the original reactor methods.
            self._undeprecateReactor(reactor)
    # f should be a positional only argument but that is a breaking change
    # see https://github.com/twisted/twisted/issues/11967
    def addCleanup(  # type: ignore[override]
        self, f: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs
    ) -> None:
        """
        Extend the base cleanup feature with support for cleanup functions which
        return Deferreds.

        If the function C{f} returns a Deferred, C{TestCase} will wait until the
        Deferred has fired before proceeding to the next function.

        @param f: The cleanup function.
        @param args: Positional arguments to pass to C{f}.
        @param kwargs: Keyword arguments to pass to C{f}.
        """
        # Registration itself is delegated to the base class.
        return super().addCleanup(f, *args, **kwargs)
  267. def getSuppress(self):
  268. return self._getSuppress()
  269. def getTimeout(self):
  270. """
  271. Returns the timeout value set on this test. Checks on the instance
  272. first, then the class, then the module, then packages. As soon as it
  273. finds something with a C{timeout} attribute, returns that. Returns
  274. L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
  275. L{TestCase} docstring for more details.
  276. """
  277. timeout = util.acquireAttribute(
  278. self._parents, "timeout", util.DEFAULT_TIMEOUT_DURATION
  279. )
  280. try:
  281. return float(timeout)
  282. except (ValueError, TypeError):
  283. # XXX -- this is here because sometimes people will have methods
  284. # called 'timeout', or set timeout to 'orange', or something
  285. # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
  286. # both do this.
  287. warnings.warn(
  288. "'timeout' attribute needs to be a number.", category=DeprecationWarning
  289. )
  290. return util.DEFAULT_TIMEOUT_DURATION
    def _wait(self, d, running=_wait_is_running):
        """
        Take a Deferred that only ever callbacks. Block until it happens.

        If C{d} has not fired yet, the reactor is run until C{d} fires (which
        crashes the reactor) or until something else stops or crashes it.

        @param d: The Deferred to wait for.
        @param running: The list used to detect re-entrant calls: a marker is
            appended for the duration of the call and removed on exit.  The
            default is the module-level C{_wait_is_running} list, so by
            default the guard is shared by all calls.
        @raise RuntimeError: If C{running} is not empty on entry.
        @raise KeyboardInterrupt: If the reactor stopped running although
            C{d} produced no result and no timeout was recorded.
        """
        if running:
            raise RuntimeError("_wait is not reentrant")

        from twisted.internet import reactor

        results = []

        def append(any):
            # Ignored once this call has finished (results is None by then).
            if results is not None:
                results.append(any)

        def crash(ign):
            # Ignored once this call has finished (results is None by then).
            if results is not None:
                reactor.crash()

        # Silence the deprecation warning for reactor.crash issued by the
        # wrapper from _makeReactorMethod.
        crash = utils.suppressWarnings(
            crash,
            util.suppress(
                message=r"reactor\.crash cannot be used.*", category=DeprecationWarning
            ),
        )

        def stop():
            reactor.crash()

        stop = utils.suppressWarnings(
            stop,
            util.suppress(
                message=r"reactor\.crash cannot be used.*", category=DeprecationWarning
            ),
        )

        running.append(None)
        try:
            d.addBoth(append)
            if results:
                # d might have already been fired, in which case append is
                # called synchronously. Avoid any reactor stuff.
                return
            d.addBoth(crash)
            # While the reactor runs, reactor.stop() crashes it instead.
            reactor.stop = stop
            try:
                reactor.run()
            finally:
                # Remove the temporary override again.
                del reactor.stop

            # If the reactor was crashed elsewhere due to a timeout, hopefully
            # that crasher also reported an error. Just return.
            # _timedOut is most likely to be set when d has fired but hasn't
            # completed its callback chain (see self._run)
            if results or self._timedOut:  # set in _runFixturesAndTest() and _run()
                return

            # If the timeout didn't happen, and we didn't get a result or
            # a failure, then the user probably aborted the test, so let's
            # just raise KeyboardInterrupt.

            # FIXME: imagine this:
            # web/test/test_webclient.py:
            # exc = self.assertRaises(error.Error, wait, method(url))
            #
            # wait() will raise KeyboardInterrupt, and assertRaises will
            # swallow it. Therefore, wait() raising KeyboardInterrupt is
            # insufficient to stop trial. A suggested solution is to have
            # this code set a "stop trial" flag, or otherwise notify trial
            # that it should really try to stop as soon as possible.
            raise KeyboardInterrupt()
        finally:
            # Turn the append/crash callbacks into no-ops and release the
            # re-entrancy guard.
            results = None
            running.pop()