# threadutil.py
  # Copyright (C) 2002-2016 John Goerzen & contributors
  # Thread support module
  #
  # This program is free software; you can redistribute it and/or modify
  # it under the terms of the GNU General Public License as published by
  # the Free Software Foundation; either version 2 of the License, or
  # (at your option) any later version.
  #
  # This program is distributed in the hope that it will be useful,
  # but WITHOUT ANY WARRANTY; without even the implied warranty of
  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  # GNU General Public License for more details.
  #
  # You should have received a copy of the GNU General Public License
  # along with this program; if not, write to the Free Software
  # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
  from threading import Lock, Thread, BoundedSemaphore
  from queue import Queue, Empty
  import traceback

  from offlineimap.ui import getglobalui
  # Value of a thread's `exit_message` that makes monitor() leave its loop.
  STOP_MONITOR = 'STOP_MONITOR'
  # General utilities
  def semaphorereset(semaphore, originalstate):
      """Block until `semaphore` gets back to its original state.

      All `originalstate` slots are acquired and then released again.
      Acquiring every slot can only complete once every other holder has
      released its own, so returning means all acquired resources have been
      given back; the semaphore is left fully available.

      semaphore -- object providing blocking acquire() and release().
      originalstate -- number of slots to acquire and release.
      """
      # Taking every slot forces a wait for all outstanding holders.
      for _ in range(originalstate):
          semaphore.acquire()
      # Now release these.
      for _ in range(originalstate):
          semaphore.release()
  class accountThreads:
      """Store the list of all threads in the software.

      The list can be used to find out what's running and what's not.
      Every access to `self.list` made through the methods below is
      serialized by `self.lock`.
      """

      def __init__(self):
          self.lock = Lock()
          self.list = []

      def add(self, thread):
          """Append `thread` to the list."""
          with self.lock:
              self.list.append(thread)

      def remove(self, thread):
          """Remove `thread` from the list.

          Raises ValueError (from list.remove) if it is not present.
          """
          with self.lock:
              self.list.remove(thread)

      def pop(self):
          """Remove and return the last added thread, or None when empty."""
          with self.lock:
              if not self.list:
                  return None
              return self.list.pop()

      def wait(self):
          """Join the stored threads one by one until the list is empty."""
          # The lock is only held inside pop(), not while joining, so other
          # threads can still call add()/remove() during a join().
          while True:
              thread = self.pop()
              if thread is None:
                  break
              thread.join()
  ######################################################################
  # Exit-notify threads
  ######################################################################
  # Queue of ExitNotifyThread instances whose run() has finished; filled by
  # ExitNotifyThread.run() and consumed by monitor().
  exitedThreads = Queue()
  def monitor():
      """An infinite "monitoring" loop watching for finished ExitNotifyThread's.

      This one is supposed to run in the main thread.

      Each thread taken from the `exitedThreads` queue is handled as follows:

      - `exit_exception` is a SystemExit: SystemExit is raised from here.
      - any other `exit_exception`: `ui.threadException(thread)` is called,
        which is expected to terminate the program; if it returns anyway,
        AssertionError is raised.
      - `exit_message == STOP_MONITOR`: the loop ends and monitor() returns.
      - otherwise: `ui.threadExited(thread)` is called and the loop goes on.
      """
      ui = getglobalui()

      while True:
          try:
              # We need a timeout in the get() call, so that ctrl-c can throw a
              # SIGINT (http://bugs.python.org/issue1360). A timeout with empty
              # Queue will raise `Empty`.
              #
              # ExitNotifyThread add themselves to the exitedThreads queue once
              # they are done (normally or with exception).
              thread = exitedThreads.get(True, 60)
          except Empty:
              # Nothing finished within the timeout; just wait again.  Only
              # the get() call is guarded, so an `Empty` raised by one of the
              # ui callbacks below is not silently swallowed.
              continue

          exc = thread.exit_exception
          if exc is not None:
              if isinstance(exc, SystemExit):
                  # Bring a SystemExit into the main thread.
                  # Do not send it back to UI layer right now.
                  # Maybe later send it to ui.terminate?
                  # NOTE(review): a fresh SystemExit is raised, so any exit
                  # code carried by the thread's exception is not forwarded.
                  raise SystemExit
              ui.threadException(thread)  # Expected to terminate the program.
              # Should never hit this line.
              raise AssertionError("thread has 'exit_exception' set to"
                                   " '%s' [%s] but this value is unexpected"
                                   " and the ui did not stop the program." %
                                   (repr(exc), type(exc)))

          # Only the monitor thread has this exit message set.
          if thread.exit_message == STOP_MONITOR:
              break  # Exit the loop here.

          ui.threadExited(thread)
  class ExitNotifyThread(Thread):
      """Thread that alerts a "monitor" to the fact that it has exited.

      When run() finishes, the instance puts itself on the module-level
      `exitedThreads` queue so that a monitor can find out that it exited and
      why.  All instances are made daemon threads (.daemon=True), so we bail
      out when the mainloop dies.

      The thread can set the instance variable `self.exit_message` to a human
      readable reason of the thread exit.  An exception that ended the thread
      is available through `exit_exception`, with its formatted traceback in
      `exit_stacktrace`.
      """

      def __init__(self, *args, **kwargs):
          """Forward all arguments to Thread and initialise the exit state."""
          super().__init__(*args, **kwargs)
          # These are all child threads that are supposed to go away when
          # the main thread is killed.
          self.daemon = True
          self.exit_message = None
          self._exit_exc = None
          self._exit_stacktrace = None

      def run(self):
          """Run the thread body, store its exception and notify the queue."""
          try:
              super().run()
          except Exception as e:
              # Thread exited with Exception, store it together with the
              # traceback text while the exception is still being handled.
              self.set_exit_exception(e, traceback.format_exc())
          # NOTE(review): exceptions that do not derive from Exception (e.g.
          # SystemExit) are not caught above, so in that case this line is
          # skipped and the thread never shows up in the queue -- confirm
          # whether that is intended.
          exitedThreads.put(self, True)

      def set_exit_exception(self, exc, st=None):
          """Set exception and stacktrace of a thread, so that other
          threads can query its exit status.

          exc -- the exception to record.
          st -- stack trace text belonging to `exc` (optional).
          """
          self._exit_exc = exc
          self._exit_stacktrace = st

      @property
      def exit_exception(self):
          """Return the cause of the exit, one of:

          Exception() -- the thread aborted with this exception
          None -- normal termination.
          """
          return self._exit_exc

      @property
      def exit_stacktrace(self):
          """Return a string representing the stack trace if set, else None."""
          return self._exit_stacktrace
  ######################################################################
  # Instance-limited threads
  ######################################################################
  # Maps a limit namespace to the BoundedSemaphore capping its threads.
  limitedNamespaces = {}


  def initInstanceLimit(limitNamespace, instancemax):
      """Initialize the instance-limited thread implementation.

      Run up to instancemax threads for the given limitNamespace. This allows
      to honor maxsyncaccounts and maxconnections.

      Calling this again for a namespace that is already registered keeps
      the existing semaphore and ignores the new `instancemax`.

      limitNamespace -- key identifying the group of threads to limit.
      instancemax -- initial value of the BoundedSemaphore for that group.
      """
      if limitNamespace not in limitedNamespaces:
          limitedNamespaces[limitNamespace] = BoundedSemaphore(instancemax)
  class InstanceLimitedThread(ExitNotifyThread):
      """ExitNotifyThread limited by the semaphore of its limit namespace.

      start() acquires a slot from `limitedNamespaces[limitNamespace]` before
      the thread is started, and run() releases it once the thread body has
      finished.
      """

      def __init__(self, limitNamespace, *args, **kwargs):
          """Remember the namespace; other arguments go to ExitNotifyThread.

          limitNamespace -- key into `limitedNamespaces`; it is looked up
              (without a fallback) when start() is called.
          """
          self.limitNamespace = limitNamespace
          super().__init__(*args, **kwargs)

      def start(self):
          """Wait for a free slot in the namespace, then start the thread."""
          semaphore = limitedNamespaces[self.limitNamespace]
          # Will block until the semaphore has free slots.
          semaphore.acquire()
          try:
              ExitNotifyThread.start(self)
          except Exception:
              # The thread did not start, so run() will never release the
              # slot acquired above; give it back here instead of leaking it.
              semaphore.release()
              raise

      def run(self):
          """Run the thread body and release the namespace slot afterwards."""
          try:
              ExitNotifyThread.run(self)
          finally:
              # Skipped when the registry is empty -- presumably a guard for
              # the registry having been torn down; TODO confirm.
              if limitedNamespaces and limitedNamespaces[self.limitNamespace]:
                  limitedNamespaces[self.limitNamespace].release()