_pool.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # -*- test-case-name: twisted._threads.test -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Top level thread pool interface, used to implement
  6. L{twisted.python.threadpool}.
  7. """
  8. from __future__ import absolute_import, division, print_function
  9. from threading import Thread, Lock, local as LocalStorage
  10. try:
  11. from Queue import Queue
  12. except ImportError:
  13. from queue import Queue
  14. from twisted.python.log import err
  15. from ._threadworker import LockWorker
  16. from ._team import Team
  17. from ._threadworker import ThreadWorker
  def pool(currentLimit, threadFactory=Thread):
      """
      Build a L{Team} whose workers are threads, creating a new worker only
      while the team is below a caller-supplied size limit.

      @note: Future maintainers: the public API for the eventual move to
          twisted.threads should look I{something} like this, and this
          function is needed to implement the API described by
          L{twisted.python.threadpool}.  However, a hard upper limit on
          threadpool size looks like a bad idea (it turns memory performance
          issues into correctness issues well before memory pressure is
          actually reached).  A better design would use reactor integration
          to slowly release idle threads when they are not needed, and would
          I{rate} limit the creation of new threads rather than hard-capping
          it.

      @param currentLimit: a callable that returns the current limit on the
          number of workers that the returned L{Team} should create; once the
          team's busy and idle workers together reach that value, no new
          workers are created.
      @type currentLimit: 0-argument callable returning L{int}

      @param threadFactory: a callable invoked with a C{target} keyword
          argument for each new worker; C{start} is called on whatever it
          returns.  Defaults to L{threading.Thread}.
      @type threadFactory: callable accepting a C{target} keyword argument

      @return: a new L{Team}.
      """
      def spawnThread(target):
          # Build the thread object first, then start it; the result of
          # start() is handed back to the caller unchanged.
          thread = threadFactory(target=target)
          return thread.start()

      def createWorkerWithinLimit():
          # Count every worker the team currently knows about, busy or idle,
          # before asking for the limit, so the limit is read as late as
          # possible.
          counts = newTeam.statistics()
          workerTotal = counts.busyWorkerCount + counts.idleWorkerCount
          if workerTotal >= currentLimit():
              # At (or over) the limit: decline to create another worker.
              return None
          # Each worker gets its own fresh queue.
          return ThreadWorker(spawnThread, Queue())

      # The team's own bookkeeping is coordinated through a lock-based worker.
      coordinator = LockWorker(Lock(), LocalStorage())
      newTeam = Team(
          coordinator=coordinator,
          createWorker=createWorkerWithinLimit,
          logException=err,
      )
      return newTeam