
#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import collections
import itertools
import os
import queue
import threading
import time
import traceback
import types
import warnings

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
from .connection import wait

#
# Constants representing the state of a pool
#

INIT = "INIT"
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    exc.__cause__ = RemoteTraceback(tb)
    return exc
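
# Illustrative note (an addition, not part of the original module): when a
# worker raises, ExceptionWithTraceback pickles the formatted worker-side
# traceback and rebuild_exc() reattaches it on unpickling, so the exception
# re-raised in the parent carries the remote traceback as its __cause__.
# A sketch of the observable behaviour (hypothetical helper name):
#
#     def fail():
#         raise ValueError('boom')
#
#     # pool.apply(fail) re-raises ValueError('boom') in the parent process;
#     # its __cause__ is a RemoteTraceback whose str() is the worker-side
#     # traceback wrapped in triple quotes.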

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex

#
# Class representing a process pool
#

class _PoolCache(dict):
    """
    Class that implements a cache for the Pool class that will notify
    the pool management threads every time the cache is emptied. The
    notification is done by the use of a queue that is provided when
    instantiating the cache.
    """
    def __init__(self, /, *args, notifier=None, **kwds):
        self.notifier = notifier
        super().__init__(*args, **kwds)

    def __delitem__(self, item):
        super().__delitem__(item)

        # Notify that the cache is empty. This is important because the
        # pool keeps maintaining workers until the cache gets drained. This
        # eliminates a race condition in which a task is finished after the
        # pool's _handle_workers method has entered another iteration of the
        # loop. In this situation, the only event that can wake up the pool
        # is the cache being emptied (no more tasks available).
        if not self:
            self.notifier.put(None)
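
# Illustrative sketch (an addition, not part of the original module):
# deleting the last outstanding job wakes the worker handler through the
# notifier queue.  Demo names below are hypothetical:
#
#     notifier = queue.SimpleQueue()
#     cache = _PoolCache(notifier=notifier)
#     cache[0] = object()              # register a pending job
#     del cache[0]                     # cache is now empty ...
#     assert notifier.get() is None    # ... so a wake-up token was queued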

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        return ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exists to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")
        if maxtasksperchild is not None:
            if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
                raise ValueError("maxtasksperchild must be a positive int or None")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )
        self._state = RUN

    # Copy globals as function locals to make sure that they are available
    # during Python shutdown when the Pool is destroyed.
    def __del__(self, _warn=warnings.warn, RUN=RUN):
        if self._state == RUN:
            _warn(f"unclosed running multiprocessing pool {self!r}",
                  ResourceWarning, source=self)
            if getattr(self, '_change_notifier', None) is not None:
                self._change_notifier.put(None)

    def __repr__(self):
        cls = self.__class__
        return (f'<{cls.__module__}.{cls.__qualname__} '
                f'state={self._state} '
                f'pool_size={len(self._pool)}>')

    def _get_sentinels(self):
        task_queue_sentinels = [self._outqueue._reader]
        self_notifier_sentinels = [self._change_notifier._reader]
        return [*task_queue_sentinels, *self_notifier_sentinels]

    @staticmethod
    def _get_worker_sentinels(workers):
        return [worker.sentinel for worker in
                workers if hasattr(worker, "sentinel")]

    @staticmethod
    def _join_exited_workers(pool):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime. Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(pool))):
            worker = pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del pool[i]
        return cleaned

    def _repopulate_pool(self):
        return self._repopulate_pool_static(self._ctx, self.Process,
                                            self._processes,
                                            self._pool, self._inqueue,
                                            self._outqueue, self._initializer,
                                            self._initargs,
                                            self._maxtasksperchild,
                                            self._wrap_exception)

    @staticmethod
    def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
                                outqueue, initializer, initargs,
                                maxtasksperchild, wrap_exception):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(processes - len(pool)):
            w = Process(ctx, target=worker,
                        args=(inqueue, outqueue,
                              initializer,
                              initargs, maxtasksperchild,
                              wrap_exception))
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            pool.append(w)
            util.debug('added worker')

    @staticmethod
    def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
                       initializer, initargs, maxtasksperchild,
                       wrap_exception):
        """Clean up any exited workers and start replacements for them.
        """
        if Pool._join_exited_workers(pool):
            Pool._repopulate_pool_static(ctx, Process, processes, pool,
                                         inqueue, outqueue, initializer,
                                         initargs, maxtasksperchild,
                                         wrap_exception)

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def _check_running(self):
        if self._state != RUN:
            raise ValueError("Pool not running")

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()
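
    # Illustrative usage (an addition, not part of the original module):
    #
    #     with Pool(4) as p:
    #         p.apply(pow, (2, 10))    # blocks until done -> 1024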

    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()
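
    # Illustrative usage (an addition, not part of the original module):
    # map() realizes the whole iterable up front and blocks until every
    # chunk has been processed.
    #
    #     with Pool() as p:
    #         p.map(abs, [-1, -2, 3])    # -> [1, 2, 3]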

    def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)
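
    # Illustrative usage (an addition, not part of the original module):
    #
    #     with Pool() as p:
    #         p.starmap(pow, [(2, 3), (2, 4)])    # -> [8, 16]
    #         r = p.starmap_async(pow, [(2, 5)])  # returns a MapResult
    #         r.get()                             # -> [32]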

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)
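
    # Illustrative usage (an addition, not part of the original module):
    # imap() yields results lazily and in submission order, which suits
    # long or unbounded streams.
    #
    #     with Pool() as p:
    #         for v in p.imap(abs, range(-3, 0)):
    #             print(v)             # prints 3, 2, 1
    #         it = p.imap(abs, [-1])
    #         it.next(timeout=5)       # per-item timeout -> 1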

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)
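
    # Illustrative usage (an addition, not part of the original module):
    # results arrive as workers finish, so one slow item does not hold up
    # the rest.
    #
    #     with Pool() as p:
    #         for v in p.imap_unordered(abs, [-1, -2, -3]):
    #             print(v)             # 1, 2, 3 in completion order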

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        self._check_running()
        result = ApplyResult(self, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result
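
    # Illustrative usage (an addition, not part of the original module):
    #
    #     with Pool() as p:
    #         r = p.apply_async(pow, (2, 8),
    #                           callback=print)  # called with 256 on success
    #         r.get(timeout=10)                  # -> 256, or TimeoutError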

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
                               error_callback)

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
                   error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        self._check_running()
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result
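
    # Worked example of the default chunksize above (an addition, not part
    # of the original module): with 4 workers and 100 items,
    # divmod(100, 4 * 4) == (6, 4); the non-zero remainder bumps the
    # chunksize to 7, giving ceil(100 / 7) == 15 task batches.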

    @staticmethod
    def _wait_for_updates(sentinels, change_notifier, timeout=None):
        wait(sentinels, timeout=timeout)
        while not change_notifier.empty():
            change_notifier.get()

    @classmethod
    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                        pool, inqueue, outqueue, initializer, initargs,
                        maxtasksperchild, wrap_exception, sentinels,
                        change_notifier):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the
        # pool is terminated.
        while thread._state == RUN or (cache and thread._state != TERMINATE):
            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                               outqueue, initializer, initargs,
                               maxtasksperchild, wrap_exception)
            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
            cls._wait_for_updates(current_sentinels, change_notifier)
        # send sentinel to stop workers
        taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state != RUN:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state != RUN:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
                   len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
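
    # Illustrative behaviour (an addition, not part of the original module):
    #
    #     list(Pool._get_tasks(len, 'abcde', 2))
    #     # -> [(len, ('a', 'b')), (len, ('c', 'd')), (len, ('e',))]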

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._change_notifier.put(None)

    def terminate(self):
        util.debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()
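
    # Illustrative shutdown sequence (an addition, not part of the original
    # module):
    #
    #     p = Pool(2)
    #     r = p.map_async(abs, [-1, -2])
    #     p.close()    # no new submissions; workers drain the queue
    #     p.join()     # blocks until handler threads and workers exit
    #     r.get()      # -> [1, 2]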

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        # Notify that the worker_handler state has been changed so the
        # _handle_workers loop can be unblocked (and exited) in order to
        # send the finalization sentinel to all the workers.
        worker_handler._state = TERMINATE
        change_notifier.put(None)

        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_handler not alive")

        result_handler._state = TERMINATE
        change_notifier.put(None)
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        self._check_running()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, pool, callback, error_callback):
        self._pool = pool
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = pool._cache
        self._callback = callback
        self._error_callback = error_callback
        self._cache[self._job] = self

    def ready(self):
        return self._event.is_set()

    def successful(self):
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        self._event.wait(timeout)

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]
        self._pool = None

    __class_getitem__ = classmethod(types.GenericAlias)

AsyncResult = ApplyResult       # create alias -- see #17805
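
# Illustrative use of the result API (an addition, not part of the original
# module):
#
#     r = pool.apply_async(pow, (2, 4))
#     r.wait(1.0)                       # block for up to one second
#     if r.ready() and r.successful():
#         print(r.get())                # -> 16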

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, pool):
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

#
#
#

class ThreadPool(Pool):
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)

    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        time.sleep(timeout)
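
# Illustrative end-to-end usage (an addition, not part of the original
# module).  Pool and ThreadPool expose the same interface; in user code the
# classes are normally imported via the multiprocessing package:
#
#     from multiprocessing.pool import Pool, ThreadPool
#
#     if __name__ == '__main__':
#         with ThreadPool(2) as tp:
#             print(tp.map(abs, [-1, -2, -3]))             # -> [1, 2, 3]
#         with Pool(processes=2, maxtasksperchild=10) as pp:
#             print(pp.starmap(divmod, [(7, 3), (9, 4)]))  # -> [(2, 1), (2, 1)]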