12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469 |
- # -*- test-case-name: twisted.trial.test -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Things likely to be used by writers of unit tests.
- Maintainer: Jonathan Lange
- """
- import inspect
- import os
- import sys
- import tempfile
- import types
- import unittest as pyunit
- import warnings
- from dis import findlinestarts as _findlinestarts
- from typing import (
- Any,
- Callable,
- Coroutine,
- Generator,
- Iterable,
- List,
- NoReturn,
- Optional,
- Tuple,
- Type,
- TypeVar,
- Union,
- )
- # Python 2.7 and higher has skip support built-in
- from unittest import SkipTest
- from attrs import frozen
- from typing_extensions import ParamSpec
- from twisted.internet.defer import Deferred, ensureDeferred
- from twisted.python import failure, log, monkey
- from twisted.python.deprecate import (
- DEPRECATION_WARNING_FORMAT,
- getDeprecationWarningString,
- getVersionString,
- warnAboutFunction,
- )
- from twisted.python.reflect import fullyQualifiedName
- from twisted.python.util import runWithWarningsSuppressed
- from twisted.trial import itrial, util
- _P = ParamSpec("_P")
- T = TypeVar("T")
- class FailTest(AssertionError):
- """
- Raised to indicate the current test has failed to pass.
- """
- @frozen
- class Todo:
- """
- Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
- are reported differently in Trial L{TestResult}s. If todo'd tests fail,
- they do not fail the suite and the errors are reported in a separate
- category. If todo'd tests succeed, Trial L{TestResult}s will report an
- unexpected success.
- @ivar reason: A string explaining why the test is marked 'todo'
- @ivar errors: An iterable of exception types that the test is expected to
- raise. If one of these errors is raised by the test, it will be
- trapped. Raising any other kind of error will fail the test. If
- L{None} then all errors will be trapped.
- """
- reason: str
- errors: Optional[Iterable[Type[BaseException]]] = None
- def __repr__(self) -> str:
- return f"<Todo reason={self.reason!r} errors={self.errors!r}>"
- def expected(self, failure):
- """
- @param failure: A L{twisted.python.failure.Failure}.
- @return: C{True} if C{failure} is expected, C{False} otherwise.
- """
- if self.errors is None:
- return True
- for error in self.errors:
- if failure.check(error):
- return True
- return False
- def makeTodo(
- value: Union[
- str, Tuple[Union[Type[BaseException], Iterable[Type[BaseException]]], str]
- ]
- ) -> Todo:
- """
- Return a L{Todo} object built from C{value}.
- If C{value} is a string, return a Todo that expects any exception with
- C{value} as a reason. If C{value} is a tuple, the second element is used
- as the reason and the first element as the excepted error(s).
- @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
- is either a single exception class or an iterable of exception classes.
- @return: A L{Todo} object.
- """
- if isinstance(value, str):
- return Todo(reason=value)
- if isinstance(value, tuple):
- errors, reason = value
- if isinstance(errors, type):
- iterableErrors: Iterable[Type[BaseException]] = [errors]
- else:
- iterableErrors = errors
- return Todo(reason=reason, errors=iterableErrors)
- class _Warning:
- """
- A L{_Warning} instance represents one warning emitted through the Python
- warning system (L{warnings}). This is used to insulate callers of
- L{_collectWarnings} from changes to the Python warnings system which might
- otherwise require changes to the warning objects that function passes to
- the observer object it accepts.
- @ivar message: The string which was passed as the message parameter to
- L{warnings.warn}.
- @ivar category: The L{Warning} subclass which was passed as the category
- parameter to L{warnings.warn}.
- @ivar filename: The name of the file containing the definition of the code
- object which was C{stacklevel} frames above the call to
- L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
- parameter passed to L{warnings.warn}.
- @ivar lineno: The source line associated with the active instruction of the
- code object object which was C{stacklevel} frames above the call to
- L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
- parameter passed to L{warnings.warn}.
- """
- def __init__(self, message, category, filename, lineno):
- self.message = message
- self.category = category
- self.filename = filename
- self.lineno = lineno
- def _setWarningRegistryToNone(modules):
- """
- Disable the per-module cache for every module found in C{modules}, typically
- C{sys.modules}.
- @param modules: Dictionary of modules, typically sys.module dict
- """
- for v in list(modules.values()):
- if v is not None:
- try:
- v.__warningregistry__ = None
- except BaseException:
- # Don't specify a particular exception type to handle in case
- # some wacky object raises some wacky exception in response to
- # the setattr attempt.
- pass
- def _collectWarnings(observeWarning, f, *args, **kwargs):
- """
- Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments
- and collect all warnings which are emitted as a result in a list.
- @param observeWarning: A callable which will be invoked with a L{_Warning}
- instance each time a warning is emitted.
- @return: The return value of C{f(*args, **kwargs)}.
- """
- def showWarning(message, category, filename, lineno, file=None, line=None):
- assert isinstance(message, Warning)
- observeWarning(_Warning(str(message), category, filename, lineno))
- # Disable the per-module cache for every module otherwise if the warning
- # which the caller is expecting us to collect was already emitted it won't
- # be re-emitted by the call to f which happens below.
- _setWarningRegistryToNone(sys.modules)
- origFilters = warnings.filters[:]
- origShow = warnings.showwarning
- warnings.simplefilter("always")
- try:
- warnings.showwarning = showWarning
- result = f(*args, **kwargs)
- finally:
- warnings.filters[:] = origFilters
- warnings.showwarning = origShow
- return result
- class UnsupportedTrialFeature(Exception):
- """A feature of twisted.trial was used that pyunit cannot support."""
- class PyUnitResultAdapter:
- """
- Wrap a C{TestResult} from the standard library's C{unittest} so that it
- supports the extended result types from Trial, and also supports
- L{twisted.python.failure.Failure}s being passed to L{addError} and
- L{addFailure}.
- """
- def __init__(self, original):
- """
- @param original: A C{TestResult} instance from C{unittest}.
- """
- self.original = original
- def _exc_info(self, err):
- return util.excInfoOrFailureToExcInfo(err)
- def startTest(self, method):
- self.original.startTest(method)
- def stopTest(self, method):
- self.original.stopTest(method)
- def addFailure(self, test, fail):
- self.original.addFailure(test, self._exc_info(fail))
- def addError(self, test, error):
- self.original.addError(test, self._exc_info(error))
- def _unsupported(self, test, feature, info):
- self.original.addFailure(
- test,
- (UnsupportedTrialFeature, UnsupportedTrialFeature(feature, info), None),
- )
- def addSkip(self, test, reason):
- """
- Report the skip as a failure.
- """
- self.original.addSkip(test, reason)
- def addUnexpectedSuccess(self, test, todo=None):
- """
- Report the unexpected success as a failure.
- """
- self._unsupported(test, "unexpected success", todo)
- def addExpectedFailure(self, test, error):
- """
- Report the expected failure (i.e. todo) as a failure.
- """
- self._unsupported(test, "expected failure", error)
- def addSuccess(self, test):
- self.original.addSuccess(test)
- def upDownError(self, method, error, warn, printStatus):
- pass
- class _AssertRaisesContext:
- """
- A helper for implementing C{assertRaises}. This is a context manager and a
- helper method to support the non-context manager version of
- C{assertRaises}.
- @ivar _testCase: See C{testCase} parameter of C{__init__}
- @ivar _expected: See C{expected} parameter of C{__init__}
- @ivar _returnValue: The value returned by the callable being tested (only
- when not being used as a context manager).
- @ivar _expectedName: A short string describing the expected exception
- (usually the name of the exception class).
- @ivar exception: The exception which was raised by the function being
- tested (if it raised one).
- """
- def __init__(self, testCase, expected):
- """
- @param testCase: The L{TestCase} instance which is used to raise a
- test-failing exception when that is necessary.
- @param expected: The exception type expected to be raised.
- """
- self._testCase = testCase
- self._expected = expected
- self._returnValue = None
- try:
- self._expectedName = self._expected.__name__
- except AttributeError:
- self._expectedName = str(self._expected)
- def _handle(self, obj):
- """
- Call the given object using this object as a context manager.
- @param obj: The object to call and which is expected to raise some
- exception.
- @type obj: L{object}
- @return: Whatever exception is raised by C{obj()}.
- @rtype: L{BaseException}
- """
- with self as context:
- self._returnValue = obj()
- return context.exception
- def __enter__(self):
- return self
- def __exit__(self, exceptionType, exceptionValue, traceback):
- """
- Check exit exception against expected exception.
- """
- # No exception raised.
- if exceptionType is None:
- self._testCase.fail(
- "{} not raised ({} returned)".format(
- self._expectedName, self._returnValue
- )
- )
- if not isinstance(exceptionValue, exceptionType):
- # Support some Python 2.6 ridiculousness. Exceptions raised using
- # the C API appear here as the arguments you might pass to the
- # exception class to create an exception instance. So... do that
- # to turn them into the instances.
- if isinstance(exceptionValue, tuple):
- exceptionValue = exceptionType(*exceptionValue)
- else:
- exceptionValue = exceptionType(exceptionValue)
- # Store exception so that it can be access from context.
- self.exception = exceptionValue
- # Wrong exception raised.
- if not issubclass(exceptionType, self._expected):
- reason = failure.Failure(exceptionValue, exceptionType, traceback)
- self._testCase.fail(
- "{} raised instead of {}:\n {}".format(
- fullyQualifiedName(exceptionType),
- self._expectedName,
- reason.getTraceback(),
- ),
- )
- # All good.
- return True
- class _Assertions(pyunit.TestCase):
- """
- Replaces many of the built-in TestCase assertions. In general, these
- assertions provide better error messages and are easier to use in
- callbacks.
- """
- def fail(self, msg: Optional[object] = None) -> NoReturn:
- """
- Absolutely fail the test. Do not pass go, do not collect $200.
- @param msg: the message that will be displayed as the reason for the
- failure
- """
- raise self.failureException(msg)
- def assertFalse(self, condition, msg=None):
- """
- Fail the test if C{condition} evaluates to True.
- @param condition: any object that defines __nonzero__
- """
- super().assertFalse(condition, msg)
- return condition
- assertNot = assertFalse
- failUnlessFalse = assertFalse
- failIf = assertFalse
- def assertTrue(self, condition, msg=None):
- """
- Fail the test if C{condition} evaluates to False.
- @param condition: any object that defines __nonzero__
- """
- super().assertTrue(condition, msg)
- return condition
- assert_ = assertTrue
- failUnlessTrue = assertTrue
- failUnless = assertTrue
- def assertRaises(self, exception, f=None, *args, **kwargs):
- """
- Fail the test unless calling the function C{f} with the given
- C{args} and C{kwargs} raises C{exception}. The failure will report
- the traceback and call stack of the unexpected exception.
- @param exception: exception type that is to be expected
- @param f: the function to call
- @return: If C{f} is L{None}, a context manager which will make an
- assertion about the exception raised from the suite it manages. If
- C{f} is not L{None}, the exception raised by C{f}.
- @raise self.failureException: Raised if the function call does
- not raise an exception or if it raises an exception of a
- different type.
- """
- context = _AssertRaisesContext(self, exception)
- if f is None:
- return context
- return context._handle(lambda: f(*args, **kwargs))
- # unittest.TestCase.assertRaises() is defined with 4 arguments
- # but we define it with 5 arguments. So we need to tell mypy
- # to ignore the following assignment to failUnlessRaises
- failUnlessRaises = assertRaises # type: ignore[assignment]
- def assertEqual(self, first, second, msg=None):
- """
- Fail the test if C{first} and C{second} are not equal.
- @param msg: A string describing the failure that's included in the
- exception.
- """
- super().assertEqual(first, second, msg)
- return first
- failUnlessEqual = assertEqual
- failUnlessEquals = assertEqual
- assertEquals = assertEqual
- def assertIs(self, first, second, msg=None):
- """
- Fail the test if C{first} is not C{second}. This is an
- obect-identity-equality test, not an object equality
- (i.e. C{__eq__}) test.
- @param msg: if msg is None, then the failure message will be
- '%r is not %r' % (first, second)
- """
- if first is not second:
- raise self.failureException(msg or f"{first!r} is not {second!r}")
- return first
- failUnlessIdentical = assertIs
- assertIdentical = assertIs
- def assertIsNot(self, first, second, msg=None):
- """
- Fail the test if C{first} is C{second}. This is an
- obect-identity-equality test, not an object equality
- (i.e. C{__eq__}) test.
- @param msg: if msg is None, then the failure message will be
- '%r is %r' % (first, second)
- """
- if first is second:
- raise self.failureException(msg or f"{first!r} is {second!r}")
- return first
- failIfIdentical = assertIsNot
- assertNotIdentical = assertIsNot
- def assertNotEqual(self, first, second, msg=None):
- """
- Fail the test if C{first} == C{second}.
- @param msg: if msg is None, then the failure message will be
- '%r == %r' % (first, second)
- """
- if not first != second:
- raise self.failureException(msg or f"{first!r} == {second!r}")
- return first
- assertNotEquals = assertNotEqual
- failIfEquals = assertNotEqual
- failIfEqual = assertNotEqual
- def assertIn(self, containee, container, msg=None):
- """
- Fail the test if C{containee} is not found in C{container}.
- @param containee: the value that should be in C{container}
- @param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- @param msg: if msg is None, then the failure message will be
- '%r not in %r' % (first, second)
- """
- if containee not in container:
- raise self.failureException(msg or f"{containee!r} not in {container!r}")
- return containee
- failUnlessIn = assertIn
- def assertNotIn(self, containee, container, msg=None):
- """
- Fail the test if C{containee} is found in C{container}.
- @param containee: the value that should not be in C{container}
- @param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- @param msg: if msg is None, then the failure message will be
- '%r in %r' % (first, second)
- """
- if containee in container:
- raise self.failureException(msg or f"{containee!r} in {container!r}")
- return containee
- failIfIn = assertNotIn
- def assertNotAlmostEqual(self, first, second, places=7, msg=None, delta=None):
- """
- Fail if the two objects are equal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero.
- @note: decimal places (from zero) is usually not the same
- as significant digits (measured from the most
- significant digit).
- @note: included for compatibility with PyUnit test cases
- """
- if round(second - first, places) == 0:
- raise self.failureException(
- msg or f"{first!r} == {second!r} within {places!r} places"
- )
- return first
- assertNotAlmostEquals = assertNotAlmostEqual # type:ignore[assignment]
- failIfAlmostEqual = assertNotAlmostEqual # type:ignore[assignment]
- failIfAlmostEquals = assertNotAlmostEqual
- def assertAlmostEqual(self, first, second, places=7, msg=None, delta=None):
- """
- Fail if the two objects are unequal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero.
- @note: decimal places (from zero) is usually not the same
- as significant digits (measured from the most
- significant digit).
- @note: included for compatibility with PyUnit test cases
- """
- if round(second - first, places) != 0:
- raise self.failureException(
- msg or f"{first!r} != {second!r} within {places!r} places"
- )
- return first
- assertAlmostEquals = assertAlmostEqual # type:ignore[assignment]
- failUnlessAlmostEqual = assertAlmostEqual # type:ignore[assignment]
- def assertApproximates(self, first, second, tolerance, msg=None):
- """
- Fail if C{first} - C{second} > C{tolerance}
- @param msg: if msg is None, then the failure message will be
- '%r ~== %r' % (first, second)
- """
- if abs(first - second) > tolerance:
- raise self.failureException(msg or f"{first} ~== {second}")
- return first
- failUnlessApproximates = assertApproximates
- def assertSubstring(self, substring, astring, msg=None):
- """
- Fail if C{substring} does not exist within C{astring}.
- """
- return self.failUnlessIn(substring, astring, msg)
- failUnlessSubstring = assertSubstring
- def assertNotSubstring(self, substring, astring, msg=None):
- """
- Fail if C{astring} contains C{substring}.
- """
- return self.failIfIn(substring, astring, msg)
- failIfSubstring = assertNotSubstring
- def assertWarns(self, category, message, filename, f, *args, **kwargs):
- """
- Fail if the given function doesn't generate the specified warning when
- called. It calls the function, checks the warning, and forwards the
- result of the function if everything is fine.
- @param category: the category of the warning to check.
- @param message: the output message of the warning to check.
- @param filename: the filename where the warning should come from.
- @param f: the function which is supposed to generate the warning.
- @type f: any callable.
- @param args: the arguments to C{f}.
- @param kwargs: the keywords arguments to C{f}.
- @return: the result of the original function C{f}.
- """
- warningsShown = []
- result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
- if not warningsShown:
- self.fail("No warnings emitted")
- first = warningsShown[0]
- for other in warningsShown[1:]:
- if (other.message, other.category) != (first.message, first.category):
- self.fail("Can't handle different warnings")
- self.assertEqual(first.message, message)
- self.assertIdentical(first.category, category)
- # Use starts with because of .pyc/.pyo issues.
- self.assertTrue(
- filename.startswith(first.filename),
- f"Warning in {first.filename!r}, expected {filename!r}",
- )
- # It would be nice to be able to check the line number as well, but
- # different configurations actually end up reporting different line
- # numbers (generally the variation is only 1 line, but that's enough
- # to fail the test erroneously...).
- # self.assertEqual(lineno, xxx)
- return result
- failUnlessWarns = assertWarns
- def assertIsInstance(self, instance, classOrTuple, message=None):
- """
- Fail if C{instance} is not an instance of the given class or of
- one of the given classes.
- @param instance: the object to test the type (first argument of the
- C{isinstance} call).
- @type instance: any.
- @param classOrTuple: the class or classes to test against (second
- argument of the C{isinstance} call).
- @type classOrTuple: class, type, or tuple.
- @param message: Custom text to include in the exception text if the
- assertion fails.
- """
- if not isinstance(instance, classOrTuple):
- if message is None:
- suffix = ""
- else:
- suffix = ": " + message
- self.fail(f"{instance!r} is not an instance of {classOrTuple}{suffix}")
- failUnlessIsInstance = assertIsInstance
- def assertNotIsInstance(self, instance, classOrTuple):
- """
- Fail if C{instance} is an instance of the given class or of one of the
- given classes.
- @param instance: the object to test the type (first argument of the
- C{isinstance} call).
- @type instance: any.
- @param classOrTuple: the class or classes to test against (second
- argument of the C{isinstance} call).
- @type classOrTuple: class, type, or tuple.
- """
- if isinstance(instance, classOrTuple):
- self.fail(f"{instance!r} is an instance of {classOrTuple}")
- failIfIsInstance = assertNotIsInstance
- def successResultOf(
- self,
- deferred: Union[
- Coroutine[Deferred[T], Any, T],
- Generator[Deferred[T], Any, T],
- Deferred[T],
- ],
- ) -> T:
- """
- Return the current success result of C{deferred} or raise
- C{self.failureException}.
- @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} or
- I{coroutine} which has a success result.
- For a L{Deferred<twisted.internet.defer.Deferred>} this means
- L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
- L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
- been called on it and it has reached the end of its callback chain
- and the last callback or errback returned a
- non-L{failure.Failure}.
- For a I{coroutine} this means all awaited values have a success
- result.
- @raise SynchronousTestCase.failureException: If the
- L{Deferred<twisted.internet.defer.Deferred>} has no result or has a
- failure result.
- @return: The result of C{deferred}.
- """
- deferred = ensureDeferred(deferred)
- results: List[Union[T, failure.Failure]] = []
- deferred.addBoth(results.append)
- if not results:
- self.fail(
- "Success result expected on {!r}, found no result instead".format(
- deferred
- )
- )
- result = results[0]
- if isinstance(result, failure.Failure):
- self.fail(
- "Success result expected on {!r}, "
- "found failure result instead:\n{}".format(
- deferred, result.getTraceback()
- )
- )
- return result
- def failureResultOf(self, deferred, *expectedExceptionTypes):
- """
- Return the current failure result of C{deferred} or raise
- C{self.failureException}.
- @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} which
- has a failure result. This means
- L{Deferred.callback<twisted.internet.defer.Deferred.callback>} or
- L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
- been called on it and it has reached the end of its callback chain
- and the last callback or errback raised an exception or returned a
- L{failure.Failure}.
- @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
- @param expectedExceptionTypes: Exception types to expect - if
- provided, and the exception wrapped by the failure result is
- not one of the types provided, then this test will fail.
- @raise SynchronousTestCase.failureException: If the
- L{Deferred<twisted.internet.defer.Deferred>} has no result, has a
- success result, or has an unexpected failure result.
- @return: The failure result of C{deferred}.
- @rtype: L{failure.Failure}
- """
- deferred = ensureDeferred(deferred)
- result = []
- deferred.addBoth(result.append)
- if not result:
- self.fail(
- "Failure result expected on {!r}, found no result instead".format(
- deferred
- )
- )
- result = result[0]
- if not isinstance(result, failure.Failure):
- self.fail(
- "Failure result expected on {!r}, "
- "found success result ({!r}) instead".format(deferred, result)
- )
- if expectedExceptionTypes and not result.check(*expectedExceptionTypes):
- expectedString = " or ".join(
- [".".join((t.__module__, t.__name__)) for t in expectedExceptionTypes]
- )
- self.fail(
- "Failure of type ({}) expected on {!r}, "
- "found type {!r} instead: {}".format(
- expectedString, deferred, result.type, result.getTraceback()
- )
- )
- return result
- def assertNoResult(self, deferred):
- """
- Assert that C{deferred} does not have a result at this point.
- If the assertion succeeds, then the result of C{deferred} is left
- unchanged. Otherwise, any L{failure.Failure} result is swallowed.
- @param deferred: A L{Deferred<twisted.internet.defer.Deferred>} without
- a result. This means that neither
- L{Deferred.callback<twisted.internet.defer.Deferred.callback>} nor
- L{Deferred.errback<twisted.internet.defer.Deferred.errback>} has
- been called, or that the
- L{Deferred<twisted.internet.defer.Deferred>} is waiting on another
- L{Deferred<twisted.internet.defer.Deferred>} for a result.
- @type deferred: L{Deferred<twisted.internet.defer.Deferred>}
- @raise SynchronousTestCase.failureException: If the
- L{Deferred<twisted.internet.defer.Deferred>} has a result.
- """
- deferred = ensureDeferred(deferred)
- result = []
- def cb(res):
- result.append(res)
- return res
- deferred.addBoth(cb)
- if result:
- # If there is already a failure, the self.fail below will
- # report it, so swallow it in the deferred
- deferred.addErrback(lambda _: None)
- self.fail(
- "No result expected on {!r}, found {!r} instead".format(
- deferred, result[0]
- )
- )
- class _LogObserver:
- """
- Observes the Twisted logs and catches any errors.
- @ivar _errors: A C{list} of L{Failure} instances which were received as
- error events from the Twisted logging system.
- @ivar _added: A C{int} giving the number of times C{_add} has been called
- less the number of times C{_remove} has been called; used to only add
- this observer to the Twisted logging since once, regardless of the
- number of calls to the add method.
- @ivar _ignored: A C{list} of exception types which will not be recorded.
- """
- def __init__(self):
- self._errors = []
- self._added = 0
- self._ignored = []
- def _add(self):
- if self._added == 0:
- log.addObserver(self.gotEvent)
- self._added += 1
- def _remove(self):
- self._added -= 1
- if self._added == 0:
- log.removeObserver(self.gotEvent)
- def _ignoreErrors(self, *errorTypes):
- """
- Do not store any errors with any of the given types.
- """
- self._ignored.extend(errorTypes)
- def _clearIgnores(self):
- """
- Stop ignoring any errors we might currently be ignoring.
- """
- self._ignored = []
- def flushErrors(self, *errorTypes):
- """
- Flush errors from the list of caught errors. If no arguments are
- specified, remove all errors. If arguments are specified, only remove
- errors of those types from the stored list.
- """
- if errorTypes:
- flushed = []
- remainder = []
- for f in self._errors:
- if f.check(*errorTypes):
- flushed.append(f)
- else:
- remainder.append(f)
- self._errors = remainder
- else:
- flushed = self._errors
- self._errors = []
- return flushed
- def getErrors(self):
- """
- Return a list of errors caught by this observer.
- """
- return self._errors
- def gotEvent(self, event):
- """
- The actual observer method. Called whenever a message is logged.
- @param event: A dictionary containing the log message. Actual
- structure undocumented (see source for L{twisted.python.log}).
- """
- if event.get("isError", False) and "failure" in event:
- f = event["failure"]
- if len(self._ignored) == 0 or not f.check(*self._ignored):
- self._errors.append(f)
- _logObserver = _LogObserver()
- class SynchronousTestCase(_Assertions):
- """
- A unit test. The atom of the unit testing universe.
- This class extends C{unittest.TestCase} from the standard library. A number
- of convenient testing helpers are added, including logging and warning
- integration, monkey-patching support, and more.
- To write a unit test, subclass C{SynchronousTestCase} and define a method
- (say, 'test_foo') on the subclass. To run the test, instantiate your
- subclass with the name of the method, and call L{run} on the instance,
- passing a L{TestResult} object.
- The C{trial} script will automatically find any C{SynchronousTestCase}
- subclasses defined in modules beginning with 'test_' and construct test
- cases for all methods beginning with 'test'.
- If an error is logged during the test run, the test will fail with an
- error. See L{log.err}.
- @ivar failureException: An exception class, defaulting to C{FailTest}. If
- the test method raises this exception, it will be reported as a failure,
- rather than an exception. All of the assertion methods raise this if the
- assertion fails.
- @ivar skip: L{None} or a string explaining why this test is to be
- skipped. If defined, the test will not be run. Instead, it will be
- reported to the result object as 'skipped' (if the C{TestResult} supports
- skipping).
- @ivar todo: L{None}, a string or a tuple of C{(errors, reason)} where
- C{errors} is either an exception class or an iterable of exception
- classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
- information.
- @ivar suppress: L{None} or a list of tuples of C{(args, kwargs)} to be
- passed to C{warnings.filterwarnings}. Use these to suppress warnings
- raised in a test. Useful for testing deprecated code. See also
- L{util.suppress}.
- """
- failureException = FailTest
- def __init__(self, methodName="runTest"):
- super().__init__(methodName)
- self._passed = False
- self._cleanups = []
- self._testMethodName = methodName
- testMethod = getattr(self, methodName)
- self._parents = [testMethod, self, sys.modules.get(self.__class__.__module__)]
- def __eq__(self, other: object) -> bool:
- """
- Override the comparison defined by the base TestCase which considers
- instances of the same class with the same _testMethodName to be
- equal. Since trial puts TestCase instances into a set, that
- definition of comparison makes it impossible to run the same test
- method twice. Most likely, trial should stop using a set to hold
- tests, but until it does, this is necessary on Python 2.6. -exarkun
- """
- if isinstance(other, SynchronousTestCase):
- return self is other
- else:
- return NotImplemented
- def __hash__(self):
- return hash((self.__class__, self._testMethodName))
- def shortDescription(self):
- desc = super().shortDescription()
- if desc is None:
- return self._testMethodName
- return desc
- def getSkip(self) -> Tuple[bool, Optional[str]]:
- """
- Return the skip reason set on this test, if any is set. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{skip} attribute, returns that in
- a tuple (L{True}, L{str}).
- If the C{skip} attribute does not exist, look for C{__unittest_skip__}
- and C{__unittest_skip_why__} attributes which are set by the standard
- library L{unittest.skip} function.
- Returns (L{False}, L{None}) if it cannot find anything.
- See L{TestCase} docstring for more details.
- """
- skipReason = util.acquireAttribute(self._parents, "skip", None)
- doSkip = skipReason is not None
- if skipReason is None:
- doSkip = getattr(self, "__unittest_skip__", False)
- if doSkip:
- skipReason = getattr(self, "__unittest_skip_why__", "")
- return (doSkip, skipReason)
- def getTodo(self):
- """
- Return a L{Todo} object if the test is marked todo. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{todo} attribute, returns that.
- Returns L{None} if it cannot find anything. See L{TestCase} docstring
- for more details.
- """
- todo = util.acquireAttribute(self._parents, "todo", None)
- if todo is None:
- return None
- return makeTodo(todo)
- def runTest(self):
- """
- If no C{methodName} argument is passed to the constructor, L{run} will
- treat this method as the thing with the actual test inside.
- """
- def run(self, result):
- """
- Run the test case, storing the results in C{result}.
- First runs C{setUp} on self, then runs the test method (defined in the
- constructor), then runs C{tearDown}. As with the standard library
- L{unittest.TestCase}, the return value of these methods is disregarded.
- In particular, returning a L{Deferred<twisted.internet.defer.Deferred>}
- has no special additional consequences.
- @param result: A L{TestResult} object.
- """
- log.msg("--> %s <--" % (self.id()))
- new_result = itrial.IReporter(result, None)
- if new_result is None:
- result = PyUnitResultAdapter(result)
- else:
- result = new_result
- result.startTest(self)
- (doSkip, skipReason) = self.getSkip()
- if doSkip: # don't run test methods that are marked as .skip
- result.addSkip(self, skipReason)
- result.stopTest(self)
- return
- self._passed = False
- self._warnings = []
- self._installObserver()
- # All the code inside _runFixturesAndTest will be run such that warnings
- # emitted by it will be collected and retrievable by flushWarnings.
- _collectWarnings(self._warnings.append, self._runFixturesAndTest, result)
- # Any collected warnings which the test method didn't flush get
- # re-emitted so they'll be logged or show up on stdout or whatever.
- for w in self.flushWarnings():
- try:
- warnings.warn_explicit(**w)
- except BaseException:
- result.addError(self, failure.Failure())
- result.stopTest(self)
- # f should be a positional only argument but that is a breaking change
- # see https://github.com/twisted/twisted/issues/11967
- def addCleanup( # type: ignore[override]
- self, f: Callable[_P, object], *args: _P.args, **kwargs: _P.kwargs
- ) -> None:
- """
- Add the given function to a list of functions to be called after the
- test has run, but before C{tearDown}.
- Functions will be run in reverse order of being added. This helps
- ensure that tear down complements set up.
- As with all aspects of L{SynchronousTestCase}, Deferreds are not
- supported in cleanup functions.
- """
- self._cleanups.append((f, args, kwargs))
- def patch(self, obj, attribute, value):
- """
- Monkey patch an object for the duration of the test.
- The monkey patch will be reverted at the end of the test using the
- L{addCleanup} mechanism.
- The L{monkey.MonkeyPatcher} is returned so that users can restore and
- re-apply the monkey patch within their tests.
- @param obj: The object to monkey patch.
- @param attribute: The name of the attribute to change.
- @param value: The value to set the attribute to.
- @return: A L{monkey.MonkeyPatcher} object.
- """
- monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
- monkeyPatch.patch()
- self.addCleanup(monkeyPatch.restore)
- return monkeyPatch
- def flushLoggedErrors(self, *errorTypes):
- """
- Remove stored errors received from the log.
- C{TestCase} stores each error logged during the run of the test and
- reports them as errors during the cleanup phase (after C{tearDown}).
- @param errorTypes: If unspecified, flush all errors. Otherwise, only
- flush errors that match the given types.
- @return: A list of failures that have been removed.
- """
- return self._observer.flushErrors(*errorTypes)
- def flushWarnings(self, offendingFunctions=None):
- """
- Remove stored warnings from the list of captured warnings and return
- them.
- @param offendingFunctions: If L{None}, all warnings issued during the
- currently running test will be flushed. Otherwise, only warnings
- which I{point} to a function included in this list will be flushed.
- All warnings include a filename and source line number; if these
- parts of a warning point to a source line which is part of a
- function, then the warning I{points} to that function.
- @type offendingFunctions: L{None} or L{list} of functions or methods.
- @raise ValueError: If C{offendingFunctions} is not L{None} and includes
- an object which is not a L{types.FunctionType} or
- L{types.MethodType} instance.
- @return: A C{list}, each element of which is a C{dict} giving
- information about one warning which was flushed by this call. The
- keys of each C{dict} are:
- - C{'message'}: The string which was passed as the I{message}
- parameter to L{warnings.warn}.
- - C{'category'}: The warning subclass which was passed as the
- I{category} parameter to L{warnings.warn}.
- - C{'filename'}: The name of the file containing the definition
- of the code object which was C{stacklevel} frames above the
- call to L{warnings.warn}, where C{stacklevel} is the value of
- the C{stacklevel} parameter passed to L{warnings.warn}.
- - C{'lineno'}: The source line associated with the active
- instruction of the code object object which was C{stacklevel}
- frames above the call to L{warnings.warn}, where
- C{stacklevel} is the value of the C{stacklevel} parameter
- passed to L{warnings.warn}.
- """
- if offendingFunctions is None:
- toFlush = self._warnings[:]
- self._warnings[:] = []
- else:
- toFlush = []
- for aWarning in self._warnings:
- for aFunction in offendingFunctions:
- if not isinstance(
- aFunction, (types.FunctionType, types.MethodType)
- ):
- raise ValueError(f"{aFunction!r} is not a function or method")
- # inspect.getabsfile(aFunction) sometimes returns a
- # filename which disagrees with the filename the warning
- # system generates. This seems to be because a
- # function's code object doesn't deal with source files
- # being renamed. inspect.getabsfile(module) seems
- # better (or at least agrees with the warning system
- # more often), and does some normalization for us which
- # is desirable. inspect.getmodule() is attractive, but
- # somewhat broken in Python < 2.6. See Python bug 4845.
- aModule = sys.modules[aFunction.__module__]
- filename = inspect.getabsfile(aModule)
- if filename != os.path.normcase(aWarning.filename):
- continue
- # In Python 3.13 line numbers returned by findlinestarts
- # can be None for bytecode that does not map to source
- # lines.
- lineNumbers = [
- lineNumber
- for _, lineNumber in _findlinestarts(aFunction.__code__)
- if lineNumber is not None
- ]
- if not (min(lineNumbers) <= aWarning.lineno <= max(lineNumbers)):
- continue
- # The warning points to this function, flush it and move on
- # to the next warning.
- toFlush.append(aWarning)
- break
- # Remove everything which is being flushed.
- list(map(self._warnings.remove, toFlush))
- return [
- {
- "message": w.message,
- "category": w.category,
- "filename": w.filename,
- "lineno": w.lineno,
- }
- for w in toFlush
- ]
- def getDeprecatedModuleAttribute(self, moduleName, name, version, message=None):
- """
- Retrieve a module attribute which should have been deprecated,
- and assert that we saw the appropriate deprecation warning.
- @type moduleName: C{str}
- @param moduleName: Fully-qualified Python name of the module containing
- the deprecated attribute; if called from the same module as the
- attributes are being deprecated in, using the C{__name__} global can
- be helpful
- @type name: C{str}
- @param name: Attribute name which we expect to be deprecated
- @param version: The first L{version<twisted.python.versions.Version>} that
- the module attribute was deprecated.
- @type message: C{str}
- @param message: (optional) The expected deprecation message for the module attribute
- @return: The given attribute from the named module
- @raise FailTest: if no warnings were emitted on getattr, or if the
- L{DeprecationWarning} emitted did not produce the canonical
- please-use-something-else message that is standard for Twisted
- deprecations according to the given version and replacement.
- @since: Twisted 21.2.0
- """
- fqpn = moduleName + "." + name
- module = sys.modules[moduleName]
- attr = getattr(module, name)
- warningsShown = self.flushWarnings([self.getDeprecatedModuleAttribute])
- if len(warningsShown) == 0:
- self.fail(f"{fqpn} is not deprecated.")
- observedWarning = warningsShown[0]["message"]
- expectedWarning = DEPRECATION_WARNING_FORMAT % {
- "fqpn": fqpn,
- "version": getVersionString(version),
- }
- if message is not None:
- expectedWarning = expectedWarning + ": " + message
- self.assert_(
- observedWarning.startswith(expectedWarning),
- f"Expected {observedWarning!r} to start with {expectedWarning!r}",
- )
- return attr
- def callDeprecated(self, version, f, *args, **kwargs):
- """
- Call a function that should have been deprecated at a specific version
- and in favor of a specific alternative, and assert that it was thusly
- deprecated.
- @param version: A 2-sequence of (since, replacement), where C{since} is
- a the first L{version<incremental.Version>} that C{f}
- should have been deprecated since, and C{replacement} is a suggested
- replacement for the deprecated functionality, as described by
- L{twisted.python.deprecate.deprecated}. If there is no suggested
- replacement, this parameter may also be simply a
- L{version<incremental.Version>} by itself.
- @param f: The deprecated function to call.
- @param args: The arguments to pass to C{f}.
- @param kwargs: The keyword arguments to pass to C{f}.
- @return: Whatever C{f} returns.
- @raise Exception: Whatever C{f} raises. If any exception is
- raised by C{f}, though, no assertions will be made about emitted
- deprecations.
- @raise FailTest: if no warnings were emitted by C{f}, or if the
- L{DeprecationWarning} emitted did not produce the canonical
- please-use-something-else message that is standard for Twisted
- deprecations according to the given version and replacement.
- """
- result = f(*args, **kwargs)
- warningsShown = self.flushWarnings([self.callDeprecated])
- try:
- info = list(version)
- except TypeError:
- since = version
- replacement = None
- else:
- [since, replacement] = info
- if len(warningsShown) == 0:
- self.fail(f"{f!r} is not deprecated.")
- observedWarning = warningsShown[0]["message"]
- expectedWarning = getDeprecationWarningString(f, since, replacement=replacement)
- self.assertEqual(expectedWarning, observedWarning)
- return result
- def mktemp(self):
- """
- Create a new path name which can be used for a new file or directory.
- The result is a relative path that is guaranteed to be unique within the
- current working directory. The parent of the path will exist, but the
- path will not.
- For a temporary directory call os.mkdir on the path. For a temporary
- file just create the file (e.g. by opening the path for writing and then
- closing it).
- @return: The newly created path
- @rtype: C{str}
- """
- MAX_FILENAME = 32 # some platforms limit lengths of filenames
- base = os.path.join(
- self.__class__.__module__[:MAX_FILENAME],
- self.__class__.__name__[:MAX_FILENAME],
- self._testMethodName[:MAX_FILENAME],
- )
- if not os.path.exists(base):
- os.makedirs(base)
- # With 3.11 or older mkdtemp returns a relative path.
- # With newer it is absolute.
- # Here we make sure we always handle a relative path.
- # See https://github.com/python/cpython/issues/51574
- dirname = os.path.relpath(tempfile.mkdtemp("", "", base))
- return os.path.join(dirname, "temp")
- def _getSuppress(self):
- """
- Returns any warning suppressions set for this test. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{suppress} attribute, returns that.
- Returns any empty list (i.e. suppress no warnings) if it cannot find
- anything. See L{TestCase} docstring for more details.
- """
- return util.acquireAttribute(self._parents, "suppress", [])
- def _getSkipReason(self, method, skip):
- """
- Return the reason to use for skipping a test method.
- @param method: The method which produced the skip.
- @param skip: A L{unittest.SkipTest} instance raised by C{method}.
- """
- if len(skip.args) > 0:
- return skip.args[0]
- warnAboutFunction(
- method,
- "Do not raise unittest.SkipTest with no arguments! Give a reason "
- "for skipping tests!",
- )
- return skip
- def _run(self, suppress, todo, method, result):
- """
- Run a single method, either a test method or fixture.
- @param suppress: Any warnings to suppress, as defined by the C{suppress}
- attribute on this method, test case, or the module it is defined in.
- @param todo: Any expected failure or failures, as defined by the C{todo}
- attribute on this method, test case, or the module it is defined in.
- @param method: The method to run.
- @param result: The TestResult instance to which to report results.
- @return: C{True} if the method fails and no further method/fixture calls
- should be made, C{False} otherwise.
- """
- if inspect.isgeneratorfunction(method):
- exc = TypeError(
- "{!r} is a generator function and therefore will never run".format(
- method
- )
- )
- result.addError(self, failure.Failure(exc))
- return True
- try:
- runWithWarningsSuppressed(suppress, method)
- except SkipTest as e:
- result.addSkip(self, self._getSkipReason(method, e))
- except BaseException:
- reason = failure.Failure()
- if todo is None or not todo.expected(reason):
- if reason.check(self.failureException):
- addResult = result.addFailure
- else:
- addResult = result.addError
- addResult(self, reason)
- else:
- result.addExpectedFailure(self, reason, todo)
- else:
- return False
- return True
- def _runFixturesAndTest(self, result):
- """
- Run C{setUp}, a test method, test cleanups, and C{tearDown}.
- @param result: The TestResult instance to which to report results.
- """
- suppress = self._getSuppress()
- try:
- if self._run(suppress, None, self.setUp, result):
- return
- todo = self.getTodo()
- method = getattr(self, self._testMethodName)
- failed = self._run(suppress, todo, method, result)
- finally:
- self._runCleanups(result)
- if todo and not failed:
- result.addUnexpectedSuccess(self, todo)
- if self._run(suppress, None, self.tearDown, result):
- failed = True
- for error in self._observer.getErrors():
- result.addError(self, error)
- failed = True
- self._observer.flushErrors()
- self._removeObserver()
- if not (failed or todo):
- result.addSuccess(self)
- def _runCleanups(self, result):
- """
- Synchronously run any cleanups which have been added.
- """
- while len(self._cleanups) > 0:
- f, args, kwargs = self._cleanups.pop()
- try:
- f(*args, **kwargs)
- except BaseException:
- f = failure.Failure()
- result.addError(self, f)
- def _installObserver(self):
- self._observer = _logObserver
- self._observer._add()
- def _removeObserver(self):
- self._observer._remove()
|