123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065 |
- """Support for tasks, coroutines and the scheduler."""
- __all__ = (
- 'Task', 'create_task',
- 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
- 'wait', 'wait_for', 'as_completed', 'sleep',
- 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
- 'current_task', 'all_tasks',
- 'create_eager_task_factory', 'eager_task_factory',
- '_register_task', '_unregister_task', '_enter_task', '_leave_task',
- )
- import concurrent.futures
- import contextvars
- import functools
- import inspect
- import itertools
- import types
- import warnings
- import weakref
- from types import GenericAlias
- from . import base_tasks
- from . import coroutines
- from . import events
- from . import exceptions
- from . import futures
- from . import timeouts
- # Helper to generate new task names
- # This uses itertools.count() instead of a "+= 1" operation because the latter
- # is not thread safe. See bpo-11866 for a longer explanation.
- _task_name_counter = itertools.count(1).__next__
- def current_task(loop=None):
- """Return a currently executed task."""
- if loop is None:
- loop = events.get_running_loop()
- return _current_tasks.get(loop)
- def all_tasks(loop=None):
- """Return a set of all tasks for the loop."""
- if loop is None:
- loop = events.get_running_loop()
- # capturing the set of eager tasks first, so if an eager task "graduates"
- # to a regular task in another thread, we don't risk missing it.
- eager_tasks = list(_eager_tasks)
- # Looping over the WeakSet isn't safe as it can be updated from another
- # thread, therefore we cast it to list prior to filtering. The list cast
- # itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur).
- # See issues 34970 and 36607 for details.
- scheduled_tasks = None
- i = 0
- while True:
- try:
- scheduled_tasks = list(_scheduled_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
- if futures._get_loop(t) is loop and not t.done()}
- def _set_task_name(task, name):
- if name is not None:
- try:
- set_name = task.set_name
- except AttributeError:
- warnings.warn("Task.set_name() was added in Python 3.8, "
- "the method support will be mandatory for third-party "
- "task implementations since 3.13.",
- DeprecationWarning, stacklevel=3)
- else:
- set_name(name)
- class Task(futures._PyFuture): # Inherit Python Task implementation
- # from a Python Future implementation.
- """A coroutine wrapped in a Future."""
- # An important invariant maintained while a Task not done:
- # _fut_waiter is either None or a Future. The Future
- # can be either done() or not done().
- # The task can be in any of 3 states:
- #
- # - 1: _fut_waiter is not None and not _fut_waiter.done():
- # __step() is *not* scheduled and the Task is waiting for _fut_waiter.
- # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
- # the Task is waiting for __step() to be executed.
- # - 3: _fut_waiter is None and __step() is *not* scheduled:
- # the Task is currently executing (in __step()).
- #
- # * In state 1, one of the callbacks of __fut_waiter must be __wakeup().
- # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
- # as it schedules __wakeup() to be called (which calls __step() so
- # we way that __step() is scheduled).
- # * It transitions from 2 to 3 when __step() is executed, and it clears
- # _fut_waiter to None.
- # If False, don't log a message if the task is destroyed while its
- # status is still pending
- _log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None, context=None,
- eager_start=False):
- super().__init__(loop=loop)
- if self._source_traceback:
- del self._source_traceback[-1]
- if not coroutines.iscoroutine(coro):
- # raise after Future.__init__(), attrs are required for __del__
- # prevent logging for pending task in __del__
- self._log_destroy_pending = False
- raise TypeError(f"a coroutine was expected, got {coro!r}")
- if name is None:
- self._name = f'Task-{_task_name_counter()}'
- else:
- self._name = str(name)
- self._num_cancels_requested = 0
- self._must_cancel = False
- self._fut_waiter = None
- self._coro = coro
- if context is None:
- self._context = contextvars.copy_context()
- else:
- self._context = context
- if eager_start and self._loop.is_running():
- self.__eager_start()
- else:
- self._loop.call_soon(self.__step, context=self._context)
- _register_task(self)
- def __del__(self):
- if self._state == futures._PENDING and self._log_destroy_pending:
- context = {
- 'task': self,
- 'message': 'Task was destroyed but it is pending!',
- }
- if self._source_traceback:
- context['source_traceback'] = self._source_traceback
- self._loop.call_exception_handler(context)
- super().__del__()
- __class_getitem__ = classmethod(GenericAlias)
- def __repr__(self):
- return base_tasks._task_repr(self)
- def get_coro(self):
- return self._coro
- def get_context(self):
- return self._context
- def get_name(self):
- return self._name
- def set_name(self, value):
- self._name = str(value)
- def set_result(self, result):
- raise RuntimeError('Task does not support set_result operation')
- def set_exception(self, exception):
- raise RuntimeError('Task does not support set_exception operation')
- def get_stack(self, *, limit=None):
- """Return the list of stack frames for this task's coroutine.
- If the coroutine is not done, this returns the stack where it is
- suspended. If the coroutine has completed successfully or was
- cancelled, this returns an empty list. If the coroutine was
- terminated by an exception, this returns the list of traceback
- frames.
- The frames are always ordered from oldest to newest.
- The optional limit gives the maximum number of frames to
- return; by default all available frames are returned. Its
- meaning differs depending on whether a stack or a traceback is
- returned: the newest frames of a stack are returned, but the
- oldest frames of a traceback are returned. (This matches the
- behavior of the traceback module.)
- For reasons beyond our control, only one stack frame is
- returned for a suspended coroutine.
- """
- return base_tasks._task_get_stack(self, limit)
- def print_stack(self, *, limit=None, file=None):
- """Print the stack or traceback for this task's coroutine.
- This produces output similar to that of the traceback module,
- for the frames retrieved by get_stack(). The limit argument
- is passed to get_stack(). The file argument is an I/O stream
- to which the output is written; by default output is written
- to sys.stderr.
- """
- return base_tasks._task_print_stack(self, limit, file)
- def cancel(self, msg=None):
- """Request that this task cancel itself.
- This arranges for a CancelledError to be thrown into the
- wrapped coroutine on the next cycle through the event loop.
- The coroutine then has a chance to clean up or even deny
- the request using try/except/finally.
- Unlike Future.cancel, this does not guarantee that the
- task will be cancelled: the exception might be caught and
- acted upon, delaying cancellation of the task or preventing
- cancellation completely. The task may also return a value or
- raise a different exception.
- Immediately after this method is called, Task.cancelled() will
- not return True (unless the task was already cancelled). A
- task will be marked as cancelled when the wrapped coroutine
- terminates with a CancelledError exception (even if cancel()
- was not called).
- This also increases the task's count of cancellation requests.
- """
- self._log_traceback = False
- if self.done():
- return False
- self._num_cancels_requested += 1
- # These two lines are controversial. See discussion starting at
- # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
- # Also remember that this is duplicated in _asynciomodule.c.
- # if self._num_cancels_requested > 1:
- # return False
- if self._fut_waiter is not None:
- if self._fut_waiter.cancel(msg=msg):
- # Leave self._fut_waiter; it may be a Task that
- # catches and ignores the cancellation so we may have
- # to cancel it again later.
- return True
- # It must be the case that self.__step is already scheduled.
- self._must_cancel = True
- self._cancel_message = msg
- return True
- def cancelling(self):
- """Return the count of the task's cancellation requests.
- This count is incremented when .cancel() is called
- and may be decremented using .uncancel().
- """
- return self._num_cancels_requested
- def uncancel(self):
- """Decrement the task's count of cancellation requests.
- This should be called by the party that called `cancel()` on the task
- beforehand.
- Returns the remaining number of cancellation requests.
- """
- if self._num_cancels_requested > 0:
- self._num_cancels_requested -= 1
- return self._num_cancels_requested
- def __eager_start(self):
- prev_task = _swap_current_task(self._loop, self)
- try:
- _register_eager_task(self)
- try:
- self._context.run(self.__step_run_and_handle_result, None)
- finally:
- _unregister_eager_task(self)
- finally:
- try:
- curtask = _swap_current_task(self._loop, prev_task)
- assert curtask is self
- finally:
- if self.done():
- self._coro = None
- self = None # Needed to break cycles when an exception occurs.
- else:
- _register_task(self)
- def __step(self, exc=None):
- if self.done():
- raise exceptions.InvalidStateError(
- f'_step(): already done: {self!r}, {exc!r}')
- if self._must_cancel:
- if not isinstance(exc, exceptions.CancelledError):
- exc = self._make_cancelled_error()
- self._must_cancel = False
- self._fut_waiter = None
- _enter_task(self._loop, self)
- try:
- self.__step_run_and_handle_result(exc)
- finally:
- _leave_task(self._loop, self)
- self = None # Needed to break cycles when an exception occurs.
- def __step_run_and_handle_result(self, exc):
- coro = self._coro
- try:
- if exc is None:
- # We use the `send` method directly, because coroutines
- # don't have `__iter__` and `__next__` methods.
- result = coro.send(None)
- else:
- result = coro.throw(exc)
- except StopIteration as exc:
- if self._must_cancel:
- # Task is cancelled right before coro stops.
- self._must_cancel = False
- super().cancel(msg=self._cancel_message)
- else:
- super().set_result(exc.value)
- except exceptions.CancelledError as exc:
- # Save the original exception so we can chain it later.
- self._cancelled_exc = exc
- super().cancel() # I.e., Future.cancel(self).
- except (KeyboardInterrupt, SystemExit) as exc:
- super().set_exception(exc)
- raise
- except BaseException as exc:
- super().set_exception(exc)
- else:
- blocking = getattr(result, '_asyncio_future_blocking', None)
- if blocking is not None:
- # Yielded Future must come from Future.__iter__().
- if futures._get_loop(result) is not self._loop:
- new_exc = RuntimeError(
- f'Task {self!r} got Future '
- f'{result!r} attached to a different loop')
- self._loop.call_soon(
- self.__step, new_exc, context=self._context)
- elif blocking:
- if result is self:
- new_exc = RuntimeError(
- f'Task cannot await on itself: {self!r}')
- self._loop.call_soon(
- self.__step, new_exc, context=self._context)
- else:
- result._asyncio_future_blocking = False
- result.add_done_callback(
- self.__wakeup, context=self._context)
- self._fut_waiter = result
- if self._must_cancel:
- if self._fut_waiter.cancel(
- msg=self._cancel_message):
- self._must_cancel = False
- else:
- new_exc = RuntimeError(
- f'yield was used instead of yield from '
- f'in task {self!r} with {result!r}')
- self._loop.call_soon(
- self.__step, new_exc, context=self._context)
- elif result is None:
- # Bare yield relinquishes control for one event loop iteration.
- self._loop.call_soon(self.__step, context=self._context)
- elif inspect.isgenerator(result):
- # Yielding a generator is just wrong.
- new_exc = RuntimeError(
- f'yield was used instead of yield from for '
- f'generator in task {self!r} with {result!r}')
- self._loop.call_soon(
- self.__step, new_exc, context=self._context)
- else:
- # Yielding something else is an error.
- new_exc = RuntimeError(f'Task got bad yield: {result!r}')
- self._loop.call_soon(
- self.__step, new_exc, context=self._context)
- finally:
- self = None # Needed to break cycles when an exception occurs.
- def __wakeup(self, future):
- try:
- future.result()
- except BaseException as exc:
- # This may also be a cancellation.
- self.__step(exc)
- else:
- # Don't pass the value of `future.result()` explicitly,
- # as `Future.__iter__` and `Future.__await__` don't need it.
- # If we call `_step(value, None)` instead of `_step()`,
- # Python eval loop would use `.send(value)` method call,
- # instead of `__next__()`, which is slower for futures
- # that return non-generator iterators from their `__iter__`.
- self.__step()
- self = None # Needed to break cycles when an exception occurs.
- _PyTask = Task
- try:
- import _asyncio
- except ImportError:
- pass
- else:
- # _CTask is needed for tests.
- Task = _CTask = _asyncio.Task
- def create_task(coro, *, name=None, context=None):
- """Schedule the execution of a coroutine object in a spawn task.
- Return a Task object.
- """
- loop = events.get_running_loop()
- if context is None:
- # Use legacy API if context is not needed
- task = loop.create_task(coro)
- else:
- task = loop.create_task(coro, context=context)
- _set_task_name(task, name)
- return task
- # wait() and as_completed() similar to those in PEP 3148.
- FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
- FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
- ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
- async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
- """Wait for the Futures or Tasks given by fs to complete.
- The fs iterable must not be empty.
- Coroutines will be wrapped in Tasks.
- Returns two sets of Future: (done, pending).
- Usage:
- done, pending = await asyncio.wait(fs)
- Note: This does not raise TimeoutError! Futures that aren't done
- when the timeout occurs are returned in the second set.
- """
- if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
- if not fs:
- raise ValueError('Set of Tasks/Futures is empty.')
- if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
- raise ValueError(f'Invalid return_when value: {return_when}')
- fs = set(fs)
- if any(coroutines.iscoroutine(f) for f in fs):
- raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
- loop = events.get_running_loop()
- return await _wait(fs, timeout, return_when, loop)
- def _release_waiter(waiter, *args):
- if not waiter.done():
- waiter.set_result(None)
- async def wait_for(fut, timeout):
- """Wait for the single Future or coroutine to complete, with timeout.
- Coroutine will be wrapped in Task.
- Returns result of the Future or coroutine. When a timeout occurs,
- it cancels the task and raises TimeoutError. To avoid the task
- cancellation, wrap it in shield().
- If the wait is cancelled, the task is also cancelled.
- If the task supresses the cancellation and returns a value instead,
- that value is returned.
- This function is a coroutine.
- """
- # The special case for timeout <= 0 is for the following case:
- #
- # async def test_waitfor():
- # func_started = False
- #
- # async def func():
- # nonlocal func_started
- # func_started = True
- #
- # try:
- # await asyncio.wait_for(func(), 0)
- # except asyncio.TimeoutError:
- # assert not func_started
- # else:
- # assert False
- #
- # asyncio.run(test_waitfor())
- if timeout is not None and timeout <= 0:
- fut = ensure_future(fut)
- if fut.done():
- return fut.result()
- await _cancel_and_wait(fut)
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise TimeoutError from exc
- async with timeouts.timeout(timeout):
- return await fut
- async def _wait(fs, timeout, return_when, loop):
- """Internal helper for wait().
- The fs argument must be a collection of Futures.
- """
- assert fs, 'Set of Futures is empty.'
- waiter = loop.create_future()
- timeout_handle = None
- if timeout is not None:
- timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
- counter = len(fs)
- def _on_completion(f):
- nonlocal counter
- counter -= 1
- if (counter <= 0 or
- return_when == FIRST_COMPLETED or
- return_when == FIRST_EXCEPTION and (not f.cancelled() and
- f.exception() is not None)):
- if timeout_handle is not None:
- timeout_handle.cancel()
- if not waiter.done():
- waiter.set_result(None)
- for f in fs:
- f.add_done_callback(_on_completion)
- try:
- await waiter
- finally:
- if timeout_handle is not None:
- timeout_handle.cancel()
- for f in fs:
- f.remove_done_callback(_on_completion)
- done, pending = set(), set()
- for f in fs:
- if f.done():
- done.add(f)
- else:
- pending.add(f)
- return done, pending
- async def _cancel_and_wait(fut):
- """Cancel the *fut* future or task and wait until it completes."""
- loop = events.get_running_loop()
- waiter = loop.create_future()
- cb = functools.partial(_release_waiter, waiter)
- fut.add_done_callback(cb)
- try:
- fut.cancel()
- # We cannot wait on *fut* directly to make
- # sure _cancel_and_wait itself is reliably cancellable.
- await waiter
- finally:
- fut.remove_done_callback(cb)
- # This is *not* a @coroutine! It is just an iterator (yielding Futures).
- def as_completed(fs, *, timeout=None):
- """Return an iterator whose values are coroutines.
- When waiting for the yielded coroutines you'll get the results (or
- exceptions!) of the original Futures (or coroutines), in the order
- in which and as soon as they complete.
- This differs from PEP 3148; the proper way to use this is:
- for f in as_completed(fs):
- result = await f # The 'await' may raise.
- # Use result.
- If a timeout is specified, the 'await' will raise
- TimeoutError when the timeout occurs before all Futures are done.
- Note: The futures 'f' are not necessarily members of fs.
- """
- if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
- from .queues import Queue # Import here to avoid circular import problem.
- done = Queue()
- loop = events.get_event_loop()
- todo = {ensure_future(f, loop=loop) for f in set(fs)}
- timeout_handle = None
- def _on_timeout():
- for f in todo:
- f.remove_done_callback(_on_completion)
- done.put_nowait(None) # Queue a dummy value for _wait_for_one().
- todo.clear() # Can't do todo.remove(f) in the loop.
- def _on_completion(f):
- if not todo:
- return # _on_timeout() was here first.
- todo.remove(f)
- done.put_nowait(f)
- if not todo and timeout_handle is not None:
- timeout_handle.cancel()
- async def _wait_for_one():
- f = await done.get()
- if f is None:
- # Dummy value from _on_timeout().
- raise exceptions.TimeoutError
- return f.result() # May raise f.exception().
- for f in todo:
- f.add_done_callback(_on_completion)
- if todo and timeout is not None:
- timeout_handle = loop.call_later(timeout, _on_timeout)
- for _ in range(len(todo)):
- yield _wait_for_one()
- @types.coroutine
- def __sleep0():
- """Skip one event loop run cycle.
- This is a private helper for 'asyncio.sleep()', used
- when the 'delay' is set to 0. It uses a bare 'yield'
- expression (which Task.__step knows how to handle)
- instead of creating a Future object.
- """
- yield
- async def sleep(delay, result=None):
- """Coroutine that completes after a given time (in seconds)."""
- if delay <= 0:
- await __sleep0()
- return result
- loop = events.get_running_loop()
- future = loop.create_future()
- h = loop.call_later(delay,
- futures._set_result_unless_cancelled,
- future, result)
- try:
- return await future
- finally:
- h.cancel()
- def ensure_future(coro_or_future, *, loop=None):
- """Wrap a coroutine or an awaitable in a future.
- If the argument is a Future, it is returned directly.
- """
- if futures.isfuture(coro_or_future):
- if loop is not None and loop is not futures._get_loop(coro_or_future):
- raise ValueError('The future belongs to a different loop than '
- 'the one specified as the loop argument')
- return coro_or_future
- should_close = True
- if not coroutines.iscoroutine(coro_or_future):
- if inspect.isawaitable(coro_or_future):
- async def _wrap_awaitable(awaitable):
- return await awaitable
- coro_or_future = _wrap_awaitable(coro_or_future)
- should_close = False
- else:
- raise TypeError('An asyncio.Future, a coroutine or an awaitable '
- 'is required')
- if loop is None:
- loop = events.get_event_loop()
- try:
- return loop.create_task(coro_or_future)
- except RuntimeError:
- if should_close:
- coro_or_future.close()
- raise
- class _GatheringFuture(futures.Future):
- """Helper for gather().
- This overrides cancel() to cancel all the children and act more
- like Task.cancel(), which doesn't immediately mark itself as
- cancelled.
- """
- def __init__(self, children, *, loop):
- assert loop is not None
- super().__init__(loop=loop)
- self._children = children
- self._cancel_requested = False
- def cancel(self, msg=None):
- if self.done():
- return False
- ret = False
- for child in self._children:
- if child.cancel(msg=msg):
- ret = True
- if ret:
- # If any child tasks were actually cancelled, we should
- # propagate the cancellation request regardless of
- # *return_exceptions* argument. See issue 32684.
- self._cancel_requested = True
- return ret
- def gather(*coros_or_futures, return_exceptions=False):
- """Return a future aggregating results from the given coroutines/futures.
- Coroutines will be wrapped in a future and scheduled in the event
- loop. They will not necessarily be scheduled in the same order as
- passed in.
- All futures must share the same event loop. If all the tasks are
- done successfully, the returned future's result is the list of
- results (in the order of the original sequence, not necessarily
- the order of results arrival). If *return_exceptions* is True,
- exceptions in the tasks are treated the same as successful
- results, and gathered in the result list; otherwise, the first
- raised exception will be immediately propagated to the returned
- future.
- Cancellation: if the outer Future is cancelled, all children (that
- have not completed yet) are also cancelled. If any child is
- cancelled, this is treated as if it raised CancelledError --
- the outer Future is *not* cancelled in this case. (This is to
- prevent the cancellation of one child to cause other children to
- be cancelled.)
- If *return_exceptions* is False, cancelling gather() after it
- has been marked done won't cancel any submitted awaitables.
- For instance, gather can be marked done after propagating an
- exception to the caller, therefore, calling ``gather.cancel()``
- after catching an exception (raised by one of the awaitables) from
- gather won't cancel any other awaitables.
- """
- if not coros_or_futures:
- loop = events.get_event_loop()
- outer = loop.create_future()
- outer.set_result([])
- return outer
- def _done_callback(fut):
- nonlocal nfinished
- nfinished += 1
- if outer is None or outer.done():
- if not fut.cancelled():
- # Mark exception retrieved.
- fut.exception()
- return
- if not return_exceptions:
- if fut.cancelled():
- # Check if 'fut' is cancelled first, as
- # 'fut.exception()' will *raise* a CancelledError
- # instead of returning it.
- exc = fut._make_cancelled_error()
- outer.set_exception(exc)
- return
- else:
- exc = fut.exception()
- if exc is not None:
- outer.set_exception(exc)
- return
- if nfinished == nfuts:
- # All futures are done; create a list of results
- # and set it to the 'outer' future.
- results = []
- for fut in children:
- if fut.cancelled():
- # Check if 'fut' is cancelled first, as 'fut.exception()'
- # will *raise* a CancelledError instead of returning it.
- # Also, since we're adding the exception return value
- # to 'results' instead of raising it, don't bother
- # setting __context__. This also lets us preserve
- # calling '_make_cancelled_error()' at most once.
- res = exceptions.CancelledError(
- '' if fut._cancel_message is None else
- fut._cancel_message)
- else:
- res = fut.exception()
- if res is None:
- res = fut.result()
- results.append(res)
- if outer._cancel_requested:
- # If gather is being cancelled we must propagate the
- # cancellation regardless of *return_exceptions* argument.
- # See issue 32684.
- exc = fut._make_cancelled_error()
- outer.set_exception(exc)
- else:
- outer.set_result(results)
- arg_to_fut = {}
- children = []
- nfuts = 0
- nfinished = 0
- done_futs = []
- loop = None
- outer = None # bpo-46672
- for arg in coros_or_futures:
- if arg not in arg_to_fut:
- fut = ensure_future(arg, loop=loop)
- if loop is None:
- loop = futures._get_loop(fut)
- if fut is not arg:
- # 'arg' was not a Future, therefore, 'fut' is a new
- # Future created specifically for 'arg'. Since the caller
- # can't control it, disable the "destroy pending task"
- # warning.
- fut._log_destroy_pending = False
- nfuts += 1
- arg_to_fut[arg] = fut
- if fut.done():
- done_futs.append(fut)
- else:
- fut.add_done_callback(_done_callback)
- else:
- # There's a duplicate Future object in coros_or_futures.
- fut = arg_to_fut[arg]
- children.append(fut)
- outer = _GatheringFuture(children, loop=loop)
- # Run done callbacks after GatheringFuture created so any post-processing
- # can be performed at this point
- # optimization: in the special case that *all* futures finished eagerly,
- # this will effectively complete the gather eagerly, with the last
- # callback setting the result (or exception) on outer before returning it
- for fut in done_futs:
- _done_callback(fut)
- return outer
- def shield(arg):
- """Wait for a future, shielding it from cancellation.
- The statement
- task = asyncio.create_task(something())
- res = await shield(task)
- is exactly equivalent to the statement
- res = await something()
- *except* that if the coroutine containing it is cancelled, the
- task running in something() is not cancelled. From the POV of
- something(), the cancellation did not happen. But its caller is
- still cancelled, so the yield-from expression still raises
- CancelledError. Note: If something() is cancelled by other means
- this will still cancel shield().
- If you want to completely ignore cancellation (not recommended)
- you can combine shield() with a try/except clause, as follows:
- task = asyncio.create_task(something())
- try:
- res = await shield(task)
- except CancelledError:
- res = None
- Save a reference to tasks passed to this function, to avoid
- a task disappearing mid-execution. The event loop only keeps
- weak references to tasks. A task that isn't referenced elsewhere
- may get garbage collected at any time, even before it's done.
- """
- inner = ensure_future(arg)
- if inner.done():
- # Shortcut.
- return inner
- loop = futures._get_loop(inner)
- outer = loop.create_future()
- def _inner_done_callback(inner):
- if outer.cancelled():
- if not inner.cancelled():
- # Mark inner's result as retrieved.
- inner.exception()
- return
- if inner.cancelled():
- outer.cancel()
- else:
- exc = inner.exception()
- if exc is not None:
- outer.set_exception(exc)
- else:
- outer.set_result(inner.result())
- def _outer_done_callback(outer):
- if not inner.done():
- inner.remove_done_callback(_inner_done_callback)
- inner.add_done_callback(_inner_done_callback)
- outer.add_done_callback(_outer_done_callback)
- return outer
- def run_coroutine_threadsafe(coro, loop):
- """Submit a coroutine object to a given event loop.
- Return a concurrent.futures.Future to access the result.
- """
- if not coroutines.iscoroutine(coro):
- raise TypeError('A coroutine object is required')
- future = concurrent.futures.Future()
- def callback():
- try:
- futures._chain_future(ensure_future(coro, loop=loop), future)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
- if future.set_running_or_notify_cancel():
- future.set_exception(exc)
- raise
- loop.call_soon_threadsafe(callback)
- return future
- def create_eager_task_factory(custom_task_constructor):
- """Create a function suitable for use as a task factory on an event-loop.
- Example usage:
- loop.set_task_factory(
- asyncio.create_eager_task_factory(my_task_constructor))
- Now, tasks created will be started immediately (rather than being first
- scheduled to an event loop). The constructor argument can be any callable
- that returns a Task-compatible object and has a signature compatible
- with `Task.__init__`; it must have the `eager_start` keyword argument.
- Most applications will use `Task` for `custom_task_constructor` and in
- this case there's no need to call `create_eager_task_factory()`
- directly. Instead the global `eager_task_factory` instance can be
- used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
- """
- def factory(loop, coro, *, name=None, context=None):
- return custom_task_constructor(
- coro, loop=loop, name=name, context=context, eager_start=True)
- return factory
- eager_task_factory = create_eager_task_factory(Task)
- # Collectively these two sets hold references to the complete set of active
- # tasks. Eagerly executed tasks use a faster regular set as an optimization
- # but may graduate to a WeakSet if the task blocks on IO.
- _scheduled_tasks = weakref.WeakSet()
- _eager_tasks = set()
- # Dictionary containing tasks that are currently active in
- # all running event loops. {EventLoop: Task}
- _current_tasks = {}
- def _register_task(task):
- """Register an asyncio Task scheduled to run on an event loop."""
- _scheduled_tasks.add(task)
- def _register_eager_task(task):
- """Register an asyncio Task about to be eagerly executed."""
- _eager_tasks.add(task)
- def _enter_task(loop, task):
- current_task = _current_tasks.get(loop)
- if current_task is not None:
- raise RuntimeError(f"Cannot enter into task {task!r} while another "
- f"task {current_task!r} is being executed.")
- _current_tasks[loop] = task
- def _leave_task(loop, task):
- current_task = _current_tasks.get(loop)
- if current_task is not task:
- raise RuntimeError(f"Leaving task {task!r} does not match "
- f"the current task {current_task!r}.")
- del _current_tasks[loop]
- def _swap_current_task(loop, task):
- prev_task = _current_tasks.get(loop)
- if task is None:
- del _current_tasks[loop]
- else:
- _current_tasks[loop] = task
- return prev_task
- def _unregister_task(task):
- """Unregister a completed, scheduled Task."""
- _scheduled_tasks.discard(task)
- def _unregister_eager_task(task):
- """Unregister a task which finished its first eager step."""
- _eager_tasks.discard(task)
- _py_current_task = current_task
- _py_register_task = _register_task
- _py_register_eager_task = _register_eager_task
- _py_unregister_task = _unregister_task
- _py_unregister_eager_task = _unregister_eager_task
- _py_enter_task = _enter_task
- _py_leave_task = _leave_task
- _py_swap_current_task = _swap_current_task
- try:
- from _asyncio import (_register_task, _register_eager_task,
- _unregister_task, _unregister_eager_task,
- _enter_task, _leave_task, _swap_current_task,
- _scheduled_tasks, _eager_tasks, _current_tasks,
- current_task)
- except ImportError:
- pass
- else:
- _c_current_task = current_task
- _c_register_task = _register_task
- _c_register_eager_task = _register_eager_task
- _c_unregister_task = _unregister_task
- _c_unregister_eager_task = _unregister_eager_task
- _c_enter_task = _enter_task
- _c_leave_task = _leave_task
- _c_swap_current_task = _swap_current_task
|