123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import enum
- from types import TracebackType
- from typing import final, Optional, Type
- from . import events
- from . import exceptions
- from . import tasks
- __all__ = (
- "Timeout",
- "timeout",
- "timeout_at",
- )
- class _State(enum.Enum):
- CREATED = "created"
- ENTERED = "active"
- EXPIRING = "expiring"
- EXPIRED = "expired"
- EXITED = "finished"
- @final
- class Timeout:
- """Asynchronous context manager for cancelling overdue coroutines.
- Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
- """
- def __init__(self, when: Optional[float]) -> None:
- """Schedule a timeout that will trigger at a given loop time.
- - If `when` is `None`, the timeout will never trigger.
- - If `when < loop.time()`, the timeout will trigger on the next
- iteration of the event loop.
- """
- self._state = _State.CREATED
- self._timeout_handler: Optional[events.TimerHandle] = None
- self._task: Optional[tasks.Task] = None
- self._when = when
- def when(self) -> Optional[float]:
- """Return the current deadline."""
- return self._when
- def reschedule(self, when: Optional[float]) -> None:
- """Reschedule the timeout."""
- if self._state is not _State.ENTERED:
- if self._state is _State.CREATED:
- raise RuntimeError("Timeout has not been entered")
- raise RuntimeError(
- f"Cannot change state of {self._state.value} Timeout",
- )
- self._when = when
- if self._timeout_handler is not None:
- self._timeout_handler.cancel()
- if when is None:
- self._timeout_handler = None
- else:
- loop = events.get_running_loop()
- if when <= loop.time():
- self._timeout_handler = loop.call_soon(self._on_timeout)
- else:
- self._timeout_handler = loop.call_at(when, self._on_timeout)
- def expired(self) -> bool:
- """Is timeout expired during execution?"""
- return self._state in (_State.EXPIRING, _State.EXPIRED)
- def __repr__(self) -> str:
- info = ['']
- if self._state is _State.ENTERED:
- when = round(self._when, 3) if self._when is not None else None
- info.append(f"when={when}")
- info_str = ' '.join(info)
- return f"<Timeout [{self._state.value}]{info_str}>"
- async def __aenter__(self) -> "Timeout":
- if self._state is not _State.CREATED:
- raise RuntimeError("Timeout has already been entered")
- task = tasks.current_task()
- if task is None:
- raise RuntimeError("Timeout should be used inside a task")
- self._state = _State.ENTERED
- self._task = task
- self._cancelling = self._task.cancelling()
- self.reschedule(self._when)
- return self
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> Optional[bool]:
- assert self._state in (_State.ENTERED, _State.EXPIRING)
- if self._timeout_handler is not None:
- self._timeout_handler.cancel()
- self._timeout_handler = None
- if self._state is _State.EXPIRING:
- self._state = _State.EXPIRED
- if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
- # Since there are no new cancel requests, we're
- # handling this.
- raise TimeoutError from exc_val
- elif self._state is _State.ENTERED:
- self._state = _State.EXITED
- return None
- def _on_timeout(self) -> None:
- assert self._state is _State.ENTERED
- self._task.cancel()
- self._state = _State.EXPIRING
- # drop the reference early
- self._timeout_handler = None
- def timeout(delay: Optional[float]) -> Timeout:
- """Timeout async context manager.
- Useful in cases when you want to apply timeout logic around block
- of code or in cases when asyncio.wait_for is not suitable. For example:
- >>> async with asyncio.timeout(10): # 10 seconds timeout
- ... await long_running_task()
- delay - value in seconds or None to disable timeout logic
- long_running_task() is interrupted by raising asyncio.CancelledError,
- the top-most affected timeout() context manager converts CancelledError
- into TimeoutError.
- """
- loop = events.get_running_loop()
- return Timeout(loop.time() + delay if delay is not None else None)
- def timeout_at(when: Optional[float]) -> Timeout:
- """Schedule the timeout at absolute time.
- Like timeout() but argument gives absolute time in the same clock system
- as loop.time().
- Please note: it is not POSIX time but a time with
- undefined starting base, e.g. the time of the system power on.
- >>> async with asyncio.timeout_at(loop.time() + 10):
- ... await long_running_task()
- when - a deadline when timeout occurs or None to disable timeout logic
- long_running_task() is interrupted by raising asyncio.CancelledError,
- the top-most affected timeout() context manager converts CancelledError
- into TimeoutError.
- """
- return Timeout(when)
|