timeouts.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import enum
  2. from types import TracebackType
  3. from typing import final, Optional, Type
  4. from . import events
  5. from . import exceptions
  6. from . import tasks
# Names bound by `from <this module> import *`; `_State` is deliberately
# left out because it is an implementation detail of `Timeout`.
__all__ = (
    "Timeout",
    "timeout",
    "timeout_at",
)
  12. class _State(enum.Enum):
  13. CREATED = "created"
  14. ENTERED = "active"
  15. EXPIRING = "expiring"
  16. EXPIRED = "expired"
  17. EXITED = "finished"
  18. @final
  19. class Timeout:
  20. """Asynchronous context manager for cancelling overdue coroutines.
  21. Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
  22. """
  23. def __init__(self, when: Optional[float]) -> None:
  24. """Schedule a timeout that will trigger at a given loop time.
  25. - If `when` is `None`, the timeout will never trigger.
  26. - If `when < loop.time()`, the timeout will trigger on the next
  27. iteration of the event loop.
  28. """
  29. self._state = _State.CREATED
  30. self._timeout_handler: Optional[events.TimerHandle] = None
  31. self._task: Optional[tasks.Task] = None
  32. self._when = when
  33. def when(self) -> Optional[float]:
  34. """Return the current deadline."""
  35. return self._when
  36. def reschedule(self, when: Optional[float]) -> None:
  37. """Reschedule the timeout."""
  38. if self._state is not _State.ENTERED:
  39. if self._state is _State.CREATED:
  40. raise RuntimeError("Timeout has not been entered")
  41. raise RuntimeError(
  42. f"Cannot change state of {self._state.value} Timeout",
  43. )
  44. self._when = when
  45. if self._timeout_handler is not None:
  46. self._timeout_handler.cancel()
  47. if when is None:
  48. self._timeout_handler = None
  49. else:
  50. loop = events.get_running_loop()
  51. if when <= loop.time():
  52. self._timeout_handler = loop.call_soon(self._on_timeout)
  53. else:
  54. self._timeout_handler = loop.call_at(when, self._on_timeout)
  55. def expired(self) -> bool:
  56. """Is timeout expired during execution?"""
  57. return self._state in (_State.EXPIRING, _State.EXPIRED)
  58. def __repr__(self) -> str:
  59. info = ['']
  60. if self._state is _State.ENTERED:
  61. when = round(self._when, 3) if self._when is not None else None
  62. info.append(f"when={when}")
  63. info_str = ' '.join(info)
  64. return f"<Timeout [{self._state.value}]{info_str}>"
  65. async def __aenter__(self) -> "Timeout":
  66. if self._state is not _State.CREATED:
  67. raise RuntimeError("Timeout has already been entered")
  68. task = tasks.current_task()
  69. if task is None:
  70. raise RuntimeError("Timeout should be used inside a task")
  71. self._state = _State.ENTERED
  72. self._task = task
  73. self._cancelling = self._task.cancelling()
  74. self.reschedule(self._when)
  75. return self
  76. async def __aexit__(
  77. self,
  78. exc_type: Optional[Type[BaseException]],
  79. exc_val: Optional[BaseException],
  80. exc_tb: Optional[TracebackType],
  81. ) -> Optional[bool]:
  82. assert self._state in (_State.ENTERED, _State.EXPIRING)
  83. if self._timeout_handler is not None:
  84. self._timeout_handler.cancel()
  85. self._timeout_handler = None
  86. if self._state is _State.EXPIRING:
  87. self._state = _State.EXPIRED
  88. if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
  89. # Since there are no new cancel requests, we're
  90. # handling this.
  91. raise TimeoutError from exc_val
  92. elif self._state is _State.ENTERED:
  93. self._state = _State.EXITED
  94. return None
  95. def _on_timeout(self) -> None:
  96. assert self._state is _State.ENTERED
  97. self._task.cancel()
  98. self._state = _State.EXPIRING
  99. # drop the reference early
  100. self._timeout_handler = None
  101. def timeout(delay: Optional[float]) -> Timeout:
  102. """Timeout async context manager.
  103. Useful in cases when you want to apply timeout logic around block
  104. of code or in cases when asyncio.wait_for is not suitable. For example:
  105. >>> async with asyncio.timeout(10): # 10 seconds timeout
  106. ... await long_running_task()
  107. delay - value in seconds or None to disable timeout logic
  108. long_running_task() is interrupted by raising asyncio.CancelledError,
  109. the top-most affected timeout() context manager converts CancelledError
  110. into TimeoutError.
  111. """
  112. loop = events.get_running_loop()
  113. return Timeout(loop.time() + delay if delay is not None else None)
  114. def timeout_at(when: Optional[float]) -> Timeout:
  115. """Schedule the timeout at absolute time.
  116. Like timeout() but argument gives absolute time in the same clock system
  117. as loop.time().
  118. Please note: it is not POSIX time but a time with
  119. undefined starting base, e.g. the time of the system power on.
  120. >>> async with asyncio.timeout_at(loop.time() + 10):
  121. ... await long_running_task()
  122. when - a deadline when timeout occurs or None to disable timeout logic
  123. long_running_task() is interrupted by raising asyncio.CancelledError,
  124. the top-most affected timeout() context manager converts CancelledError
  125. into TimeoutError.
  126. """
  127. return Timeout(when)