worker.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. # -*- test-case-name: twisted.trial._dist.test.test_worker -*-
  2. #
  3. # Copyright (c) Twisted Matrix Laboratories.
  4. # See LICENSE for details.
  5. """
  6. This module implements the worker classes.
  7. @since: 12.3
  8. """
  9. import os
  10. from zope.interface import implementer
  11. from twisted.internet.protocol import ProcessProtocol
  12. from twisted.internet.interfaces import ITransport, IAddress
  13. from twisted.internet.defer import Deferred
  14. from twisted.protocols.amp import AMP
  15. from twisted.python.failure import Failure
  16. from twisted.python.reflect import namedObject
  17. from twisted.trial.unittest import Todo
  18. from twisted.trial.runner import TrialSuite, TestLoader
  19. from twisted.trial._dist import workercommands, managercommands
  20. from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT
  21. from twisted.trial._dist.workerreporter import WorkerReporter
  22. class WorkerProtocol(AMP):
  23. """
  24. The worker-side trial distributed protocol.
  25. """
  26. def __init__(self, forceGarbageCollection=False):
  27. self._loader = TestLoader()
  28. self._result = WorkerReporter(self)
  29. self._forceGarbageCollection = forceGarbageCollection
  30. def run(self, testCase):
  31. """
  32. Run a test case by name.
  33. """
  34. case = self._loader.loadByName(testCase)
  35. suite = TrialSuite([case], self._forceGarbageCollection)
  36. suite.run(self._result)
  37. return {'success': True}
  38. workercommands.Run.responder(run)
  39. def start(self, directory):
  40. """
  41. Set up the worker, moving into given directory for tests to run in
  42. them.
  43. """
  44. os.chdir(directory)
  45. return {'success': True}
  46. workercommands.Start.responder(start)
  47. class LocalWorkerAMP(AMP):
  48. """
  49. Local implementation of the manager commands.
  50. """
  51. def addSuccess(self, testName):
  52. """
  53. Add a success to the reporter.
  54. """
  55. self._result.addSuccess(self._testCase)
  56. return {'success': True}
  57. managercommands.AddSuccess.responder(addSuccess)
  58. def _buildFailure(self, error, errorClass, frames):
  59. """
  60. Helper to build a C{Failure} with some traceback.
  61. @param error: An C{Exception} instance.
  62. @param error: The class name of the C{error} class.
  63. @param frames: A flat list of strings representing the information need
  64. to approximatively rebuild C{Failure} frames.
  65. @return: A L{Failure} instance with enough information about a test
  66. error.
  67. """
  68. errorType = namedObject(errorClass)
  69. failure = Failure(error, errorType)
  70. for i in range(0, len(frames), 3):
  71. failure.frames.append(
  72. (frames[i], frames[i + 1], int(frames[i + 2]), [], []))
  73. return failure
  74. def addError(self, testName, error, errorClass, frames):
  75. """
  76. Add an error to the reporter.
  77. """
  78. failure = self._buildFailure(error, errorClass, frames)
  79. self._result.addError(self._testCase, failure)
  80. return {'success': True}
  81. managercommands.AddError.responder(addError)
  82. def addFailure(self, testName, fail, failClass, frames):
  83. """
  84. Add a failure to the reporter.
  85. """
  86. failure = self._buildFailure(fail, failClass, frames)
  87. self._result.addFailure(self._testCase, failure)
  88. return {'success': True}
  89. managercommands.AddFailure.responder(addFailure)
  90. def addSkip(self, testName, reason):
  91. """
  92. Add a skip to the reporter.
  93. """
  94. self._result.addSkip(self._testCase, reason)
  95. return {'success': True}
  96. managercommands.AddSkip.responder(addSkip)
  97. def addExpectedFailure(self, testName, error, todo):
  98. """
  99. Add an expected failure to the reporter.
  100. """
  101. _todo = Todo(todo)
  102. self._result.addExpectedFailure(self._testCase, error, _todo)
  103. return {'success': True}
  104. managercommands.AddExpectedFailure.responder(addExpectedFailure)
  105. def addUnexpectedSuccess(self, testName, todo):
  106. """
  107. Add an unexpected success to the reporter.
  108. """
  109. self._result.addUnexpectedSuccess(self._testCase, todo)
  110. return {'success': True}
  111. managercommands.AddUnexpectedSuccess.responder(addUnexpectedSuccess)
  112. def testWrite(self, out):
  113. """
  114. Print test output from the worker.
  115. """
  116. self._testStream.write(out + '\n')
  117. self._testStream.flush()
  118. return {'success': True}
  119. managercommands.TestWrite.responder(testWrite)
  120. def _stopTest(self, result):
  121. """
  122. Stop the current running test case, forwarding the result.
  123. """
  124. self._result.stopTest(self._testCase)
  125. return result
  126. def run(self, testCase, result):
  127. """
  128. Run a test.
  129. """
  130. self._testCase = testCase
  131. self._result = result
  132. self._result.startTest(testCase)
  133. testCaseId = testCase.id()
  134. d = self.callRemote(workercommands.Run, testCase=testCaseId)
  135. return d.addCallback(self._stopTest)
  136. def setTestStream(self, stream):
  137. """
  138. Set the stream used to log output from tests.
  139. """
  140. self._testStream = stream
  141. @implementer(IAddress)
  142. class LocalWorkerAddress(object):
  143. """
  144. A L{IAddress} implementation meant to provide stub addresses for
  145. L{ITransport.getPeer} and L{ITransport.getHost}.
  146. """
  147. @implementer(ITransport)
  148. class LocalWorkerTransport(object):
  149. """
  150. A stub transport implementation used to support L{AMP} over a
  151. L{ProcessProtocol} transport.
  152. """
  153. def __init__(self, transport):
  154. self._transport = transport
  155. def write(self, data):
  156. """
  157. Forward data to transport.
  158. """
  159. self._transport.writeToChild(_WORKER_AMP_STDIN, data)
  160. def writeSequence(self, sequence):
  161. """
  162. Emulate C{writeSequence} by iterating data in the C{sequence}.
  163. """
  164. for data in sequence:
  165. self._transport.writeToChild(_WORKER_AMP_STDIN, data)
  166. def loseConnection(self):
  167. """
  168. Closes the transport.
  169. """
  170. self._transport.loseConnection()
  171. def getHost(self):
  172. """
  173. Return a L{LocalWorkerAddress} instance.
  174. """
  175. return LocalWorkerAddress()
  176. def getPeer(self):
  177. """
  178. Return a L{LocalWorkerAddress} instance.
  179. """
  180. return LocalWorkerAddress()
  181. class LocalWorker(ProcessProtocol):
  182. """
  183. Local process worker protocol. This worker runs as a local process and
  184. communicates via stdin/out.
  185. @ivar _ampProtocol: The L{AMP} protocol instance used to communicate with
  186. the worker.
  187. @ivar _logDirectory: The directory where logs will reside.
  188. @ivar _logFile: The name of the main log file for tests output.
  189. """
  190. def __init__(self, ampProtocol, logDirectory, logFile):
  191. self._ampProtocol = ampProtocol
  192. self._logDirectory = logDirectory
  193. self._logFile = logFile
  194. self.endDeferred = Deferred()
  195. def connectionMade(self):
  196. """
  197. When connection is made, create the AMP protocol instance.
  198. """
  199. self._ampProtocol.makeConnection(LocalWorkerTransport(self.transport))
  200. if not os.path.exists(self._logDirectory):
  201. os.makedirs(self._logDirectory)
  202. self._outLog = open(os.path.join(self._logDirectory, 'out.log'), 'wb')
  203. self._errLog = open(os.path.join(self._logDirectory, 'err.log'), 'wb')
  204. self._testLog = open(
  205. os.path.join(self._logDirectory, self._logFile), 'w')
  206. self._ampProtocol.setTestStream(self._testLog)
  207. logDirectory = self._logDirectory
  208. d = self._ampProtocol.callRemote(workercommands.Start,
  209. directory=logDirectory)
  210. # Ignore the potential errors, the test suite will fail properly and it
  211. # would just print garbage.
  212. d.addErrback(lambda x: None)
  213. def connectionLost(self, reason):
  214. """
  215. On connection lost, close the log files that we're managing for stdin
  216. and stdout.
  217. """
  218. self._outLog.close()
  219. self._errLog.close()
  220. self._testLog.close()
  221. def processEnded(self, reason):
  222. """
  223. When the process closes, call C{connectionLost} for cleanup purposes
  224. and forward the information to the C{_ampProtocol}.
  225. """
  226. self.connectionLost(reason)
  227. self._ampProtocol.connectionLost(reason)
  228. self.endDeferred.callback(reason)
  229. def outReceived(self, data):
  230. """
  231. Send data received from stdout to log.
  232. """
  233. self._outLog.write(data)
  234. def errReceived(self, data):
  235. """
  236. Write error data to log.
  237. """
  238. self._errLog.write(data)
  239. def childDataReceived(self, childFD, data):
  240. """
  241. Handle data received on the specific pipe for the C{_ampProtocol}.
  242. """
  243. if childFD == _WORKER_AMP_STDOUT:
  244. self._ampProtocol.dataReceived(data)
  245. else:
  246. ProcessProtocol.childDataReceived(self, childFD, data)