_team.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. # -*- test-case-name: twisted._threads.test.test_team -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implementation of a L{Team} of workers; a thread-pool that can allocate work to
  6. workers.
  7. """
  8. from __future__ import annotations
  9. from collections import deque
  10. from typing import Callable, Optional, Set
  11. from zope.interface import implementer
  12. from . import IWorker
  13. from ._convenience import Quit
  14. from ._ithreads import IExclusiveWorker
  15. class Statistics:
  16. """
  17. Statistics about a L{Team}'s current activity.
  18. @ivar idleWorkerCount: The number of idle workers.
  19. @type idleWorkerCount: L{int}
  20. @ivar busyWorkerCount: The number of busy workers.
  21. @type busyWorkerCount: L{int}
  22. @ivar backloggedWorkCount: The number of work items passed to L{Team.do}
  23. which have not yet been sent to a worker to be performed because not
  24. enough workers are available.
  25. @type backloggedWorkCount: L{int}
  26. """
  27. def __init__(
  28. self, idleWorkerCount: int, busyWorkerCount: int, backloggedWorkCount: int
  29. ) -> None:
  30. self.idleWorkerCount = idleWorkerCount
  31. self.busyWorkerCount = busyWorkerCount
  32. self.backloggedWorkCount = backloggedWorkCount
  33. @implementer(IWorker)
  34. class Team:
  35. """
  36. A composite L{IWorker} implementation.
  37. @ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit
  38. yet. This may be set by an arbitrary thread since L{Team.quit} may be
  39. called from anywhere.
  40. @ivar _coordinator: the L{IExclusiveWorker} coordinating access to this
  41. L{Team}'s internal resources.
  42. @ivar _createWorker: a callable that will create new workers.
  43. @ivar _logException: a 0-argument callable called in an exception context
  44. when there is an unhandled error from a task passed to L{Team.do}
  45. @ivar _idle: a L{set} of idle workers.
  46. @ivar _busyCount: the number of workers currently busy.
  47. @ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed
  48. to L{Team.do} - that are outstanding.
  49. @ivar _shouldQuitCoordinator: A flag indicating that the coordinator should
  50. be quit at the next available opportunity. Unlike L{Team._quit}, this
  51. flag is only set by the coordinator.
  52. @ivar _toShrink: the number of workers to shrink this L{Team} by at the
  53. next available opportunity; set in the coordinator.
  54. """
  55. def __init__(
  56. self,
  57. coordinator: IExclusiveWorker,
  58. createWorker: Callable[[], Optional[IWorker]],
  59. logException: Callable[[], None],
  60. ):
  61. """
  62. @param coordinator: an L{IExclusiveWorker} which will coordinate access
  63. to resources on this L{Team}; that is to say, an
  64. L{IExclusiveWorker} whose C{do} method ensures that its given work
  65. will be executed in a mutually exclusive context, not in parallel
  66. with other work enqueued by C{do} (although possibly in parallel
  67. with the caller).
  68. @param createWorker: A 0-argument callable that will create an
  69. L{IWorker} to perform work.
  70. @param logException: A 0-argument callable called in an exception
  71. context when the work passed to C{do} raises an exception.
  72. """
  73. self._quit = Quit()
  74. self._coordinator = coordinator
  75. self._createWorker = createWorker
  76. self._logException = logException
  77. # Don't touch these except from the coordinator.
  78. self._idle: Set[IWorker] = set()
  79. self._busyCount = 0
  80. self._pending: "deque[Callable[..., object]]" = deque()
  81. self._shouldQuitCoordinator = False
  82. self._toShrink = 0
  83. def statistics(self) -> Statistics:
  84. """
  85. Gather information on the current status of this L{Team}.
  86. @return: a L{Statistics} describing the current state of this L{Team}.
  87. """
  88. return Statistics(len(self._idle), self._busyCount, len(self._pending))
  89. def grow(self, n: int) -> None:
  90. """
  91. Increase the the number of idle workers by C{n}.
  92. @param n: The number of new idle workers to create.
  93. @type n: L{int}
  94. """
  95. self._quit.check()
  96. @self._coordinator.do
  97. def createOneWorker() -> None:
  98. for x in range(n):
  99. worker = self._createWorker()
  100. if worker is None:
  101. return
  102. self._recycleWorker(worker)
  103. def shrink(self, n: Optional[int] = None) -> None:
  104. """
  105. Decrease the number of idle workers by C{n}.
  106. @param n: The number of idle workers to shut down, or L{None} (or
  107. unspecified) to shut down all workers.
  108. @type n: L{int} or L{None}
  109. """
  110. self._quit.check()
  111. self._coordinator.do(lambda: self._quitIdlers(n))
  112. def _quitIdlers(self, n: Optional[int] = None) -> None:
  113. """
  114. The implmentation of C{shrink}, performed by the coordinator worker.
  115. @param n: see L{Team.shrink}
  116. """
  117. if n is None:
  118. n = len(self._idle) + self._busyCount
  119. for x in range(n):
  120. if self._idle:
  121. self._idle.pop().quit()
  122. else:
  123. self._toShrink += 1
  124. if self._shouldQuitCoordinator and self._busyCount == 0:
  125. self._coordinator.quit()
  126. def do(self, task: Callable[[], object]) -> None:
  127. """
  128. Perform some work in a worker created by C{createWorker}.
  129. @param task: the callable to run
  130. """
  131. self._quit.check()
  132. self._coordinator.do(lambda: self._coordinateThisTask(task))
  133. def _coordinateThisTask(self, task: Callable[..., object]) -> None:
  134. """
  135. Select a worker to dispatch to, either an idle one or a new one, and
  136. perform it.
  137. This method should run on the coordinator worker.
  138. @param task: the task to dispatch
  139. @type task: 0-argument callable
  140. """
  141. worker = self._idle.pop() if self._idle else self._createWorker()
  142. if worker is None:
  143. # The createWorker method may return None if we're out of resources
  144. # to create workers.
  145. self._pending.append(task)
  146. return
  147. not_none_worker = worker
  148. self._busyCount += 1
  149. @worker.do
  150. def doWork() -> None:
  151. try:
  152. task()
  153. except BaseException:
  154. self._logException()
  155. @self._coordinator.do
  156. def idleAndPending() -> None:
  157. self._busyCount -= 1
  158. self._recycleWorker(not_none_worker)
  159. def _recycleWorker(self, worker: IWorker) -> None:
  160. """
  161. Called only from coordinator.
  162. Recycle the given worker into the idle pool.
  163. @param worker: a worker created by C{createWorker} and now idle.
  164. @type worker: L{IWorker}
  165. """
  166. self._idle.add(worker)
  167. if self._pending:
  168. # Re-try the first enqueued thing.
  169. # (Explicitly do _not_ honor _quit.)
  170. self._coordinateThisTask(self._pending.popleft())
  171. elif self._shouldQuitCoordinator:
  172. self._quitIdlers()
  173. elif self._toShrink > 0:
  174. self._toShrink -= 1
  175. self._idle.remove(worker)
  176. worker.quit()
  177. def quit(self) -> None:
  178. """
  179. Stop doing work and shut down all idle workers.
  180. """
  181. self._quit.set()
  182. # In case all the workers are idle when we do this.
  183. @self._coordinator.do
  184. def startFinishing() -> None:
  185. self._shouldQuitCoordinator = True
  186. self._quitIdlers()