threads.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Extended thread dispatching support.
  5. For basic support see reactor threading API docs.
  6. """
  7. from __future__ import annotations
  8. import queue as Queue
  9. from typing import Callable, TypeVar
  10. from typing_extensions import ParamSpec
  11. from twisted.internet import defer
  12. from twisted.internet.interfaces import IReactorFromThreads
  13. from twisted.python import failure
  14. from twisted.python.threadpool import ThreadPool
  15. _P = ParamSpec("_P")
  16. _R = TypeVar("_R")
  17. def deferToThreadPool(
  18. reactor: IReactorFromThreads,
  19. threadpool: ThreadPool,
  20. f: Callable[_P, _R],
  21. *args: _P.args,
  22. **kwargs: _P.kwargs,
  23. ) -> defer.Deferred[_R]:
  24. """
  25. Call the function C{f} using a thread from the given threadpool and return
  26. the result as a Deferred.
  27. This function is only used by client code which is maintaining its own
  28. threadpool. To run a function in the reactor's threadpool, use
  29. C{deferToThread}.
  30. @param reactor: The reactor in whose main thread the Deferred will be
  31. invoked.
  32. @param threadpool: An object which supports the C{callInThreadWithCallback}
  33. method of C{twisted.python.threadpool.ThreadPool}.
  34. @param f: The function to call.
  35. @param args: positional arguments to pass to f.
  36. @param kwargs: keyword arguments to pass to f.
  37. @return: A Deferred which fires a callback with the result of f, or an
  38. errback with a L{twisted.python.failure.Failure} if f throws an
  39. exception.
  40. """
  41. d: defer.Deferred[_R] = defer.Deferred()
  42. def onResult(success: bool, result: _R | BaseException) -> None:
  43. if success:
  44. reactor.callFromThread(d.callback, result)
  45. else:
  46. reactor.callFromThread(d.errback, result)
  47. threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
  48. return d
  49. def deferToThread(f, *args, **kwargs):
  50. """
  51. Run a function in a thread and return the result as a Deferred.
  52. @param f: The function to call.
  53. @param args: positional arguments to pass to f.
  54. @param kwargs: keyword arguments to pass to f.
  55. @return: A Deferred which fires a callback with the result of f,
  56. or an errback with a L{twisted.python.failure.Failure} if f throws
  57. an exception.
  58. """
  59. from twisted.internet import reactor
  60. return deferToThreadPool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
  61. def _runMultiple(tupleList):
  62. """
  63. Run a list of functions.
  64. """
  65. for f, args, kwargs in tupleList:
  66. f(*args, **kwargs)
  67. def callMultipleInThread(tupleList):
  68. """
  69. Run a list of functions in the same thread.
  70. tupleList should be a list of (function, argsList, kwargsDict) tuples.
  71. """
  72. from twisted.internet import reactor
  73. reactor.callInThread(_runMultiple, tupleList)
  74. def blockingCallFromThread(reactor, f, *a, **kw):
  75. """
  76. Run a function in the reactor from a thread, and wait for the result
  77. synchronously. If the function returns a L{Deferred}, wait for its
  78. result and return that.
  79. @param reactor: The L{IReactorThreads} provider which will be used to
  80. schedule the function call.
  81. @param f: the callable to run in the reactor thread
  82. @type f: any callable.
  83. @param a: the arguments to pass to C{f}.
  84. @param kw: the keyword arguments to pass to C{f}.
  85. @return: the result of the L{Deferred} returned by C{f}, or the result
  86. of C{f} if it returns anything other than a L{Deferred}.
  87. @raise Exception: If C{f} raises a synchronous exception,
  88. C{blockingCallFromThread} will raise that exception. If C{f}
  89. returns a L{Deferred} which fires with a L{Failure},
  90. C{blockingCallFromThread} will raise that failure's exception (see
  91. L{Failure.raiseException}).
  92. """
  93. queue = Queue.Queue()
  94. def _callFromThread():
  95. result = defer.maybeDeferred(f, *a, **kw)
  96. result.addBoth(queue.put)
  97. reactor.callFromThread(_callFromThread)
  98. result = queue.get()
  99. if isinstance(result, failure.Failure):
  100. result.raiseException()
  101. return result
  102. __all__ = [
  103. "deferToThread",
  104. "deferToThreadPool",
  105. "callMultipleInThread",
  106. "blockingCallFromThread",
  107. ]