12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706 |
- """Thread module emulating a subset of Java's threading model."""
- import os as _os
- import sys as _sys
- import _thread
- import functools
- from time import monotonic as _time
- from _weakrefset import WeakSet
- from itertools import count as _count
- try:
- from _collections import deque as _deque
- except ImportError:
- from collections import deque as _deque
# Note regarding PEP 8 compliant names
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as an
# alias for the PEP 8 compliant names.
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.
# Names exported by "from threading import *".  'get_native_id' is appended
# later in this module when the underlying _thread module provides it.
__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
           'enumerate', 'main_thread', 'TIMEOUT_MAX',
           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
           'setprofile', 'settrace', 'local', 'stack_size',
           'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
           'setprofile_all_threads','settrace_all_threads']
# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_daemon_threads_allowed = _thread.daemon_threads_allowed
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
try:
    _is_main_interpreter = _thread._is_main_interpreter
except AttributeError:
    # See https://github.com/python/cpython/issues/112826.
    # We can pretend a subinterpreter is the main interpreter for the
    # sake of _shutdown(), since that only means we do not wait for the
    # subinterpreter's threads to finish.  Instead, they will be stopped
    # later by the mechanism we use for daemon threads.  The likelihood
    # of this case is small because rarely will the _thread module be
    # replaced by a module without _is_main_interpreter().
    # Furthermore, this is all irrelevant in applications
    # that do not use subinterpreters.
    def _is_main_interpreter():
        return True
# get_native_id is optional: export it only when _thread provides it, and
# record its availability in _HAVE_THREAD_NATIVE_ID.
try:
    get_native_id = _thread.get_native_id
    _HAVE_THREAD_NATIVE_ID = True
    __all__.append('get_native_id')
except AttributeError:
    _HAVE_THREAD_NATIVE_ID = False
ThreadError = _thread.error
# Prefer the RLock type supplied by _thread; None makes the RLock() factory
# fall back to the pure-Python _PyRLock.
try:
    _CRLock = _thread.RLock
except AttributeError:
    _CRLock = None
TIMEOUT_MAX = _thread.TIMEOUT_MAX
# Everything needed has been aliased above; remove the module name itself
# from this namespace.
del _thread
# Support for profile and trace hooks

# Module-wide hook storage: written by setprofile() / settrace() and read
# back by getprofile() / gettrace().  Both start out as None.
_profile_hook = None
_trace_hook = None
def setprofile(func):
    """Register *func* as the profile hook for threads started by this module.

    According to the module's contract, the hook is handed to
    sys.setprofile() in each such thread before its run() method is called.
    This function itself only records *func* in the module-level
    ``_profile_hook``.

    """
    global _profile_hook
    _profile_hook = func
def setprofile_all_threads(func):
    """Install *func* as profile hook for future and currently running threads.

    First records *func* through setprofile() so that threads started later
    from the threading module pick it up, then forwards it to
    sys._setprofileallthreads() to cover the Python threads that are
    already executing.

    """
    setprofile(func)
    _sys._setprofileallthreads(func)
def getprofile():
    """Return the profile hook last stored by threading.setprofile()."""
    return _profile_hook
def settrace(func):
    """Register *func* as the trace hook for threads started by this module.

    According to the module's contract, the hook is handed to
    sys.settrace() in each such thread before its run() method is called.
    This function itself only records *func* in the module-level
    ``_trace_hook``.

    """
    global _trace_hook
    _trace_hook = func
def settrace_all_threads(func):
    """Install *func* as trace hook for future and currently running threads.

    First records *func* through settrace() so that threads started later
    from the threading module pick it up, then forwards it to
    sys._settraceallthreads() to cover the Python threads that are already
    executing.

    """
    settrace(func)
    _sys._settraceallthreads(func)
def gettrace():
    """Return the trace hook last stored by threading.settrace()."""
    return _trace_hook
- # Synchronization classes
# The public Lock factory is simply the low-level lock allocator.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    The thread that acquired a reentrant lock is the one that must release
    it.  While it holds the lock, that thread may acquire it again without
    blocking, and must release it once per acquisition.

    The lock type from the _thread module is used when available;
    otherwise the pure-Python implementation is used.  All arguments are
    passed through unchanged.

    """
    factory = _PyRLock if _CRLock is None else _CRLock
    return factory(*args, **kwargs)
class _RLock:
    """This class implements reentrant lock objects.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it
    again without blocking; the thread must release it once for each time it
    has acquired it.

    """

    def __init__(self):
        self._block = _allocate_lock()  # underlying non-reentrant lock
        self._owner = None              # get_ident() of the holder, or None
        self._count = 0                 # recursion level of the holder

    def __repr__(self):
        owner = self._owner
        try:
            # Prefer the owning thread's name when it is a known thread;
            # otherwise fall back to the raw ident (or None).
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def _at_fork_reinit(self):
        """Reset the lock to the unlocked, unowned state."""
        self._block._at_fork_reinit()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        """Acquire a lock, blocking or non-blocking.

        If the calling thread already owns the lock, increment the recursion
        level by one and return True immediately.  Otherwise the underlying
        lock is acquired with the given *blocking* and *timeout* arguments;
        on success the calling thread becomes the owner with a recursion
        level of one.

        When invoked with the blocking argument set to false, do not block.
        If a blocking call would block, return False immediately.

        When invoked with the floating-point timeout argument set to a
        positive value, block for at most the number of seconds specified by
        timeout and as long as the lock cannot be acquired.

        Return True if the lock has been acquired, False otherwise.

        """
        me = get_ident()
        if self._owner == me:
            self._count += 1
            # Return a real bool, consistent with the non-reentrant path
            # below, which returns the bool produced by the underlying lock.
            return True
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc

    __enter__ = acquire

    def release(self):
        """Release a lock, decrementing the recursion level.

        If after the decrement it is zero, reset the lock to unlocked (not
        owned by any thread), and if any other threads are blocked waiting
        for the lock to become unlocked, allow exactly one of them to
        proceed. If after the decrement the recursion level is still
        nonzero, the lock remains locked and owned by the calling thread.

        Only call this method when the calling thread owns the lock. A
        RuntimeError is raised if the calling thread is not the owner.

        There is no return value.

        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            # Outermost release: give up ownership before unlocking.
            self._owner = None
            self._block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and restore a state from _release_save()."""
        self._block.acquire()
        self._count, self._owner = state

    def _release_save(self):
        """Fully release the lock, returning (count, owner) for later restore.

        Raises RuntimeError if the lock is not currently held.
        """
        if self._count == 0:
            raise RuntimeError("cannot release un-acquired lock")
        count = self._count
        self._count = 0
        owner = self._owner
        self._owner = None
        self._block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread owns the lock."""
        return self._owner == get_ident()

    # Internal method used for reentrancy checks

    def _recursion_count(self):
        """Return the recursion level held by the calling thread (0 if none)."""
        if self._owner != get_ident():
            return 0
        return self._count


_PyRLock = _RLock
class Condition:
    """Condition variable built on top of a lock.

    Lets one or more threads block until another thread notifies them.

    When *lock* is supplied and is not None it is used as the underlying
    lock (it must be a Lock or RLock object); otherwise a fresh RLock is
    created for the purpose.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Re-export the lock's own acquire()/release() on the condition.
        self.acquire = lock.acquire
        self.release = lock.release
        # A lock may supply its own _release_save(), _acquire_restore() and
        # _is_owned(); when present they shadow the default methods defined
        # below (which simply call release()/acquire() on the lock).
        for hook in ('_release_save', '_acquire_restore', '_is_owned'):
            if hasattr(lock, hook):
                setattr(self, hook, getattr(lock, hook))
        self._waiters = _deque()

    def _at_fork_reinit(self):
        self._lock._at_fork_reinit()
        self._waiters.clear()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

    def _release_save(self):
        # Default implementation: a plain lock has no state worth saving.
        self._lock.release()

    def _acquire_restore(self, x):
        # Default implementation: the saved state is ignored.
        self._lock.acquire()

    def _is_owned(self):
        # Default implementation, used only when the lock has no
        # _is_owned() of its own: the lock counts as owned when a
        # non-blocking acquire attempt fails.
        if not self._lock.acquire(False):
            return True
        self._lock.release()
        return False

    def wait(self, timeout=None):
        """Block until notified or until *timeout* expires.

        A RuntimeError is raised when the calling thread does not hold the
        lock.

        The underlying lock is released, the caller then blocks until a
        notify() or notify_all() call on this condition wakes it or the
        optional timeout runs out, and finally the lock is re-acquired
        before returning.

        *timeout*, when given and not None, is a floating point number of
        seconds (or fractions thereof).

        The lock is released through _release_save() and re-taken through
        _acquire_restore(); with an RLock these internal hooks really unlock
        it even if it was acquired recursively, and restore the recursion
        level afterwards.

        Returns True when woken by a notification, False on timeout.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        gate = _allocate_lock()
        gate.acquire()
        self._waiters.append(gate)
        lock_state = self._release_save()
        notified = False
        try:    # the lock state is restored no matter what (e.g. KeyboardInterrupt)
            if timeout is None:
                gate.acquire()
                notified = True
            elif timeout > 0:
                notified = gate.acquire(True, timeout)
            else:
                notified = gate.acquire(False)
            return notified
        finally:
            self._acquire_restore(lock_state)
            if not notified:
                # Nobody released our gate: withdraw it from the queue.
                try:
                    self._waiters.remove(gate)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        """Wait until *predicate* returns a true value.

        *predicate* is a callable whose result is interpreted as a boolean.
        An optional *timeout* bounds the total time spent waiting.  The last
        value returned by *predicate* is returned.

        """
        deadline = None
        remaining = timeout
        outcome = predicate()
        while not outcome:
            if remaining is not None:
                if deadline is None:
                    # First pass: fix the absolute deadline.
                    deadline = _time() + remaining
                else:
                    remaining = deadline - _time()
                    if remaining <= 0:
                        break
            self.wait(remaining)
            outcome = predicate()
        return outcome

    def notify(self, n=1):
        """Wake up at most *n* threads waiting on this condition, if any.

        A RuntimeError is raised when the calling thread does not hold the
        lock.  Nothing happens when no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        queue = self._waiters
        budget = n
        while queue and budget > 0:
            gate = queue[0]
            try:
                gate.release()
            except RuntimeError:
                # gh-92530: The previous call of notify() released the lock,
                # but was interrupted before removing it from the queue.
                # It can happen if a signal handler raises an exception,
                # like CTRL+C which raises KeyboardInterrupt.
                pass
            else:
                budget -= 1
            try:
                queue.remove(gate)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up every thread waiting on this condition.

        A RuntimeError is raised when the calling thread does not hold the
        lock.

        """
        self.notify(len(self._waiters))

    def notifyAll(self):
        """Wake up every thread waiting on this condition.

        Deprecated alias; use notify_all() instead.

        """
        import warnings
        warnings.warn('notifyAll() is deprecated, use notify_all() instead',
                      DeprecationWarning, stacklevel=2)
        self.notify_all()
class Semaphore:
    """Counting semaphore.

    The internal counter equals the initial value plus the number of
    release() calls minus the number of acquire() calls.  acquire() blocks
    when needed so that the counter never becomes negative.  The initial
    value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def __repr__(self):
        cls = self.__class__
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" value={self._value}>")

    def acquire(self, blocking=True, timeout=None):
        """Acquire the semaphore, decrementing the internal counter by one.

        With a counter above zero, decrement it and return True right away.
        With a counter of zero, a blocking call waits until some other
        thread's release() makes it positive again; each release() wakes the
        corresponding number of blocked acquirers, in no guaranteed order.

        With *blocking* false, never wait: return False immediately if the
        call would have blocked, otherwise behave as above and return True.

        With a *timeout* other than None, wait for at most that many
        seconds; return False if the semaphore could not be acquired within
        that interval and True otherwise.

        Raises ValueError if a timeout is combined with a non-blocking call.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        deadline = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                if timeout is not None:
                    if deadline is None:
                        # First pass: fix the absolute deadline.
                        deadline = _time() + timeout
                    else:
                        timeout = deadline - _time()
                        if timeout <= 0:
                            return False
                self._cond.wait(timeout)
            # The counter is positive and we hold the condition's lock.
            self._value -= 1
            return True

    __enter__ = acquire

    def release(self, n=1):
        """Release the semaphore, incrementing the internal counter by *n*.

        Up to *n* threads blocked in acquire() are notified.  Raises
        ValueError if *n* is smaller than one.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            self._value += n
            self._cond.notify(n)

    def __exit__(self, t, v, tb):
        self.release()
class BoundedSemaphore(Semaphore):
    """Semaphore whose counter may never exceed its initial value.

    release() raises ValueError when it would push the counter above the
    value the semaphore was created with.  Semaphores usually guard a
    resource of limited capacity, so releasing one too many times is a sign
    of a bug.

    Apart from that check it behaves like a regular semaphore: the counter
    is the initial value plus releases minus acquires, and acquire() blocks
    when needed to keep it non-negative.  The initial value defaults to 1.

    """

    def __init__(self, value=1):
        super().__init__(value)
        self._initial_value = value

    def __repr__(self):
        cls = self.__class__
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" value={self._value}/{self._initial_value}>")

    def release(self, n=1):
        """Release the semaphore, incrementing the internal counter by *n*.

        Up to *n* threads blocked in acquire() are notified.

        Raises ValueError if *n* is smaller than one, or if the release
        would take the counter above the initial value.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            raised_value = self._value + n
            if raised_value > self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value = raised_value
            self._cond.notify(n)
class Event:
    """Simple flag that threads can wait on.

    The flag starts out false.  set() makes it true, clear() makes it false
    again, and wait() blocks until it is true.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def __repr__(self):
        cls = self.__class__
        status = 'unset' if not self._flag else 'set'
        return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"

    def _at_fork_reinit(self):
        # Private method called by Thread._reset_internal_locks()
        self._cond._at_fork_reinit()

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    def isSet(self):
        """Return true if and only if the internal flag is true.

        Deprecated alias; use is_set() instead.

        """
        import warnings
        warnings.warn('isSet() is deprecated, use is_set() instead',
                      DeprecationWarning, stacklevel=2)
        return self.is_set()

    def set(self):
        """Set the internal flag to true and wake every waiting thread.

        Threads that call wait() while the flag is true do not block.

        """
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        From then on wait() blocks until set() is called again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns immediately when the flag is already true.  Otherwise blocks
        until another thread calls set() or until the optional *timeout*
        (a floating point number of seconds, or fractions thereof) expires.

        Returns the flag if it was already true on entry; otherwise returns
        the result of waiting on the internal condition, which is False when
        the wait timed out.

        """
        with self._cond:
            flag = self._flag
            return flag if flag else self._cond.wait(timeout)
- # A barrier class. Inspired in part by the pthread_barrier_* api and
- # the CyclicBarrier class from Java. See
- # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
- # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
- # CyclicBarrier.html
- # for information.
- # We maintain two main states, 'filling' and 'draining' enabling the barrier
- # to be cyclic. Threads are not allowed into it until it has fully drained
- # since the previous cycle. In addition, a 'resetting' state exists which is
- # similar to 'draining' except that threads leave with a BrokenBarrierError,
- # and a 'broken' state in which all threads get the exception.
class Barrier:
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known
    synchronization points.  Threads block on 'wait()' and are simultaneously
    awoken once they have all made that call.

    """

    def __init__(self, parties, action=None, timeout=None):
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all. If a 'timeout' is provided, it is used as the
        default for all subsequent 'wait()' calls.

        Raises ValueError if 'parties' is smaller than one.

        """
        # With fewer than one party no caller of wait() could ever be the
        # one that trips the barrier, so every wait() would block forever.
        if parties < 1:
            raise ValueError("parties must be >= 1")
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0  # number of threads currently inside wait()

    def __repr__(self):
        cls = self.__class__
        if self.broken:
            return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>"
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" waiters={self.n_waiting}/{self.parties}>")

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are
        all simultaneously awoken. If an 'action' was provided for the
        barrier, one of the threads will have executed that callback prior
        to returning.

        'timeout' defaults to the value given to the constructor.

        Returns an individual index number from 0 to 'parties-1'.

        Raises BrokenBarrierError if the barrier is broken or reset while
        waiting, or if the wait times out.

        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter()  # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        # See if the barrier is in a broken state
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except BaseException:
            # An exception during the _action handler (of any kind,
            # including KeyboardInterrupt).  Break the barrier and re-raise.
            self._break()
            raise

    # Wait in the barrier until we are released.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
        if not self._cond.wait_for(lambda: self._state != 0, timeout):
            # Timed out.  Break the barrier
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                # resetting or draining
                self._state = 0
                self._cond.notify_all()

    def reset(self):
        """Reset the barrier to the initial state.

        Any threads currently waiting will get the BrokenBarrier exception
        raised.

        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    # reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    # was broken, set it to reset state
                    # which clears when the last thread exits
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()

    def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.

        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to
        # a broken state and all parties are awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of threads required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of threads currently waiting at the barrier."""
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state == -2
class BrokenBarrierError(RuntimeError):
    """Exception raised by the Barrier class."""
    pass
# Helper to generate new thread names

# Each call yields the next integer of a module-wide sequence starting at 1.
_counter = _count(1).__next__


def _newname(name_template):
    """Return *name_template* %-formatted with the next counter value."""
    serial = _counter()
    return name_template % serial
# Active thread administration.
#
# bpo-44422: Use a reentrant lock to allow reentrant calls to functions like
# threading.enumerate().
_active_limbo_lock = RLock()
_active = {}    # maps thread id to Thread object
# NOTE(review): presumably holds threads that have been started but are not
# yet registered in _active -- confirm against Thread.start().
_limbo = {}
# NOTE(review): weak set, presumably of Thread objects kept for
# diagnostics -- confirm where it is populated.
_dangling = WeakSet()

# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
# to wait until all Python thread states get deleted:
# see Thread._set_tstate_lock().
_shutdown_locks_lock = _allocate_lock()    # guards _shutdown_locks
_shutdown_locks = set()
def _maintain_shutdown_locks():
    """
    Prune shutdown locks whose threads are no longer running.

    Running this periodically keeps the _shutdown_locks set from growing
    without bound when Thread objects are never joined explicitly.
    See bpo-37788.

    The caller must hold _shutdown_locks_lock.
    """
    # A lock that is no longer held means its thread has exited.
    finished = []
    for candidate in _shutdown_locks:
        if not candidate.locked():
            finished.append(candidate)
    _shutdown_locks.difference_update(finished)
# Main class for threads

class Thread:
    """A class that represents a thread of control.

    This class can be safely subclassed in a limited fashion. There are two ways
    to specify the activity: by passing a callable object to the constructor, or
    by overriding the run() method in a subclass.

    """

    # Class-level default so that methods can detect a subclass that forgot
    # to call Thread.__init__().
    _initialized = False

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        """This constructor should always be called with keyword arguments. Arguments are:

        *group* should be None; reserved for future extension when a ThreadGroup
        class is implemented.

        *target* is the callable object to be invoked by the run()
        method. Defaults to None, meaning nothing is called.

        *name* is the thread name. By default, a unique name is constructed of
        the form "Thread-N" where N is a small decimal number.

        *args* is a list or tuple of arguments for the target invocation. Defaults to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

        """
        assert group is None, "group argument must be None for now"
        if kwargs is None:
            kwargs = {}
        if name:
            name = str(name)
        else:
            name = _newname("Thread-%d")
            if target is not None:
                # Append the target's name to the generated name when it
                # has one; callables without __name__ are left alone.
                try:
                    target_name = target.__name__
                    name += f" ({target_name})"
                except AttributeError:
                    pass

        self._target = target
        self._name = name
        self._args = args
        self._kwargs = kwargs
        if daemon is not None:
            if daemon and not _daemon_threads_allowed():
                raise RuntimeError('daemon threads are disabled in this (sub)interpreter')
            self._daemonic = daemon
        else:
            # Inherit the daemon flag from the creating thread.
            self._daemonic = current_thread().daemon
        self._ident = None
        if _HAVE_THREAD_NATIVE_ID:
            self._native_id = None
        self._tstate_lock = None
        self._started = Event()
        self._is_stopped = False
        self._initialized = True
        # Copy of sys.stderr used by self._invoke_excepthook()
        self._stderr = _sys.stderr
        self._invoke_excepthook = _make_invoke_excepthook()
        # For debugging and _after_fork()
        _dangling.add(self)

    def _reset_internal_locks(self, is_alive):
        # private!  Called by _after_fork() to reset our internal locks as
        # they may be in an invalid state leading to a deadlock or crash.
        self._started._at_fork_reinit()
        if is_alive:
            # bpo-42350: If the fork happens when the thread is already stopped
            # (ex: after threading._shutdown() has been called), _tstate_lock
            # is None. Do nothing in this case.
            if self._tstate_lock is not None:
                self._tstate_lock._at_fork_reinit()
                self._tstate_lock.acquire()
        else:
            # The thread isn't alive after fork: it doesn't have a tstate
            # anymore.
            self._is_stopped = True
            self._tstate_lock = None

    def __repr__(self):
        assert self._initialized, "Thread.__init__() was not called"
        status = "initial"
        if self._started.is_set():
            status = "started"
        self.is_alive() # easy way to get ._is_stopped set when appropriate
        if self._is_stopped:
            status = "stopped"
        if self._daemonic:
            status += " daemon"
        if self._ident is not None:
            status += " %s" % self._ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)

    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")

        if self._started.is_set():
            raise RuntimeError("threads can only be started once")

        # The thread sits in _limbo until _bootstrap_inner() moves it
        # to _active.
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self._bootstrap, ())
        except Exception:
            # The thread never started: undo the _limbo registration.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        # Block until the new thread has signalled that it is running.
        self._started.wait()

    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        """
        try:
            if self._target is not None:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

    def _bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # _bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # _bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        # Record the identifier of the calling thread.
        self._ident = get_ident()

    if _HAVE_THREAD_NATIVE_ID:
        def _set_native_id(self):
            # Record the native id of the calling thread.
            self._native_id = get_native_id()

    def _set_tstate_lock(self):
        """
        Set a lock object which will be released by the interpreter when
        the underlying thread state (see pystate.h) gets deleted.
        """
        self._tstate_lock = _set_sentinel()
        self._tstate_lock.acquire()

        if not self.daemon:
            # Only non-daemon threads are waited for by _shutdown().
            with _shutdown_locks_lock:
                _maintain_shutdown_locks()
                _shutdown_locks.add(self._tstate_lock)

    def _bootstrap_inner(self):
        try:
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            self._started.set()
            # Move this thread from _limbo to _active.
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                self._invoke_excepthook(self)
        finally:
            self._delete()

    def _stop(self):
        # After calling ._stop(), .is_alive() returns False and .join() returns
        # immediately.  ._tstate_lock must be released before calling ._stop().
        #
        # Normal case:  C code at the end of the thread's life
        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
        # that's detected by our ._wait_for_tstate_lock(), called by .join()
        # and .is_alive().  Any number of threads _may_ call ._stop()
        # simultaneously (for example, if multiple threads are blocked in
        # .join() calls), and they're not serialized.  That's harmless -
        # they'll just make redundant rebindings of ._is_stopped and
        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
        # (the assert is executed only if ._tstate_lock is None).
        #
        # Special case:  _main_thread releases ._tstate_lock via this
        # module's _shutdown() function.
        lock = self._tstate_lock
        if lock is not None:
            assert not lock.locked()
        self._is_stopped = True
        self._tstate_lock = None
        if not self.daemon:
            with _shutdown_locks_lock:
                # Remove our lock and other released locks from _shutdown_locks
                _maintain_shutdown_locks()

    def _delete(self):
        "Remove current thread from the dict of currently running threads."
        with _active_limbo_lock:
            del _active[get_ident()]
            # There must not be any python code between the previous line
            # and after the lock is released.  Otherwise a tracing function
            # could try to acquire the lock again in the same thread, (in
            # current_thread()), and would block.

    def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if timeout is None:
            self._wait_for_tstate_lock()
        else:
            # the behavior of a negative timeout isn't documented, but
            # historically .join(timeout=x) for x<0 has acted as if timeout=0
            self._wait_for_tstate_lock(timeout=max(timeout, 0))

    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        # Issue #18808: wait for the thread state to be gone.
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.acquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
        lock = self._tstate_lock
        if lock is None:
            # already determined that the C code is done
            assert self._is_stopped
            return

        try:
            if lock.acquire(block, timeout):
                lock.release()
                self._stop()
        except:
            if lock.locked():
                # bpo-45274: lock.acquire() acquired the lock, but the function
                # was interrupted with an exception before reaching the
                # lock.release(). It can happen if a signal handler raises an
                # exception, like CTRL+C which raises KeyboardInterrupt.
                lock.release()
                self._stop()
            raise

    @property
    def name(self):
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._name

    @name.setter
    def name(self, name):
        assert self._initialized, "Thread.__init__() not called"
        self._name = str(name)

    @property
    def ident(self):
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._ident

    if _HAVE_THREAD_NATIVE_ID:
        @property
        def native_id(self):
            """Native integral thread ID of this thread, or None if it has not been started.

            This is a non-negative integer. See the get_native_id() function.
            This represents the Thread ID as reported by the kernel.

            """
            assert self._initialized, "Thread.__init__() not called"
            return self._native_id

    def is_alive(self):
        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just
        after the run() method terminates. See also the module function
        enumerate().

        """
        assert self._initialized, "Thread.__init__() not called"
        if self._is_stopped or not self._started.is_set():
            return False
        # Non-blocking probe of the tstate lock; it calls _stop() (which
        # sets _is_stopped) when the lock turns out to be free.
        self._wait_for_tstate_lock(False)
        return not self._is_stopped

    @property
    def daemon(self):
        """A boolean value indicating whether this thread is a daemon thread.

        This must be set before start() is called, otherwise RuntimeError is
        raised. Its initial value is inherited from the creating thread; the
        main thread is not a daemon thread and therefore all threads created in
        the main thread default to daemon = False.

        The entire Python program exits when only daemon threads are left.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if daemonic and not _daemon_threads_allowed():
            raise RuntimeError('daemon threads are disabled in this interpreter')
        if self._started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self._daemonic = daemonic

    def isDaemon(self):
        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """
        import warnings
        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
                      DeprecationWarning, stacklevel=2)
        return self.daemon

    def setDaemon(self, daemonic):
        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """
        import warnings
        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
                      DeprecationWarning, stacklevel=2)
        self.daemon = daemonic

    def getName(self):
        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """
        import warnings
        warnings.warn('getName() is deprecated, get the name attribute instead',
                      DeprecationWarning, stacklevel=2)
        return self.name

    def setName(self, name):
        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """
        import warnings
        warnings.warn('setName() is deprecated, set the name attribute instead',
                      DeprecationWarning, stacklevel=2)
        self.name = name
try:
    from _thread import (_excepthook as excepthook,
                         _ExceptHookArgs as ExceptHookArgs)
except ImportError:
    # Pure-Python fallback, used when _thread._excepthook() is not available.
    from traceback import print_exception as _print_exception
    from collections import namedtuple

    _ExceptHookArgs = namedtuple(
        'ExceptHookArgs',
        'exc_type exc_value exc_traceback thread')

    def ExceptHookArgs(args):
        """Build the hook-argument tuple from a sequence of its four fields."""
        return _ExceptHookArgs(*args)

    def excepthook(args, /):
        """
        Handle uncaught Thread.run() exception.
        """
        if args.exc_type == SystemExit:
            # SystemExit is ignored silently.
            return

        thread = args.thread
        if _sys is not None and _sys.stderr is not None:
            stderr = _sys.stderr
        elif thread is None:
            # sys.stderr is unavailable and there is no thread to fall
            # back on: nothing to report to.
            return
        else:
            stderr = thread._stderr
            if stderr is None:
                # sys.stderr is None now and was already None when the
                # thread was created: nothing to report to.
                return

        name = get_ident() if thread is None else thread.name
        print(f"Exception in thread {name}:",
              file=stderr, flush=True)
        _print_exception(args.exc_type, args.exc_value, args.exc_traceback,
                         file=stderr)
        stderr.flush()

# Original value of threading.excepthook
__excepthook__ = excepthook
def _make_invoke_excepthook():
    # Create a local namespace to ensure that variables remain alive
    # when _invoke_excepthook() is called, even if it is called late during
    # Python shutdown. It is mostly needed for daemon threads.
    #
    # Returns a function taking the Thread object; that function reports the
    # exception currently being handled through threading.excepthook.

    old_excepthook = excepthook
    old_sys_excepthook = _sys.excepthook
    if old_excepthook is None:
        raise RuntimeError("threading.excepthook is None")
    if old_sys_excepthook is None:
        raise RuntimeError("sys.excepthook is None")

    # Captured as closure variables so they stay reachable from the hook.
    sys_exc_info = _sys.exc_info
    local_print = print
    local_sys = _sys

    def invoke_excepthook(thread):
        global excepthook
        try:
            # Use the current module-level hook, or the one captured at
            # creation time if the global has become None.
            hook = excepthook
            if hook is None:
                hook = old_excepthook

            args = ExceptHookArgs([*sys_exc_info(), thread])

            hook(args)
        except Exception as exc:
            # The hook itself failed: report that failure via sys.excepthook.
            exc.__suppress_context__ = True
            del exc

            if local_sys is not None and local_sys.stderr is not None:
                stderr = local_sys.stderr
            else:
                stderr = thread._stderr

            local_print("Exception in threading.excepthook:",
                        file=stderr, flush=True)

            if local_sys is not None and local_sys.excepthook is not None:
                sys_excepthook = local_sys.excepthook
            else:
                sys_excepthook = old_sys_excepthook

            sys_excepthook(*sys_exc_info())
        finally:
            # Break reference cycle (exception stored in a variable)
            args = None

    return invoke_excepthook
# The timer class was contributed by Itamar Shtull-Trauring

class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # A fresh container per instance when the caller supplies none.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        # Sleep for the interval, or less if cancel() sets the event first.
        self.finished.wait(self.interval)
        cancelled = self.finished.is_set()
        if not cancelled:
            self.function(*self.args, **self.kwargs)
        self.finished.set()
# Special thread class to represent the main thread

class _MainThread(Thread):
    """Thread object for the thread that instantiates it.

    No new thread is spawned: the instance is marked as started and
    registered in _active under the calling thread's identifier.
    """

    def __init__(self):
        Thread.__init__(self, name="MainThread", daemon=False)
        self._set_tstate_lock()
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread object for a thread not created by this module."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"),
                        daemon=_daemon_threads_allowed())
        # The thread is already running: mark it started and register it
        # under the calling thread's identifier.
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self

    def _stop(self):
        # A dummy thread is never marked as stopped.
        pass

    def is_alive(self):
        # Internal invariant: a dummy thread is always started, never stopped.
        assert not self._is_stopped and self._started.is_set()
        return True

    def join(self, timeout=None):
        """Always raise RuntimeError: a dummy thread cannot be waited for."""
        # Raised explicitly rather than with `assert False`, which is
        # stripped under -O and would make join() return silently.
        raise RuntimeError("cannot join a dummy thread")
# Global API functions

def current_thread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    If the caller's thread of control was not created through the threading
    module, a dummy thread object with limited functionality is returned.

    """
    ident = get_ident()
    try:
        thread = _active[ident]
    except KeyError:
        # Not a thread we know about: hand out a dummy Thread object
        # (its constructor registers it in _active).
        thread = _DummyThread()
    return thread
def currentThread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    This function is deprecated, use current_thread() instead.

    """
    import warnings
    message = 'currentThread() is deprecated, use current_thread() instead'
    # stacklevel=2 attributes the warning to our caller.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return current_thread()
def active_count():
    """Return the number of Thread objects currently alive.

    The returned count is equal to the length of the list returned by
    enumerate().

    """
    # NOTE: if the logic in here ever changes, update Modules/posixmodule.c
    # warn_about_fork_with_threads() to match.
    with _active_limbo_lock:
        running = len(_active)
        starting = len(_limbo)
        return running + starting
def activeCount():
    """Return the number of Thread objects currently alive.

    This function is deprecated, use active_count() instead.

    """
    import warnings
    message = 'activeCount() is deprecated, use active_count() instead'
    # stacklevel=2 attributes the warning to our caller.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return active_count()
def _enumerate():
    # Lock-free twin of enumerate().  Internal use only.
    return [*_active.values(), *_limbo.values()]
def enumerate():
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread. It excludes terminated threads and
    threads that have not yet been started.

    """
    with _active_limbo_lock:
        # Snapshot both registries while holding the lock.
        return [*_active.values(), *_limbo.values()]
# Callables registered through _register_atexit(), in registration order.
_threading_atexits = []
# Set to True by _shutdown(); no registration is accepted afterwards.
_SHUTTING_DOWN = False


def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    # Bind the arguments now so the stored callable takes none.
    _threading_atexits.append(functools.partial(func, *arg, **kwargs))
- from _thread import stack_size
# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.
# The object is built in whichever thread runs this statement.

_main_thread = _MainThread()
def _shutdown():
    """
    Wait until the Python thread state of all non-daemon threads get deleted.
    """
    # Obscure: other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it. We can't wait for C code to release
    # the main thread's tstate_lock - that won't happen until the interpreter
    # is nearly dead. So we release it here. Note that just calling _stop()
    # isn't enough: other threads may already be waiting on _tstate_lock.
    if _main_thread._is_stopped and _is_main_interpreter():
        # _shutdown() was already called
        return

    # From here on _register_atexit() refuses new registrations.
    global _SHUTTING_DOWN
    _SHUTTING_DOWN = True

    # Call registered threading atexit functions before threads are joined.
    # Order is reversed, similar to atexit.
    for atexit_call in reversed(_threading_atexits):
        atexit_call()

    # Main thread
    if _main_thread.ident == get_ident():
        tlock = _main_thread._tstate_lock
        # The main thread isn't finished yet, so its thread state lock can't
        # have been released.
        assert tlock is not None
        assert tlock.locked()
        tlock.release()
        _main_thread._stop()
    else:
        # bpo-1596321: _shutdown() must be called in the main thread.
        # If the threading module was not imported by the main thread,
        # _main_thread is the thread which imported the threading module.
        # In this case, ignore _main_thread, similar behavior than for threads
        # spawned by C libraries or using _thread.start_new_thread().
        pass

    # Join all non-daemon threads
    while True:
        # Take a snapshot of the registered locks and empty the set while
        # holding its guard lock; the waiting happens outside the guard.
        with _shutdown_locks_lock:
            locks = list(_shutdown_locks)
            _shutdown_locks.clear()

        if not locks:
            break

        for lock in locks:
            # mimic Thread.join()
            lock.acquire()
            lock.release()

        # new threads can be spawned while we were waiting for the other
        # threads to complete
def main_thread():
    """Return the main thread object.

    In normal conditions, the main thread is the thread from which the
    Python interpreter was started.
    """
    # XXX Figure this out for subinterpreters.  (See gh-75698.)
    # Returns the module-level _main_thread object.
    return _main_thread
- # get thread-local implementation, either from the thread
- # module, or from the python fallback
- try:
- from _thread import _local as local
- except ImportError:
- from _threading_local import local
def _after_fork():
    """
    Cleanup threading module state that should not exist after a fork.
    """
    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock, _main_thread
    global _shutdown_locks_lock, _shutdown_locks
    _active_limbo_lock = RLock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}

    try:
        current = _active[get_ident()]
    except KeyError:
        # fork() was called in a thread which was not spawned
        # by threading.Thread. For example, a thread spawned
        # by thread.start_new_thread().
        current = _MainThread()

    # The surviving thread becomes the main thread.
    _main_thread = current

    # reset _shutdown() locks: threads re-register their _tstate_lock below
    _shutdown_locks_lock = _allocate_lock()
    _shutdown_locks = set()

    with _active_limbo_lock:
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                thread._reset_internal_locks(True)
                ident = get_ident()
                if isinstance(thread, _DummyThread):
                    # Promote the dummy object to a real main thread object.
                    thread.__class__ = _MainThread
                    thread._name = 'MainThread'
                    thread._daemonic = False
                    thread._set_tstate_lock()
                thread._ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._reset_internal_locks(False)
                thread._stop()

        # Replace the registries' contents in place so that existing
        # references to the dicts keep seeing the current state.
        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1
# Where the os module provides register_at_fork, have _after_fork() run in
# the child process after a fork.
if hasattr(_os, "register_at_fork"):
    _os.register_at_fork(after_in_child=_after_fork)
|