_team.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. # -*- test-case-name: twisted._threads.test.test_team -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implementation of a L{Team} of workers; a thread-pool that can allocate work to
  6. workers.
  7. """
  8. from __future__ import absolute_import, division, print_function
  9. from collections import deque
  10. from zope.interface import implementer
  11. from . import IWorker
  12. from ._convenience import Quit
  13. class Statistics(object):
  14. """
  15. Statistics about a L{Team}'s current activity.
  16. @ivar idleWorkerCount: The number of idle workers.
  17. @type idleWorkerCount: L{int}
  18. @ivar busyWorkerCount: The number of busy workers.
  19. @type busyWorkerCount: L{int}
  20. @ivar backloggedWorkCount: The number of work items passed to L{Team.do}
  21. which have not yet been sent to a worker to be performed because not
  22. enough workers are available.
  23. @type backloggedWorkCount: L{int}
  24. """
  25. def __init__(self, idleWorkerCount, busyWorkerCount,
  26. backloggedWorkCount):
  27. self.idleWorkerCount = idleWorkerCount
  28. self.busyWorkerCount = busyWorkerCount
  29. self.backloggedWorkCount = backloggedWorkCount
  30. @implementer(IWorker)
  31. class Team(object):
  32. """
  33. A composite L{IWorker} implementation.
  34. @ivar _quit: A L{Quit} flag indicating whether this L{Team} has been quit
  35. yet. This may be set by an arbitrary thread since L{Team.quit} may be
  36. called from anywhere.
  37. @ivar _coordinator: the L{IExclusiveWorker} coordinating access to this
  38. L{Team}'s internal resources.
  39. @ivar _createWorker: a callable that will create new workers.
  40. @ivar _logException: a 0-argument callable called in an exception context
  41. when there is an unhandled error from a task passed to L{Team.do}
  42. @ivar _idle: a L{set} of idle workers.
  43. @ivar _busyCount: the number of workers currently busy.
  44. @ivar _pending: a C{deque} of tasks - that is, 0-argument callables passed
  45. to L{Team.do} - that are outstanding.
  46. @ivar _shouldQuitCoordinator: A flag indicating that the coordinator should
  47. be quit at the next available opportunity. Unlike L{Team._quit}, this
  48. flag is only set by the coordinator.
  49. @ivar _toShrink: the number of workers to shrink this L{Team} by at the
  50. next available opportunity; set in the coordinator.
  51. """
  52. def __init__(self, coordinator, createWorker, logException):
  53. """
  54. @param coordinator: an L{IExclusiveWorker} which will coordinate access
  55. to resources on this L{Team}; that is to say, an
  56. L{IExclusiveWorker} whose C{do} method ensures that its given work
  57. will be executed in a mutually exclusive context, not in parallel
  58. with other work enqueued by C{do} (although possibly in parallel
  59. with the caller).
  60. @param createWorker: A 0-argument callable that will create an
  61. L{IWorker} to perform work.
  62. @param logException: A 0-argument callable called in an exception
  63. context when the work passed to C{do} raises an exception.
  64. """
  65. self._quit = Quit()
  66. self._coordinator = coordinator
  67. self._createWorker = createWorker
  68. self._logException = logException
  69. # Don't touch these except from the coordinator.
  70. self._idle = set()
  71. self._busyCount = 0
  72. self._pending = deque()
  73. self._shouldQuitCoordinator = False
  74. self._toShrink = 0
  75. def statistics(self):
  76. """
  77. Gather information on the current status of this L{Team}.
  78. @return: a L{Statistics} describing the current state of this L{Team}.
  79. """
  80. return Statistics(len(self._idle), self._busyCount, len(self._pending))
  81. def grow(self, n):
  82. """
  83. Increase the the number of idle workers by C{n}.
  84. @param n: The number of new idle workers to create.
  85. @type n: L{int}
  86. """
  87. self._quit.check()
  88. @self._coordinator.do
  89. def createOneWorker():
  90. for x in range(n):
  91. worker = self._createWorker()
  92. if worker is None:
  93. return
  94. self._recycleWorker(worker)
  95. def shrink(self, n=None):
  96. """
  97. Decrease the number of idle workers by C{n}.
  98. @param n: The number of idle workers to shut down, or L{None} (or
  99. unspecified) to shut down all workers.
  100. @type n: L{int} or L{None}
  101. """
  102. self._quit.check()
  103. self._coordinator.do(lambda: self._quitIdlers(n))
  104. def _quitIdlers(self, n=None):
  105. """
  106. The implmentation of C{shrink}, performed by the coordinator worker.
  107. @param n: see L{Team.shrink}
  108. """
  109. if n is None:
  110. n = len(self._idle) + self._busyCount
  111. for x in range(n):
  112. if self._idle:
  113. self._idle.pop().quit()
  114. else:
  115. self._toShrink += 1
  116. if self._shouldQuitCoordinator and self._busyCount == 0:
  117. self._coordinator.quit()
  118. def do(self, task):
  119. """
  120. Perform some work in a worker created by C{createWorker}.
  121. @param task: the callable to run
  122. """
  123. self._quit.check()
  124. self._coordinator.do(lambda: self._coordinateThisTask(task))
  125. def _coordinateThisTask(self, task):
  126. """
  127. Select a worker to dispatch to, either an idle one or a new one, and
  128. perform it.
  129. This method should run on the coordinator worker.
  130. @param task: the task to dispatch
  131. @type task: 0-argument callable
  132. """
  133. worker = (self._idle.pop() if self._idle
  134. else self._createWorker())
  135. if worker is None:
  136. # The createWorker method may return None if we're out of resources
  137. # to create workers.
  138. self._pending.append(task)
  139. return
  140. self._busyCount += 1
  141. @worker.do
  142. def doWork():
  143. try:
  144. task()
  145. except:
  146. self._logException()
  147. @self._coordinator.do
  148. def idleAndPending():
  149. self._busyCount -= 1
  150. self._recycleWorker(worker)
  151. def _recycleWorker(self, worker):
  152. """
  153. Called only from coordinator.
  154. Recycle the given worker into the idle pool.
  155. @param worker: a worker created by C{createWorker} and now idle.
  156. @type worker: L{IWorker}
  157. """
  158. self._idle.add(worker)
  159. if self._pending:
  160. # Re-try the first enqueued thing.
  161. # (Explicitly do _not_ honor _quit.)
  162. self._coordinateThisTask(self._pending.popleft())
  163. elif self._shouldQuitCoordinator:
  164. self._quitIdlers()
  165. elif self._toShrink > 0:
  166. self._toShrink -= 1
  167. self._idle.remove(worker)
  168. worker.quit()
  169. def quit(self):
  170. """
  171. Stop doing work and shut down all idle workers.
  172. """
  173. self._quit.set()
  174. # In case all the workers are idle when we do this.
  175. @self._coordinator.do
  176. def startFinishing():
  177. self._shouldQuitCoordinator = True
  178. self._quitIdlers()