_pool.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # -*- test-case-name: twisted._threads.test -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Top level thread pool interface, used to implement
  6. L{twisted.python.threadpool}.
  7. """
  8. from queue import Queue
  9. from threading import Lock, Thread, local as LocalStorage
  10. from typing import Callable, Optional
  11. from typing_extensions import Protocol
  12. from twisted.python.log import err
  13. from ._ithreads import IWorker
  14. from ._team import Team
  15. from ._threadworker import LockWorker, ThreadWorker
  16. class _ThreadFactory(Protocol):
  17. def __call__(self, *, target: Callable[..., object]) -> Thread:
  18. ...
  19. def pool(
  20. currentLimit: Callable[[], int], threadFactory: _ThreadFactory = Thread
  21. ) -> Team:
  22. """
  23. Construct a L{Team} that spawns threads as a thread pool, with the given
  24. limiting function.
  25. @note: Future maintainers: while the public API for the eventual move to
  26. twisted.threads should look I{something} like this, and while this
  27. function is necessary to implement the API described by
  28. L{twisted.python.threadpool}, I am starting to think the idea of a hard
  29. upper limit on threadpool size is just bad (turning memory performance
  30. issues into correctness issues well before we run into memory
  31. pressure), and instead we should build something with reactor
  32. integration for slowly releasing idle threads when they're not needed
  33. and I{rate} limiting the creation of new threads rather than just
  34. hard-capping it.
  35. @param currentLimit: a callable that returns the current limit on the
  36. number of workers that the returned L{Team} should create; if it
  37. already has more workers than that value, no new workers will be
  38. created.
  39. @type currentLimit: 0-argument callable returning L{int}
  40. @param threadFactory: Factory that, when given a C{target} keyword argument,
  41. returns a L{threading.Thread} that will run that target.
  42. @type threadFactory: callable returning a L{threading.Thread}
  43. @return: a new L{Team}.
  44. """
  45. def startThread(target: Callable[..., object]) -> None:
  46. return threadFactory(target=target).start()
  47. def limitedWorkerCreator() -> Optional[IWorker]:
  48. stats = team.statistics()
  49. if stats.busyWorkerCount + stats.idleWorkerCount >= currentLimit():
  50. return None
  51. return ThreadWorker(startThread, Queue())
  52. team = Team(
  53. coordinator=LockWorker(Lock(), LocalStorage()),
  54. createWorker=limitedWorkerCreator,
  55. logException=err,
  56. )
  57. return team