_threadworker.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # -*- test-case-name: twisted._threads.test.test_threadworker -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implementation of an L{IWorker} based on native threads and queues.
  6. """
  7. from __future__ import annotations
  8. from enum import Enum, auto
  9. from typing import TYPE_CHECKING, Callable, Iterator, Literal, Protocol, TypeVar
  10. if TYPE_CHECKING:
  11. import threading
  12. from zope.interface import implementer
  13. from ._convenience import Quit
  14. from ._ithreads import IExclusiveWorker
  15. class Stop(Enum):
  16. Thread = auto()
  17. StopThread = Stop.Thread
  18. T = TypeVar("T")
  19. U = TypeVar("U")
  20. class SimpleQueue(Protocol[T]):
  21. def put(self, item: T) -> None:
  22. ...
  23. def get(self) -> T:
  24. ...
  25. # when the sentinel value is a literal in a union, this is how iter works
  26. smartiter: Callable[[Callable[[], T | U], U], Iterator[T]]
  27. smartiter = iter # type:ignore[assignment]
  28. @implementer(IExclusiveWorker)
  29. class ThreadWorker:
  30. """
  31. An L{IExclusiveWorker} implemented based on a single thread and a queue.
  32. This worker ensures exclusivity (i.e. it is an L{IExclusiveWorker} and not
  33. an L{IWorker}) by performing all of the work passed to C{do} on the I{same}
  34. thread.
  35. """
  36. def __init__(
  37. self,
  38. startThread: Callable[[Callable[[], object]], object],
  39. queue: SimpleQueue[Callable[[], object] | Literal[Stop.Thread]],
  40. ):
  41. """
  42. Create a L{ThreadWorker} with a function to start a thread and a queue
  43. to use to communicate with that thread.
  44. @param startThread: a callable that takes a callable to run in another
  45. thread.
  46. @param queue: A L{Queue} to use to give tasks to the thread created by
  47. C{startThread}.
  48. """
  49. self._q = queue
  50. self._hasQuit = Quit()
  51. def work() -> None:
  52. for task in smartiter(queue.get, StopThread):
  53. task()
  54. startThread(work)
  55. def do(self, task: Callable[[], None]) -> None:
  56. """
  57. Perform the given task on the thread owned by this L{ThreadWorker}.
  58. @param task: the function to call on a thread.
  59. """
  60. self._hasQuit.check()
  61. self._q.put(task)
  62. def quit(self) -> None:
  63. """
  64. Reject all future work and stop the thread started by C{__init__}.
  65. """
  66. # Reject all future work. Set this _before_ enqueueing _stop, so
  67. # that no work is ever enqueued _after_ _stop.
  68. self._hasQuit.set()
  69. self._q.put(StopThread)
  70. class SimpleLock(Protocol):
  71. def acquire(self) -> bool:
  72. ...
  73. def release(self) -> None:
  74. ...
  75. @implementer(IExclusiveWorker)
  76. class LockWorker:
  77. """
  78. An L{IWorker} implemented based on a mutual-exclusion lock.
  79. """
  80. def __init__(self, lock: SimpleLock, local: threading.local):
  81. """
  82. @param lock: A mutual-exclusion lock, with C{acquire} and C{release}
  83. methods.
  84. @type lock: L{threading.Lock}
  85. @param local: Local storage.
  86. @type local: L{threading.local}
  87. """
  88. self._quit = Quit()
  89. self._lock: SimpleLock | None = lock
  90. self._local = local
  91. def do(self, work: Callable[[], None]) -> None:
  92. """
  93. Do the given work on this thread, with the mutex acquired. If this is
  94. called re-entrantly, return and wait for the outer invocation to do the
  95. work.
  96. @param work: the work to do with the lock held.
  97. """
  98. lock = self._lock
  99. local = self._local
  100. self._quit.check()
  101. working = getattr(local, "working", None)
  102. if working is None:
  103. assert lock is not None, "LockWorker used after quit()"
  104. working = local.working = []
  105. working.append(work)
  106. lock.acquire()
  107. try:
  108. while working:
  109. working.pop(0)()
  110. finally:
  111. lock.release()
  112. local.working = None
  113. else:
  114. working.append(work)
  115. def quit(self) -> None:
  116. """
  117. Quit this L{LockWorker}.
  118. """
  119. self._quit.set()
  120. self._lock = None