threadutil.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. # Copyright (C) 2002-2016 John Goerzen & contributors
  2. # Thread support module
  3. #
  4. # This program is free software; you can redistribute it and/or modify
  5. # it under the terms of the GNU General Public License as published by
  6. # the Free Software Foundation; either version 2 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  17. from threading import Lock, Thread, BoundedSemaphore, currentThread
  18. try:
  19. from Queue import Queue, Empty
  20. except ImportError: # python3
  21. from queue import Queue, Empty
  22. import traceback
  23. import os.path
  24. from offlineimap.ui import getglobalui
  25. STOP_MONITOR = 'STOP_MONITOR'
  26. ######################################################################
  27. # General utilities
  28. ######################################################################
  29. def semaphorereset(semaphore, originalstate):
  30. """Block until `semaphore` gets back to its original state, ie all acquired
  31. resources have been released."""
  32. for i in range(originalstate):
  33. semaphore.acquire()
  34. # Now release these.
  35. for i in range(originalstate):
  36. semaphore.release()
  37. class accountThreads(object):
  38. """Store the list of all threads in the software so it can be used to find out
  39. what's running and what's not."""
  40. def __init__(self):
  41. self.lock = Lock()
  42. self.list = []
  43. def add(self, thread):
  44. with self.lock:
  45. self.list.append(thread)
  46. def remove(self, thread):
  47. with self.lock:
  48. self.list.remove(thread)
  49. def pop(self):
  50. with self.lock:
  51. if len(self.list) < 1:
  52. return None
  53. return self.list.pop()
  54. def wait(self):
  55. while True:
  56. thread = self.pop()
  57. if thread is None:
  58. break
  59. thread.join()
  60. ######################################################################
  61. # Exit-notify threads
  62. ######################################################################
  63. exitedThreads = Queue()
  64. def monitor():
  65. """An infinite "monitoring" loop watching for finished ExitNotifyThread's.
  66. This one is supposed to run in the main thread.
  67. :param callback: the function to call when a thread terminated. That
  68. function is called with a single argument -- the
  69. ExitNotifyThread that has terminated. The monitor will
  70. not continue to monitor for other threads until
  71. 'callback' returns, so if it intends to perform long
  72. calculations, it should start a new thread itself -- but
  73. NOT an ExitNotifyThread, or else an infinite loop
  74. may result.
  75. Furthermore, the monitor will hold the lock all the
  76. while the other thread is waiting.
  77. :type callback: a callable function
  78. """
  79. global exitedThreads
  80. ui = getglobalui()
  81. while True:
  82. # Loop forever and call 'callback' for each thread that exited
  83. try:
  84. # We need a timeout in the get() call, so that ctrl-c can throw a
  85. # SIGINT (http://bugs.python.org/issue1360). A timeout with empty
  86. # Queue will raise `Empty`.
  87. #
  88. # ExitNotifyThread add themselves to the exitedThreads queue once
  89. # they are done (normally or with exception).
  90. thread = exitedThreads.get(True, 60)
  91. # Request to abort when callback returns True.
  92. if thread.exit_exception is not None:
  93. if isinstance(thread.exit_exception, SystemExit):
  94. # Bring a SystemExit into the main thread.
  95. # Do not send it back to UI layer right now.
  96. # Maybe later send it to ui.terminate?
  97. raise SystemExit
  98. ui.threadException(thread) # Expected to terminate the program.
  99. # Should never hit this line.
  100. raise AssertionError("thread has 'exit_exception' set to"
  101. " '%s' [%s] but this value is unexpected"
  102. " and the ui did not stop the program."%
  103. (repr(thread.exit_exception), type(thread.exit_exception)))
  104. # Only the monitor thread has this exit message set.
  105. elif thread.exit_message == STOP_MONITOR:
  106. break # Exit the loop here.
  107. else:
  108. ui.threadExited(thread)
  109. except Empty:
  110. pass
  111. class ExitNotifyThread(Thread):
  112. """This class is designed to alert a "monitor" to the fact that a
  113. thread has exited and to provide for the ability for it to find out
  114. why. All instances are made daemon threads (setDaemon(True), so we
  115. bail out when the mainloop dies.
  116. The thread can set instance variables self.exit_message for a human
  117. readable reason of the thread exit.
  118. There is one instance of this class at runtime. The main thread waits for
  119. the monitor to end."""
  120. def __init__(self, *args, **kwargs):
  121. super(ExitNotifyThread, self).__init__(*args, **kwargs)
  122. # These are all child threads that are supposed to go away when
  123. # the main thread is killed.
  124. self.setDaemon(True)
  125. self.exit_message = None
  126. self._exit_exc = None
  127. self._exit_stacktrace = None
  128. def run(self):
  129. """Allow profiling of a run and store exceptions."""
  130. global exitedThreads
  131. try:
  132. Thread.run(self)
  133. except Exception as e:
  134. # Thread exited with Exception, store it
  135. tb = traceback.format_exc()
  136. self.set_exit_exception(e, tb)
  137. exitedThreads.put(self, True)
  138. def set_exit_exception(self, exc, st=None):
  139. """Sets Exception and stacktrace of a thread, so that other
  140. threads can query its exit status"""
  141. self._exit_exc = exc
  142. self._exit_stacktrace = st
  143. @property
  144. def exit_exception(self):
  145. """Returns the cause of the exit, one of:
  146. Exception() -- the thread aborted with this exception
  147. None -- normal termination."""
  148. return self._exit_exc
  149. @property
  150. def exit_stacktrace(self):
  151. """Returns a string representing the stack trace if set"""
  152. return self._exit_stacktrace
  153. ######################################################################
  154. # Instance-limited threads
  155. ######################################################################
  156. limitedNamespaces = {}
  157. def initInstanceLimit(limitNamespace, instancemax):
  158. """Initialize the instance-limited thread implementation.
  159. Run up to intancemax threads for the given limitNamespace. This allows to
  160. honor maxsyncaccounts and maxconnections."""
  161. global limitedNamespaces
  162. if not limitNamespace in limitedNamespaces:
  163. limitedNamespaces[limitNamespace] = BoundedSemaphore(instancemax)
  164. class InstanceLimitedThread(ExitNotifyThread):
  165. def __init__(self, limitNamespace, *args, **kwargs):
  166. self.limitNamespace = limitNamespace
  167. super(InstanceLimitedThread, self).__init__(*args, **kwargs)
  168. def start(self):
  169. global limitedNamespaces
  170. # Will block until the semaphore has free slots.
  171. limitedNamespaces[self.limitNamespace].acquire()
  172. ExitNotifyThread.start(self)
  173. def run(self):
  174. global limitedNamespaces
  175. try:
  176. ExitNotifyThread.run(self)
  177. finally:
  178. if limitedNamespaces and limitedNamespaces[self.limitNamespace]:
  179. limitedNamespaces[self.limitNamespace].release()