123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- # Copyright (C) 2002-2016 John Goerzen & contributors
- # Thread support module
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- from threading import Lock, Thread, BoundedSemaphore
- from queue import Queue, Empty
- import traceback
- from offlineimap.ui import getglobalui
- STOP_MONITOR = 'STOP_MONITOR'
- # General utilities
- def semaphorereset(semaphore, originalstate):
- """Block until `semaphore` gets back to its original state, ie all acquired
- resources have been released."""
- for i in range(originalstate):
- semaphore.acquire()
- # Now release these.
- for i in range(originalstate):
- semaphore.release()
- class accountThreads:
- """Store the list of all threads in the software so it can be used to find out
- what's running and what's not."""
- def __init__(self):
- self.lock = Lock()
- self.list = []
- def add(self, thread):
- with self.lock:
- self.list.append(thread)
- def remove(self, thread):
- with self.lock:
- self.list.remove(thread)
- def pop(self):
- with self.lock:
- if len(self.list) < 1:
- return None
- return self.list.pop()
- def wait(self):
- while True:
- thread = self.pop()
- if thread is None:
- break
- thread.join()
- ######################################################################
- # Exit-notify threads
- ######################################################################
- exitedThreads = Queue()
- def monitor():
- """An infinite "monitoring" loop watching for finished ExitNotifyThread's.
- This one is supposed to run in the main thread.
- """
- global exitedThreads
- ui = getglobalui()
- while True:
- # Loop forever and call 'callback' for each thread that exited
- try:
- # We need a timeout in the get() call, so that ctrl-c can throw a
- # SIGINT (http://bugs.python.org/issue1360). A timeout with empty
- # Queue will raise `Empty`.
- #
- # ExitNotifyThread add themselves to the exitedThreads queue once
- # they are done (normally or with exception).
- thread = exitedThreads.get(True, 60)
- # Request to abort when callback returns True.
- if thread.exit_exception is not None:
- if isinstance(thread.exit_exception, SystemExit):
- # Bring a SystemExit into the main thread.
- # Do not send it back to UI layer right now.
- # Maybe later send it to ui.terminate?
- raise SystemExit
- ui.threadException(thread) # Expected to terminate the program.
- # Should never hit this line.
- raise AssertionError("thread has 'exit_exception' set to"
- " '%s' [%s] but this value is unexpected"
- " and the ui did not stop the program." %
- (repr(thread.exit_exception), type(thread.exit_exception)))
- # Only the monitor thread has this exit message set.
- elif thread.exit_message == STOP_MONITOR:
- break # Exit the loop here.
- else:
- ui.threadExited(thread)
- except Empty:
- pass
- class ExitNotifyThread(Thread):
- """This class is designed to alert a "monitor" to the fact that a
- thread has exited and to provide for the ability for it to find out
- why. All instances are made daemon threads (.daemon=True, so we
- bail out when the mainloop dies.
- The thread can set instance variables self.exit_message for a human
- readable reason of the thread exit.
- There is one instance of this class at runtime. The main thread waits for
- the monitor to end."""
- def __init__(self, *args, **kwargs):
- super(ExitNotifyThread, self).__init__(*args, **kwargs)
- # These are all child threads that are supposed to go away when
- # the main thread is killed.
- self.daemon = True
- self.exit_message = None
- self._exit_exc = None
- self._exit_stacktrace = None
- def run(self):
- """Allow profiling of a run and store exceptions."""
- global exitedThreads
- try:
- Thread.run(self)
- except Exception as e:
- # Thread exited with Exception, store it
- tb = traceback.format_exc()
- self.set_exit_exception(e, tb)
- exitedThreads.put(self, True)
- def set_exit_exception(self, exc, st=None):
- """Sets Exception and stacktrace of a thread, so that other
- threads can query its exit status"""
- self._exit_exc = exc
- self._exit_stacktrace = st
- @property
- def exit_exception(self):
- """Returns the cause of the exit, one of:
- Exception() -- the thread aborted with this exception
- None -- normal termination."""
- return self._exit_exc
- @property
- def exit_stacktrace(self):
- """Returns a string representing the stack trace if set"""
- return self._exit_stacktrace
- ######################################################################
- # Instance-limited threads
- ######################################################################
- limitedNamespaces = {}
- def initInstanceLimit(limitNamespace, instancemax):
- """Initialize the instance-limited thread implementation.
- Run up to intancemax threads for the given limitNamespace. This allows to
- honor maxsyncaccounts and maxconnections."""
- global limitedNamespaces
- if limitNamespace not in limitedNamespaces:
- limitedNamespaces[limitNamespace] = BoundedSemaphore(instancemax)
- class InstanceLimitedThread(ExitNotifyThread):
- def __init__(self, limitNamespace, *args, **kwargs):
- self.limitNamespace = limitNamespace
- super(InstanceLimitedThread, self).__init__(*args, **kwargs)
- def start(self):
- global limitedNamespaces
- # Will block until the semaphore has free slots.
- limitedNamespaces[self.limitNamespace].acquire()
- ExitNotifyThread.start(self)
- def run(self):
- global limitedNamespaces
- try:
- ExitNotifyThread.run(self)
- finally:
- if limitedNamespaces and limitedNamespaces[self.limitNamespace]:
- limitedNamespaces[self.limitNamespace].release()
|