123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- # -*- test-case-name: twisted._threads.test -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Top level thread pool interface, used to implement
- L{twisted.python.threadpool}.
- """
- from __future__ import absolute_import, division, print_function
- from threading import Thread, Lock, local as LocalStorage
- try:
- from Queue import Queue
- except ImportError:
- from queue import Queue
- from twisted.python.log import err
- from ._threadworker import LockWorker
- from ._team import Team
- from ._threadworker import ThreadWorker
def pool(currentLimit, threadFactory=Thread):
    """
    Build a L{Team} that behaves as a thread pool: each of its workers runs in
    its own thread, and the number of workers is capped by a caller-supplied
    limiting function.

    @note: To future maintainers: the public API for the eventual move to
        twisted.threads should look I{something} like this, and this function
        is required to implement the API described by
        L{twisted.python.threadpool}.  However, a hard upper limit on
        threadpool size increasingly looks like a bad idea, since it turns
        memory performance issues into correctness issues well before there
        is any memory pressure.  A better design would integrate with the
        reactor to slowly release idle threads when they are not needed, and
        would I{rate} limit the creation of new threads rather than just
        hard-capping it.

    @param currentLimit: a callable that returns the current limit on the
        number of workers that the returned L{Team} should create.  It is
        called every time the worker creator is invoked; if the team already
        has at least that many workers (busy plus idle), no new worker is
        created.
    @type currentLimit: 0-argument callable returning L{int}

    @param threadFactory: a callable that is invoked with a C{target} keyword
        argument and returns an object whose C{start} method is then called.
        Defaults to L{threading.Thread}.

    @return: a new L{Team}, coordinated by a L{LockWorker}.
    """
    def startThread(target):
        # Construct the thread first, then start it; whatever C{start}
        # returns is passed straight back to the caller.
        thread = threadFactory(target=target)
        return thread.start()

    def limitedWorkerCreator():
        # Count every worker the team currently knows about, busy or idle,
        # before asking for the limit.
        stats = team.statistics()
        workerCount = stats.busyWorkerCount + stats.idleWorkerCount
        if workerCount >= currentLimit():
            # Already at (or over) the limit: decline to create a worker.
            return None
        # Each new worker gets its own queue.
        return ThreadWorker(startThread, Queue())

    # The coordinator is a lock-based worker with its own lock and
    # thread-local storage.
    coordinator = LockWorker(Lock(), LocalStorage())
    # C{limitedWorkerCreator} looks C{team} up when it is called, so it is
    # safe to hand it to the L{Team} that is being assigned to that name.
    team = Team(
        coordinator=coordinator,
        createWorker=limitedWorkerCreator,
        logException=err,
    )
    return team
|