backgroundjobs.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. # -*- coding: utf-8 -*-
  2. """Manage background (threaded) jobs conveniently from an interactive shell.
  3. This module provides a BackgroundJobManager class. This is the main class
  4. meant for public usage, it implements an object which can create and manage
  5. new background jobs.
  6. It also provides the actual job classes managed by these BackgroundJobManager
  7. objects, see their docstrings below.
  8. This system was inspired by discussions with B. Granger and the
  9. BackgroundCommand class described in the book Python Scripting for
  10. Computational Science, by H. P. Langtangen:
  11. http://folk.uio.no/hpl/scripting
  12. (although ultimately no code from this text was used, as IPython's system is a
  13. separate implementation).
  14. An example notebook is provided in our documentation illustrating interactive
  15. use of the system.
  16. """
  17. from __future__ import print_function
  18. #*****************************************************************************
  19. # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
  20. #
  21. # Distributed under the terms of the BSD License. The full license is in
  22. # the file COPYING, distributed as part of this software.
  23. #*****************************************************************************
  24. # Code begins
  25. import sys
  26. import threading
  27. from IPython import get_ipython
  28. from IPython.core.ultratb import AutoFormattedTB
  29. from logging import error
  30. from IPython.utils.py3compat import string_types
  31. class BackgroundJobManager(object):
  32. """Class to manage a pool of backgrounded threaded jobs.
  33. Below, we assume that 'jobs' is a BackgroundJobManager instance.
  34. Usage summary (see the method docstrings for details):
  35. jobs.new(...) -> start a new job
  36. jobs() or jobs.status() -> print status summary of all jobs
  37. jobs[N] -> returns job number N.
  38. foo = jobs[N].result -> assign to variable foo the result of job N
  39. jobs[N].traceback() -> print the traceback of dead job N
  40. jobs.remove(N) -> remove (finished) job N
  41. jobs.flush() -> remove all finished jobs
  42. As a convenience feature, BackgroundJobManager instances provide the
  43. utility result and traceback methods which retrieve the corresponding
  44. information from the jobs list:
  45. jobs.result(N) <--> jobs[N].result
  46. jobs.traceback(N) <--> jobs[N].traceback()
  47. While this appears minor, it allows you to use tab completion
  48. interactively on the job manager instance.
  49. """
  50. def __init__(self):
  51. # Lists for job management, accessed via a property to ensure they're
  52. # up to date.x
  53. self._running = []
  54. self._completed = []
  55. self._dead = []
  56. # A dict of all jobs, so users can easily access any of them
  57. self.all = {}
  58. # For reporting
  59. self._comp_report = []
  60. self._dead_report = []
  61. # Store status codes locally for fast lookups
  62. self._s_created = BackgroundJobBase.stat_created_c
  63. self._s_running = BackgroundJobBase.stat_running_c
  64. self._s_completed = BackgroundJobBase.stat_completed_c
  65. self._s_dead = BackgroundJobBase.stat_dead_c
  66. @property
  67. def running(self):
  68. self._update_status()
  69. return self._running
  70. @property
  71. def dead(self):
  72. self._update_status()
  73. return self._dead
  74. @property
  75. def completed(self):
  76. self._update_status()
  77. return self._completed
  78. def new(self, func_or_exp, *args, **kwargs):
  79. """Add a new background job and start it in a separate thread.
  80. There are two types of jobs which can be created:
  81. 1. Jobs based on expressions which can be passed to an eval() call.
  82. The expression must be given as a string. For example:
  83. job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
  84. The given expression is passed to eval(), along with the optional
  85. global/local dicts provided. If no dicts are given, they are
  86. extracted automatically from the caller's frame.
  87. A Python statement is NOT a valid eval() expression. Basically, you
  88. can only use as an eval() argument something which can go on the right
  89. of an '=' sign and be assigned to a variable.
  90. For example,"print 'hello'" is not valid, but '2+3' is.
  91. 2. Jobs given a function object, optionally passing additional
  92. positional arguments:
  93. job_manager.new(myfunc, x, y)
  94. The function is called with the given arguments.
  95. If you need to pass keyword arguments to your function, you must
  96. supply them as a dict named kw:
  97. job_manager.new(myfunc, x, y, kw=dict(z=1))
  98. The reason for this assymmetry is that the new() method needs to
  99. maintain access to its own keywords, and this prevents name collisions
  100. between arguments to new() and arguments to your own functions.
  101. In both cases, the result is stored in the job.result field of the
  102. background job object.
  103. You can set `daemon` attribute of the thread by giving the keyword
  104. argument `daemon`.
  105. Notes and caveats:
  106. 1. All threads running share the same standard output. Thus, if your
  107. background jobs generate output, it will come out on top of whatever
  108. you are currently writing. For this reason, background jobs are best
  109. used with silent functions which simply return their output.
  110. 2. Threads also all work within the same global namespace, and this
  111. system does not lock interactive variables. So if you send job to the
  112. background which operates on a mutable object for a long time, and
  113. start modifying that same mutable object interactively (or in another
  114. backgrounded job), all sorts of bizarre behaviour will occur.
  115. 3. If a background job is spending a lot of time inside a C extension
  116. module which does not release the Python Global Interpreter Lock
  117. (GIL), this will block the IPython prompt. This is simply because the
  118. Python interpreter can only switch between threads at Python
  119. bytecodes. While the execution is inside C code, the interpreter must
  120. simply wait unless the extension module releases the GIL.
  121. 4. There is no way, due to limitations in the Python threads library,
  122. to kill a thread once it has started."""
  123. if callable(func_or_exp):
  124. kw = kwargs.get('kw',{})
  125. job = BackgroundJobFunc(func_or_exp,*args,**kw)
  126. elif isinstance(func_or_exp, string_types):
  127. if not args:
  128. frame = sys._getframe(1)
  129. glob, loc = frame.f_globals, frame.f_locals
  130. elif len(args)==1:
  131. glob = loc = args[0]
  132. elif len(args)==2:
  133. glob,loc = args
  134. else:
  135. raise ValueError(
  136. 'Expression jobs take at most 2 args (globals,locals)')
  137. job = BackgroundJobExpr(func_or_exp, glob, loc)
  138. else:
  139. raise TypeError('invalid args for new job')
  140. if kwargs.get('daemon', False):
  141. job.daemon = True
  142. job.num = len(self.all)+1 if self.all else 0
  143. self.running.append(job)
  144. self.all[job.num] = job
  145. print('Starting job # %s in a separate thread.' % job.num)
  146. job.start()
  147. return job
  148. def __getitem__(self, job_key):
  149. num = job_key if isinstance(job_key, int) else job_key.num
  150. return self.all[num]
  151. def __call__(self):
  152. """An alias to self.status(),
  153. This allows you to simply call a job manager instance much like the
  154. Unix `jobs` shell command."""
  155. return self.status()
  156. def _update_status(self):
  157. """Update the status of the job lists.
  158. This method moves finished jobs to one of two lists:
  159. - self.completed: jobs which completed successfully
  160. - self.dead: jobs which finished but died.
  161. It also copies those jobs to corresponding _report lists. These lists
  162. are used to report jobs completed/dead since the last update, and are
  163. then cleared by the reporting function after each call."""
  164. # Status codes
  165. srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
  166. # State lists, use the actual lists b/c the public names are properties
  167. # that call this very function on access
  168. running, completed, dead = self._running, self._completed, self._dead
  169. # Now, update all state lists
  170. for num, job in enumerate(running):
  171. stat = job.stat_code
  172. if stat == srun:
  173. continue
  174. elif stat == scomp:
  175. completed.append(job)
  176. self._comp_report.append(job)
  177. running[num] = False
  178. elif stat == sdead:
  179. dead.append(job)
  180. self._dead_report.append(job)
  181. running[num] = False
  182. # Remove dead/completed jobs from running list
  183. running[:] = filter(None, running)
  184. def _group_report(self,group,name):
  185. """Report summary for a given job group.
  186. Return True if the group had any elements."""
  187. if group:
  188. print('%s jobs:' % name)
  189. for job in group:
  190. print('%s : %s' % (job.num,job))
  191. print()
  192. return True
  193. def _group_flush(self,group,name):
  194. """Flush a given job group
  195. Return True if the group had any elements."""
  196. njobs = len(group)
  197. if njobs:
  198. plural = {1:''}.setdefault(njobs,'s')
  199. print('Flushing %s %s job%s.' % (njobs,name,plural))
  200. group[:] = []
  201. return True
  202. def _status_new(self):
  203. """Print the status of newly finished jobs.
  204. Return True if any new jobs are reported.
  205. This call resets its own state every time, so it only reports jobs
  206. which have finished since the last time it was called."""
  207. self._update_status()
  208. new_comp = self._group_report(self._comp_report, 'Completed')
  209. new_dead = self._group_report(self._dead_report,
  210. 'Dead, call jobs.traceback() for details')
  211. self._comp_report[:] = []
  212. self._dead_report[:] = []
  213. return new_comp or new_dead
  214. def status(self,verbose=0):
  215. """Print a status of all jobs currently being managed."""
  216. self._update_status()
  217. self._group_report(self.running,'Running')
  218. self._group_report(self.completed,'Completed')
  219. self._group_report(self.dead,'Dead')
  220. # Also flush the report queues
  221. self._comp_report[:] = []
  222. self._dead_report[:] = []
  223. def remove(self,num):
  224. """Remove a finished (completed or dead) job."""
  225. try:
  226. job = self.all[num]
  227. except KeyError:
  228. error('Job #%s not found' % num)
  229. else:
  230. stat_code = job.stat_code
  231. if stat_code == self._s_running:
  232. error('Job #%s is still running, it can not be removed.' % num)
  233. return
  234. elif stat_code == self._s_completed:
  235. self.completed.remove(job)
  236. elif stat_code == self._s_dead:
  237. self.dead.remove(job)
  238. def flush(self):
  239. """Flush all finished jobs (completed and dead) from lists.
  240. Running jobs are never flushed.
  241. It first calls _status_new(), to update info. If any jobs have
  242. completed since the last _status_new() call, the flush operation
  243. aborts."""
  244. # Remove the finished jobs from the master dict
  245. alljobs = self.all
  246. for job in self.completed+self.dead:
  247. del(alljobs[job.num])
  248. # Now flush these lists completely
  249. fl_comp = self._group_flush(self.completed, 'Completed')
  250. fl_dead = self._group_flush(self.dead, 'Dead')
  251. if not (fl_comp or fl_dead):
  252. print('No jobs to flush.')
  253. def result(self,num):
  254. """result(N) -> return the result of job N."""
  255. try:
  256. return self.all[num].result
  257. except KeyError:
  258. error('Job #%s not found' % num)
  259. def _traceback(self, job):
  260. num = job if isinstance(job, int) else job.num
  261. try:
  262. self.all[num].traceback()
  263. except KeyError:
  264. error('Job #%s not found' % num)
  265. def traceback(self, job=None):
  266. if job is None:
  267. self._update_status()
  268. for deadjob in self.dead:
  269. print("Traceback for: %r" % deadjob)
  270. self._traceback(deadjob)
  271. print()
  272. else:
  273. self._traceback(job)
  274. class BackgroundJobBase(threading.Thread):
  275. """Base class to build BackgroundJob classes.
  276. The derived classes must implement:
  277. - Their own __init__, since the one here raises NotImplementedError. The
  278. derived constructor must call self._init() at the end, to provide common
  279. initialization.
  280. - A strform attribute used in calls to __str__.
  281. - A call() method, which will make the actual execution call and must
  282. return a value to be held in the 'result' field of the job object.
  283. """
  284. # Class constants for status, in string and as numerical codes (when
  285. # updating jobs lists, we don't want to do string comparisons). This will
  286. # be done at every user prompt, so it has to be as fast as possible
  287. stat_created = 'Created'; stat_created_c = 0
  288. stat_running = 'Running'; stat_running_c = 1
  289. stat_completed = 'Completed'; stat_completed_c = 2
  290. stat_dead = 'Dead (Exception), call jobs.traceback() for details'
  291. stat_dead_c = -1
  292. def __init__(self):
  293. """Must be implemented in subclasses.
  294. Subclasses must call :meth:`_init` for standard initialisation.
  295. """
  296. raise NotImplementedError("This class can not be instantiated directly.")
  297. def _init(self):
  298. """Common initialization for all BackgroundJob objects"""
  299. for attr in ['call','strform']:
  300. assert hasattr(self,attr), "Missing attribute <%s>" % attr
  301. # The num tag can be set by an external job manager
  302. self.num = None
  303. self.status = BackgroundJobBase.stat_created
  304. self.stat_code = BackgroundJobBase.stat_created_c
  305. self.finished = False
  306. self.result = '<BackgroundJob has not completed>'
  307. # reuse the ipython traceback handler if we can get to it, otherwise
  308. # make a new one
  309. try:
  310. make_tb = get_ipython().InteractiveTB.text
  311. except:
  312. make_tb = AutoFormattedTB(mode = 'Context',
  313. color_scheme='NoColor',
  314. tb_offset = 1).text
  315. # Note that the actual API for text() requires the three args to be
  316. # passed in, so we wrap it in a simple lambda.
  317. self._make_tb = lambda : make_tb(None, None, None)
  318. # Hold a formatted traceback if one is generated.
  319. self._tb = None
  320. threading.Thread.__init__(self)
  321. def __str__(self):
  322. return self.strform
  323. def __repr__(self):
  324. return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
  325. def traceback(self):
  326. print(self._tb)
  327. def run(self):
  328. try:
  329. self.status = BackgroundJobBase.stat_running
  330. self.stat_code = BackgroundJobBase.stat_running_c
  331. self.result = self.call()
  332. except:
  333. self.status = BackgroundJobBase.stat_dead
  334. self.stat_code = BackgroundJobBase.stat_dead_c
  335. self.finished = None
  336. self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
  337. self._tb = self._make_tb()
  338. else:
  339. self.status = BackgroundJobBase.stat_completed
  340. self.stat_code = BackgroundJobBase.stat_completed_c
  341. self.finished = True
  342. class BackgroundJobExpr(BackgroundJobBase):
  343. """Evaluate an expression as a background job (uses a separate thread)."""
  344. def __init__(self, expression, glob=None, loc=None):
  345. """Create a new job from a string which can be fed to eval().
  346. global/locals dicts can be provided, which will be passed to the eval
  347. call."""
  348. # fail immediately if the given expression can't be compiled
  349. self.code = compile(expression,'<BackgroundJob compilation>','eval')
  350. glob = {} if glob is None else glob
  351. loc = {} if loc is None else loc
  352. self.expression = self.strform = expression
  353. self.glob = glob
  354. self.loc = loc
  355. self._init()
  356. def call(self):
  357. return eval(self.code,self.glob,self.loc)
  358. class BackgroundJobFunc(BackgroundJobBase):
  359. """Run a function call as a background job (uses a separate thread)."""
  360. def __init__(self, func, *args, **kwargs):
  361. """Create a new job from a callable object.
  362. Any positional arguments and keyword args given to this constructor
  363. after the initial callable are passed directly to it."""
  364. if not callable(func):
  365. raise TypeError(
  366. 'first argument to BackgroundJobFunc must be callable')
  367. self.func = func
  368. self.args = args
  369. self.kwargs = kwargs
  370. # The string form will only include the function passed, because
  371. # generating string representations of the arguments is a potentially
  372. # _very_ expensive operation (e.g. with large arrays).
  373. self.strform = str(func)
  374. self._init()
  375. def call(self):
  376. return self.func(*self.args, **self.kwargs)