123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- # -*- test-case-name: twisted.trial.test.test_script -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- from __future__ import absolute_import, division, print_function
- import gc
- import inspect
- import os
- import pdb
- import random
- import sys
- import time
- import warnings
- from twisted.internet import defer
- from twisted.application import app
- from twisted.python import usage, reflect, failure
- from twisted.python.filepath import FilePath
- from twisted.python.reflect import namedModule
- from twisted.python.compat import long
- from twisted import plugin
- from twisted.trial import runner, itrial, reporter
- # Yea, this is stupid. Leave it for command-line compatibility for a
- # while, though.
- TBFORMAT_MAP = {
- 'plain': 'default',
- 'default': 'default',
- 'emacs': 'brief',
- 'brief': 'brief',
- 'cgitb': 'verbose',
- 'verbose': 'verbose'
- }
- def _parseLocalVariables(line):
- """
- Accepts a single line in Emacs local variable declaration format and
- returns a dict of all the variables {name: value}.
- Raises ValueError if 'line' is in the wrong format.
- See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
- """
- paren = '-*-'
- start = line.find(paren) + len(paren)
- end = line.rfind(paren)
- if start == -1 or end == -1:
- raise ValueError("%r not a valid local variable declaration" % (line,))
- items = line[start:end].split(';')
- localVars = {}
- for item in items:
- if len(item.strip()) == 0:
- continue
- split = item.split(':')
- if len(split) != 2:
- raise ValueError("%r contains invalid declaration %r"
- % (line, item))
- localVars[split[0].strip()] = split[1].strip()
- return localVars
- def loadLocalVariables(filename):
- """
- Accepts a filename and attempts to load the Emacs variable declarations
- from that file, simulating what Emacs does.
- See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
- """
- with open(filename, "r") as f:
- lines = [f.readline(), f.readline()]
- for line in lines:
- try:
- return _parseLocalVariables(line)
- except ValueError:
- pass
- return {}
- def getTestModules(filename):
- testCaseVar = loadLocalVariables(filename).get('test-case-name', None)
- if testCaseVar is None:
- return []
- return testCaseVar.split(',')
- def isTestFile(filename):
- """
- Returns true if 'filename' looks like a file containing unit tests.
- False otherwise. Doesn't care whether filename exists.
- """
- basename = os.path.basename(filename)
- return (basename.startswith('test_')
- and os.path.splitext(basename)[1] == ('.py'))
- def _reporterAction():
- return usage.CompleteList([p.longOpt for p in
- plugin.getPlugins(itrial.IReporter)])
- def _maybeFindSourceLine(testThing):
- """
- Try to find the source line of the given test thing.
- @param testThing: the test item to attempt to inspect
- @type testThing: an L{TestCase}, test method, or module, though only the
- former two have a chance to succeed
- @rtype: int
- @return: the starting source line, or -1 if one couldn't be found
- """
- # an instance of L{TestCase} -- locate the test it will run
- method = getattr(testThing, "_testMethodName", None)
- if method is not None:
- testThing = getattr(testThing, method)
- # If it's a function, we can get the line number even if the source file no
- # longer exists
- code = getattr(testThing, "__code__", None)
- if code is not None:
- return code.co_firstlineno
- try:
- return inspect.getsourcelines(testThing)[1]
- except (IOError, TypeError):
- # either testThing is a module, which raised a TypeError, or the file
- # couldn't be read
- return -1
- # orders which can be passed to trial --order
- _runOrders = {
- "alphabetical" : (
- "alphabetical order for test methods, arbitrary order for test cases",
- runner.name),
- "toptobottom" : (
- "attempt to run test cases and methods in the order they were defined",
- _maybeFindSourceLine),
- }
- def _checkKnownRunOrder(order):
- """
- Check that the given order is a known test running order.
- Does nothing else, since looking up the appropriate callable to sort the
- tests should be done when it actually will be used, as the default argument
- will not be coerced by this function.
- @param order: one of the known orders in C{_runOrders}
- @return: the order unmodified
- """
- if order not in _runOrders:
- raise usage.UsageError(
- "--order must be one of: %s. See --help-orders for details" %
- (", ".join(repr(order) for order in _runOrders),))
- return order
- class _BasicOptions(object):
- """
- Basic options shared between trial and its local workers.
- """
- longdesc = ("trial loads and executes a suite of unit tests, obtained "
- "from modules, packages and files listed on the command line.")
- optFlags = [["help", "h"],
- ["no-recurse", "N", "Don't recurse into packages"],
- ['help-orders', None, "Help on available test running orders"],
- ['help-reporters', None,
- "Help on available output plugins (reporters)"],
- ["rterrors", "e", "realtime errors, print out tracebacks as "
- "soon as they occur"],
- ["unclean-warnings", None,
- "Turn dirty reactor errors into warnings"],
- ["force-gc", None, "Have Trial run gc.collect() before and "
- "after each test case."],
- ["exitfirst", "x",
- "Exit after the first non-successful result (cannot be "
- "specified along with --jobs)."],
- ]
- optParameters = [
- ["order", "o", None,
- "Specify what order to run test cases and methods. "
- "See --help-orders for more info.", _checkKnownRunOrder],
- ["random", "z", None,
- "Run tests in random order using the specified seed"],
- ['temp-directory', None, '_trial_temp',
- 'Path to use as working directory for tests.'],
- ['reporter', None, 'verbose',
- 'The reporter to use for this test run. See --help-reporters for '
- 'more info.']]
- compData = usage.Completions(
- optActions={"order": usage.CompleteList(_runOrders),
- "reporter": _reporterAction,
- "logfile": usage.CompleteFiles(descr="log file name"),
- "random": usage.Completer(descr="random seed")},
- extraActions=[usage.CompleteFiles(
- "*.py", descr="file | module | package | TestCase | testMethod",
- repeat=True)],
- )
- fallbackReporter = reporter.TreeReporter
- tracer = None
- def __init__(self):
- self['tests'] = []
- usage.Options.__init__(self)
- def getSynopsis(self):
- executableName = reflect.filenameToModuleName(sys.argv[0])
- if executableName.endswith('.__main__'):
- executableName = '{} -m {}'.format(os.path.basename(sys.executable),
- executableName.replace('.__main__', ''))
- return """%s [options] [[file|package|module|TestCase|testmethod]...]
- """ % (executableName,)
- def coverdir(self):
- """
- Return a L{FilePath} representing the directory into which coverage
- results should be written.
- """
- coverdir = 'coverage'
- result = FilePath(self['temp-directory']).child(coverdir)
- print("Setting coverage directory to %s." % (result.path,))
- return result
- # TODO: Some of the opt_* methods on this class have docstrings and some do
- # not. This is mostly because usage.Options's currently will replace
- # any intended output in optFlags and optParameters with the
- # docstring. See #6427. When that is fixed, all methods should be
- # given docstrings (and it should be verified that those with
- # docstrings already have content suitable for printing as usage
- # information).
- def opt_coverage(self):
- """
- Generate coverage information in the coverage file in the
- directory specified by the temp-directory option.
- """
- import trace
- self.tracer = trace.Trace(count=1, trace=0)
- sys.settrace(self.tracer.globaltrace)
- self['coverage'] = True
- def opt_testmodule(self, filename):
- """
- Filename to grep for test cases (-*- test-case-name).
- """
- # If the filename passed to this parameter looks like a test module
- # we just add that to the test suite.
- #
- # If not, we inspect it for an Emacs buffer local variable called
- # 'test-case-name'. If that variable is declared, we try to add its
- # value to the test suite as a module.
- #
- # This parameter allows automated processes (like Buildbot) to pass
- # a list of files to Trial with the general expectation of "these files,
- # whatever they are, will get tested"
- if not os.path.isfile(filename):
- sys.stderr.write("File %r doesn't exist\n" % (filename,))
- return
- filename = os.path.abspath(filename)
- if isTestFile(filename):
- self['tests'].append(filename)
- else:
- self['tests'].extend(getTestModules(filename))
- def opt_spew(self):
- """
- Print an insanely verbose log of everything that happens. Useful
- when debugging freezes or locks in complex code.
- """
- from twisted.python.util import spewer
- sys.settrace(spewer)
- def opt_help_orders(self):
- synopsis = ("Trial can attempt to run test cases and their methods in "
- "a few different orders. You can select any of the "
- "following options using --order=<foo>.\n")
- print(synopsis)
- for name, (description, _) in sorted(_runOrders.items()):
- print(' ', name, '\t', description)
- sys.exit(0)
- def opt_help_reporters(self):
- synopsis = ("Trial's output can be customized using plugins called "
- "Reporters. You can\nselect any of the following "
- "reporters using --reporter=<foo>\n")
- print(synopsis)
- for p in plugin.getPlugins(itrial.IReporter):
- print(' ', p.longOpt, '\t', p.description)
- sys.exit(0)
- def opt_disablegc(self):
- """
- Disable the garbage collector
- """
- self["disablegc"] = True
- gc.disable()
- def opt_tbformat(self, opt):
- """
- Specify the format to display tracebacks with. Valid formats are
- 'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib
- cgitb.text function
- """
- try:
- self['tbformat'] = TBFORMAT_MAP[opt]
- except KeyError:
- raise usage.UsageError(
- "tbformat must be 'plain', 'emacs', or 'cgitb'.")
- def opt_recursionlimit(self, arg):
- """
- see sys.setrecursionlimit()
- """
- try:
- sys.setrecursionlimit(int(arg))
- except (TypeError, ValueError):
- raise usage.UsageError(
- "argument to recursionlimit must be an integer")
- else:
- self["recursionlimit"] = int(arg)
- def opt_random(self, option):
- try:
- self['random'] = long(option)
- except ValueError:
- raise usage.UsageError(
- "Argument to --random must be a positive integer")
- else:
- if self['random'] < 0:
- raise usage.UsageError(
- "Argument to --random must be a positive integer")
- elif self['random'] == 0:
- self['random'] = long(time.time() * 100)
- def opt_without_module(self, option):
- """
- Fake the lack of the specified modules, separated with commas.
- """
- self["without-module"] = option
- for module in option.split(","):
- if module in sys.modules:
- warnings.warn("Module '%s' already imported, "
- "disabling anyway." % (module,),
- category=RuntimeWarning)
- sys.modules[module] = None
- def parseArgs(self, *args):
- self['tests'].extend(args)
- def _loadReporterByName(self, name):
- for p in plugin.getPlugins(itrial.IReporter):
- qual = "%s.%s" % (p.module, p.klass)
- if p.longOpt == name:
- return reflect.namedAny(qual)
- raise usage.UsageError("Only pass names of Reporter plugins to "
- "--reporter. See --help-reporters for "
- "more info.")
- def postOptions(self):
- # Only load reporters now, as opposed to any earlier, to avoid letting
- # application-defined plugins muck up reactor selecting by importing
- # t.i.reactor and causing the default to be installed.
- self['reporter'] = self._loadReporterByName(self['reporter'])
- if 'tbformat' not in self:
- self['tbformat'] = 'default'
- if self['order'] is not None and self['random'] is not None:
- raise usage.UsageError(
- "You can't specify --random when using --order")
- class Options(_BasicOptions, usage.Options, app.ReactorSelectionMixin):
- """
- Options to the trial command line tool.
- @ivar _workerFlags: List of flags which are accepted by trial distributed
- workers. This is used by C{_getWorkerArguments} to build the command
- line arguments.
- @type _workerFlags: C{list}
- @ivar _workerParameters: List of parameter which are accepted by trial
- distributed workers. This is used by C{_getWorkerArguments} to build
- the command line arguments.
- @type _workerParameters: C{list}
- """
- optFlags = [
- ["debug", "b", "Run tests in a debugger. If that debugger is "
- "pdb, will load '.pdbrc' from current directory if it exists."
- ],
- ["debug-stacktraces", "B", "Report Deferred creation and "
- "callback stack traces"],
- ["nopm", None, "don't automatically jump into debugger for "
- "postmorteming of exceptions"],
- ["dry-run", 'n', "do everything but run the tests"],
- ["profile", None, "Run tests under the Python profiler"],
- ["until-failure", "u", "Repeat test until it fails"],
- ]
- optParameters = [
- ["debugger", None, "pdb", "the fully qualified name of a debugger to "
- "use if --debug is passed"],
- ["logfile", "l", "test.log", "log file name"],
- ["jobs", "j", None, "Number of local workers to run"]
- ]
- compData = usage.Completions(
- optActions = {
- "tbformat": usage.CompleteList(["plain", "emacs", "cgitb"]),
- "reporter": _reporterAction,
- },
- )
- _workerFlags = ["disablegc", "force-gc", "coverage"]
- _workerParameters = ["recursionlimit", "reactor", "without-module"]
- fallbackReporter = reporter.TreeReporter
- extra = None
- tracer = None
- def opt_jobs(self, number):
- """
- Number of local workers to run, a strictly positive integer.
- """
- try:
- number = int(number)
- except ValueError:
- raise usage.UsageError(
- "Expecting integer argument to jobs, got '%s'" % number)
- if number <= 0:
- raise usage.UsageError(
- "Argument to jobs must be a strictly positive integer")
- self["jobs"] = number
- def _getWorkerArguments(self):
- """
- Return a list of options to pass to distributed workers.
- """
- args = []
- for option in self._workerFlags:
- if self.get(option) is not None:
- if self[option]:
- args.append("--%s" % (option,))
- for option in self._workerParameters:
- if self.get(option) is not None:
- args.extend(["--%s" % (option,), str(self[option])])
- return args
- def postOptions(self):
- _BasicOptions.postOptions(self)
- if self['jobs']:
- conflicts = ['debug', 'profile', 'debug-stacktraces', 'exitfirst']
- for option in conflicts:
- if self[option]:
- raise usage.UsageError(
- "You can't specify --%s when using --jobs" % option)
- if self['nopm']:
- if not self['debug']:
- raise usage.UsageError("You must specify --debug when using "
- "--nopm ")
- failure.DO_POST_MORTEM = False
- def _initialDebugSetup(config):
- # do this part of debug setup first for easy debugging of import failures
- if config['debug']:
- failure.startDebugMode()
- if config['debug'] or config['debug-stacktraces']:
- defer.setDebugging(True)
- def _getSuite(config):
- loader = _getLoader(config)
- recurse = not config['no-recurse']
- return loader.loadByNames(config['tests'], recurse=recurse)
- def _getLoader(config):
- loader = runner.TestLoader()
- if config['random']:
- randomer = random.Random()
- randomer.seed(config['random'])
- loader.sorter = lambda x : randomer.random()
- print('Running tests shuffled with seed %d\n' % config['random'])
- elif config['order']:
- _, sorter = _runOrders[config['order']]
- loader.sorter = sorter
- if not config['until-failure']:
- loader.suiteFactory = runner.DestructiveTestSuite
- return loader
- def _wrappedPdb():
- """
- Wrap an instance of C{pdb.Pdb} with readline support and load any .rcs.
- """
- dbg = pdb.Pdb()
- try:
- namedModule('readline')
- except ImportError:
- print("readline module not available")
- for path in ('.pdbrc', 'pdbrc'):
- if os.path.exists(path):
- try:
- rcFile = open(path, 'r')
- except IOError:
- pass
- else:
- with rcFile:
- dbg.rcLines.extend(rcFile.readlines())
- return dbg
- class _DebuggerNotFound(Exception):
- """
- A debugger import failed.
- Used to allow translating these errors into usage error messages.
- """
- def _makeRunner(config):
- """
- Return a trial runner class set up with the parameters extracted from
- C{config}.
- @return: A trial runner instance.
- @rtype: L{runner.TrialRunner} or C{DistTrialRunner} depending on the
- configuration.
- """
- cls = runner.TrialRunner
- args = {'reporterFactory': config['reporter'],
- 'tracebackFormat': config['tbformat'],
- 'realTimeErrors': config['rterrors'],
- 'uncleanWarnings': config['unclean-warnings'],
- 'logfile': config['logfile'],
- 'workingDirectory': config['temp-directory']}
- if config['dry-run']:
- args['mode'] = runner.TrialRunner.DRY_RUN
- elif config['jobs']:
- from twisted.trial._dist.disttrial import DistTrialRunner
- cls = DistTrialRunner
- args['workerNumber'] = config['jobs']
- args['workerArguments'] = config._getWorkerArguments()
- else:
- if config['debug']:
- args['mode'] = runner.TrialRunner.DEBUG
- debugger = config['debugger']
- if debugger != 'pdb':
- try:
- args['debugger'] = reflect.namedAny(debugger)
- except reflect.ModuleNotFound:
- raise _DebuggerNotFound(
- '%r debugger could not be found.' % (debugger,))
- else:
- args['debugger'] = _wrappedPdb()
- args['exitFirst'] = config['exitfirst']
- args['profile'] = config['profile']
- args['forceGarbageCollection'] = config['force-gc']
- return cls(**args)
- def run():
- if len(sys.argv) == 1:
- sys.argv.append("--help")
- config = Options()
- try:
- config.parseOptions()
- except usage.error as ue:
- raise SystemExit("%s: %s" % (sys.argv[0], ue))
- _initialDebugSetup(config)
- try:
- trialRunner = _makeRunner(config)
- except _DebuggerNotFound as e:
- raise SystemExit('%s: %s' % (sys.argv[0], str(e)))
- suite = _getSuite(config)
- if config['until-failure']:
- test_result = trialRunner.runUntilFailure(suite)
- else:
- test_result = trialRunner.run(suite)
- if config.tracer:
- sys.settrace(None)
- results = config.tracer.results()
- results.write_results(show_missing=1, summary=False,
- coverdir=config.coverdir().path)
- sys.exit(not test_result.wasSuccessful())
|