_threadworker.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # -*- test-case-name: twisted._threads.test.test_threadworker -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implementation of an L{IWorker} based on native threads and queues.
  6. """
  7. from __future__ import absolute_import, division, print_function
  8. from zope.interface import implementer
  9. from ._ithreads import IExclusiveWorker
  10. from ._convenience import Quit
  # Unique marker object: ThreadWorker.quit() puts it on the queue, and the
  # worker thread's loop stops pulling tasks once it gets this object back.
  _stop = object()
  @implementer(IExclusiveWorker)
  class ThreadWorker(object):
      """
      An L{IExclusiveWorker} built from one thread and one queue.

      Exclusivity (the thing that makes this an L{IExclusiveWorker} rather than
      a plain L{IWorker}) comes from calling everything handed to C{do} on the
      I{same} thread.
      """

      def __init__(self, startThread, queue):
          """
          Remember the queue and start the thread that will consume it.

          @param startThread: a callable that takes a callable to run in
              another thread.
          @type startThread: callable taking a 0-argument callable and
              returning nothing.

          @param queue: the queue used to hand tasks to the thread created by
              C{startThread}.
          @type queue: L{Queue}
          """
          self._hasQuit = Quit()
          self._q = queue

          def consume():
              # Two-argument iter(): keep calling queue.get() and running
              # whatever comes back until the _stop sentinel is returned.
              nextTask = queue.get
              for pending in iter(nextTask, _stop):
                  pending()

          startThread(consume)

      def do(self, task):
          """
          Queue C{task} to be called on the thread owned by this
          L{ThreadWorker}.

          The quit flag is consulted before the task is enqueued, so work is
          not placed on the queue behind the stop marker.

          @param task: the function to call on a thread.
          """
          self._hasQuit.check()
          self._q.put(task)

      def quit(self):
          """
          Reject all future work and stop the thread started by C{__init__}.
          """
          # Order matters: flag the worker as quit _first_, and only then
          # enqueue the stop marker, so that no task can ever land on the
          # queue _after_ _stop.
          self._hasQuit.set()
          self._q.put(_stop)
  @implementer(IExclusiveWorker)
  class LockWorker(object):
      """
      An L{IExclusiveWorker} implemented based on a mutual-exclusion lock.

      Work is run on the calling thread while the lock is held; work submitted
      re-entrantly (from inside a running piece of work) is queued on C{local}
      and run by the outermost C{do} call.
      """

      def __init__(self, lock, local):
          """
          @param lock: A mutual-exclusion lock, with C{acquire} and C{release}
              methods.
          @type lock: L{threading.Lock}

          @param local: Local storage; its C{working} attribute is used by
              C{do} to hold the pending work.
          @type local: L{threading.local}
          """
          self._quit = Quit()
          self._lock = lock
          self._local = local

      def do(self, work):
          """
          Do the given work on this thread, with the mutex acquired.  If this is
          called re-entrantly, return and wait for the outer invocation to do the
          work.

          @param work: the work to do with the lock held.
          """
          lock = self._lock
          local = self._local
          self._quit.check()

          working = getattr(local, "working", None)
          if working is not None:
              # An outer do() is already draining the list: just hand the work
              # over to it and return.
              working.append(work)
              return

          working = local.working = [work]
          try:
              lock.acquire()
              try:
                  # Work may append more work to the list while it runs, so
                  # keep going until the list is empty.
                  while working:
                      working.pop(0)()
              finally:
                  lock.release()
          finally:
              # Always clear the marker, even if acquire() or release() raised;
              # otherwise every later do() call would take the re-entrant
              # branch above and queue work that nothing ever runs.
              local.working = None

      def quit(self):
          """
          Quit this L{LockWorker}.
          """
          self._quit.set()
          self._lock = None