threads.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Extended thread dispatching support.
  5. For basic support see reactor threading API docs.
  6. """
  7. from __future__ import division, absolute_import
  8. from twisted.python.compat import _PY3
  9. if not _PY3:
  10. import Queue
  11. else:
  12. import queue as Queue
  13. from twisted.python import failure
  14. from twisted.internet import defer
  15. def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
  16. """
  17. Call the function C{f} using a thread from the given threadpool and return
  18. the result as a Deferred.
  19. This function is only used by client code which is maintaining its own
  20. threadpool. To run a function in the reactor's threadpool, use
  21. C{deferToThread}.
  22. @param reactor: The reactor in whose main thread the Deferred will be
  23. invoked.
  24. @param threadpool: An object which supports the C{callInThreadWithCallback}
  25. method of C{twisted.python.threadpool.ThreadPool}.
  26. @param f: The function to call.
  27. @param *args: positional arguments to pass to f.
  28. @param **kwargs: keyword arguments to pass to f.
  29. @return: A Deferred which fires a callback with the result of f, or an
  30. errback with a L{twisted.python.failure.Failure} if f throws an
  31. exception.
  32. """
  33. d = defer.Deferred()
  34. def onResult(success, result):
  35. if success:
  36. reactor.callFromThread(d.callback, result)
  37. else:
  38. reactor.callFromThread(d.errback, result)
  39. threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
  40. return d
  41. def deferToThread(f, *args, **kwargs):
  42. """
  43. Run a function in a thread and return the result as a Deferred.
  44. @param f: The function to call.
  45. @param *args: positional arguments to pass to f.
  46. @param **kwargs: keyword arguments to pass to f.
  47. @return: A Deferred which fires a callback with the result of f,
  48. or an errback with a L{twisted.python.failure.Failure} if f throws
  49. an exception.
  50. """
  51. from twisted.internet import reactor
  52. return deferToThreadPool(reactor, reactor.getThreadPool(),
  53. f, *args, **kwargs)
  54. def _runMultiple(tupleList):
  55. """
  56. Run a list of functions.
  57. """
  58. for f, args, kwargs in tupleList:
  59. f(*args, **kwargs)
  60. def callMultipleInThread(tupleList):
  61. """
  62. Run a list of functions in the same thread.
  63. tupleList should be a list of (function, argsList, kwargsDict) tuples.
  64. """
  65. from twisted.internet import reactor
  66. reactor.callInThread(_runMultiple, tupleList)
  67. def blockingCallFromThread(reactor, f, *a, **kw):
  68. """
  69. Run a function in the reactor from a thread, and wait for the result
  70. synchronously. If the function returns a L{Deferred}, wait for its
  71. result and return that.
  72. @param reactor: The L{IReactorThreads} provider which will be used to
  73. schedule the function call.
  74. @param f: the callable to run in the reactor thread
  75. @type f: any callable.
  76. @param a: the arguments to pass to C{f}.
  77. @param kw: the keyword arguments to pass to C{f}.
  78. @return: the result of the L{Deferred} returned by C{f}, or the result
  79. of C{f} if it returns anything other than a L{Deferred}.
  80. @raise: If C{f} raises a synchronous exception,
  81. C{blockingCallFromThread} will raise that exception. If C{f}
  82. returns a L{Deferred} which fires with a L{Failure},
  83. C{blockingCallFromThread} will raise that failure's exception (see
  84. L{Failure.raiseException}).
  85. """
  86. queue = Queue.Queue()
  87. def _callFromThread():
  88. result = defer.maybeDeferred(f, *a, **kw)
  89. result.addBoth(queue.put)
  90. reactor.callFromThread(_callFromThread)
  91. result = queue.get()
  92. if isinstance(result, failure.Failure):
  93. result.raiseException()
  94. return result
  95. __all__ = ["deferToThread", "deferToThreadPool", "callMultipleInThread",
  96. "blockingCallFromThread"]