disttrial.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # -*- test-case-name: twisted.trial._dist.test.test_disttrial -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. This module contains the trial distributed runner, the management class
  6. responsible for coordinating all of trial's behavior at the highest level.
  7. @since: 12.3
  8. """
  9. import os
  10. import sys
  11. from twisted.python.filepath import FilePath
  12. from twisted.python.modules import theSystemPath
  13. from twisted.internet.defer import DeferredList
  14. from twisted.internet.task import cooperate
  15. from twisted.trial.util import _unusedTestDirectory
  16. from twisted.trial._asyncrunner import _iterateTests
  17. from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP
  18. from twisted.trial._dist.distreporter import DistReporter
  19. from twisted.trial.reporter import UncleanWarningsReporterWrapper
  20. from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
  21. class DistTrialRunner(object):
  22. """
  23. A specialized runner for distributed trial. The runner launches a number of
  24. local worker processes which will run tests.
  25. @ivar _workerNumber: the number of workers to be spawned.
  26. @type _workerNumber: C{int}
  27. @ivar _stream: stream which the reporter will use.
  28. @ivar _reporterFactory: the reporter class to be used.
  29. """
  30. _distReporterFactory = DistReporter
  31. def _makeResult(self):
  32. """
  33. Make reporter factory, and wrap it with a L{DistReporter}.
  34. """
  35. reporter = self._reporterFactory(self._stream, self._tbformat,
  36. realtime=self._rterrors)
  37. if self._uncleanWarnings:
  38. reporter = UncleanWarningsReporterWrapper(reporter)
  39. return self._distReporterFactory(reporter)
  40. def __init__(self, reporterFactory, workerNumber, workerArguments,
  41. stream=None,
  42. tracebackFormat='default',
  43. realTimeErrors=False,
  44. uncleanWarnings=False,
  45. logfile='test.log',
  46. workingDirectory='_trial_temp'):
  47. self._workerNumber = workerNumber
  48. self._workerArguments = workerArguments
  49. self._reporterFactory = reporterFactory
  50. if stream is None:
  51. stream = sys.stdout
  52. self._stream = stream
  53. self._tbformat = tracebackFormat
  54. self._rterrors = realTimeErrors
  55. self._uncleanWarnings = uncleanWarnings
  56. self._result = None
  57. self._workingDirectory = workingDirectory
  58. self._logFile = logfile
  59. self._logFileObserver = None
  60. self._logFileObject = None
  61. self._logWarnings = False
  62. def writeResults(self, result):
  63. """
  64. Write test run final outcome to result.
  65. @param result: A C{TestResult} which will print errors and the summary.
  66. """
  67. result.done()
  68. def createLocalWorkers(self, protocols, workingDirectory):
  69. """
  70. Create local worker protocol instances and return them.
  71. @param protocols: An iterable of L{LocalWorkerAMP} instances.
  72. @param workingDirectory: The base path in which we should run the
  73. workers.
  74. @type workingDirectory: C{str}
  75. @return: A list of C{quantity} C{LocalWorker} instances.
  76. """
  77. return [LocalWorker(protocol,
  78. os.path.join(workingDirectory, str(x)),
  79. self._logFile)
  80. for x, protocol in enumerate(protocols)]
  81. def launchWorkerProcesses(self, spawner, protocols, arguments):
  82. """
  83. Spawn processes from a list of process protocols.
  84. @param spawner: A C{IReactorProcess.spawnProcess} implementation.
  85. @param protocols: An iterable of C{ProcessProtocol} instances.
  86. @param arguments: Extra arguments passed to the processes.
  87. """
  88. workertrialPath = theSystemPath[
  89. 'twisted.trial._dist.workertrial'].filePath.path
  90. childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w',
  91. _WORKER_AMP_STDOUT: 'r'}
  92. environ = os.environ.copy()
  93. # Add an environment variable containing the raw sys.path, to be used by
  94. # subprocesses to make sure it's identical to the parent. See
  95. # workertrial._setupPath.
  96. environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)
  97. for worker in protocols:
  98. args = [sys.executable, workertrialPath]
  99. args.extend(arguments)
  100. spawner(worker, sys.executable, args=args, childFDs=childFDs,
  101. env=environ)
  102. def _driveWorker(self, worker, result, testCases, cooperate):
  103. """
  104. Drive a L{LocalWorkerAMP} instance, iterating the tests and calling
  105. C{run} for every one of them.
  106. @param worker: The L{LocalWorkerAMP} to drive.
  107. @param result: The global L{DistReporter} instance.
  108. @param testCases: The global list of tests to iterate.
  109. @param cooperate: The cooperate function to use, to be customized in
  110. tests.
  111. @type cooperate: C{function}
  112. @return: A C{Deferred} firing when all the tests are finished.
  113. """
  114. def resultErrback(error, case):
  115. result.original.addFailure(case, error)
  116. return error
  117. def task(case):
  118. d = worker.run(case, result)
  119. d.addErrback(resultErrback, case)
  120. return d
  121. return cooperate(task(case) for case in testCases).whenDone()
  122. def run(self, suite, reactor=None, cooperate=cooperate,
  123. untilFailure=False):
  124. """
  125. Spawn local worker processes and load tests. After that, run them.
  126. @param suite: A tests suite to be run.
  127. @param reactor: The reactor to use, to be customized in tests.
  128. @type reactor: A provider of
  129. L{twisted.internet.interfaces.IReactorProcess}
  130. @param cooperate: The cooperate function to use, to be customized in
  131. tests.
  132. @type cooperate: C{function}
  133. @param untilFailure: If C{True}, continue to run the tests until they
  134. fail.
  135. @type untilFailure: C{bool}.
  136. @return: The test result.
  137. @rtype: L{DistReporter}
  138. """
  139. if reactor is None:
  140. from twisted.internet import reactor
  141. result = self._makeResult()
  142. count = suite.countTestCases()
  143. self._stream.write("Running %d tests.\n" % (count,))
  144. if not count:
  145. # Take a shortcut if there is no test
  146. suite.run(result.original)
  147. self.writeResults(result)
  148. return result
  149. testDir, testDirLock = _unusedTestDirectory(
  150. FilePath(self._workingDirectory))
  151. workerNumber = min(count, self._workerNumber)
  152. ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)]
  153. workers = self.createLocalWorkers(ampWorkers, testDir.path)
  154. processEndDeferreds = [worker.endDeferred for worker in workers]
  155. self.launchWorkerProcesses(reactor.spawnProcess, workers,
  156. self._workerArguments)
  157. def runTests():
  158. testCases = iter(list(_iterateTests(suite)))
  159. workerDeferreds = []
  160. for worker in ampWorkers:
  161. workerDeferreds.append(
  162. self._driveWorker(worker, result, testCases,
  163. cooperate=cooperate))
  164. return DeferredList(workerDeferreds, consumeErrors=True,
  165. fireOnOneErrback=True)
  166. stopping = []
  167. def nextRun(ign):
  168. self.writeResults(result)
  169. if not untilFailure:
  170. return
  171. if not result.wasSuccessful():
  172. return
  173. d = runTests()
  174. return d.addCallback(nextRun)
  175. def stop(ign):
  176. testDirLock.unlock()
  177. if not stopping:
  178. stopping.append(None)
  179. reactor.stop()
  180. def beforeShutDown():
  181. if not stopping:
  182. stopping.append(None)
  183. d = DeferredList(processEndDeferreds, consumeErrors=True)
  184. return d.addCallback(continueShutdown)
  185. def continueShutdown(ign):
  186. self.writeResults(result)
  187. return ign
  188. d = runTests()
  189. d.addCallback(nextRun)
  190. d.addBoth(stop)
  191. reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown)
  192. reactor.run()
  193. return result
  194. def runUntilFailure(self, suite):
  195. """
  196. Run the tests with local worker processes until they fail.
  197. @param suite: A tests suite to be run.
  198. """
  199. return self.run(suite, untilFailure=True)