inputhook.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """
  2. Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
  3. the asyncio event loop.
  4. The way this works is by using a custom 'selector' that runs the other event
  5. loop until the real selector is ready.
  6. It's the responsibility of this event hook to return when there is input ready.
  7. How the hook can detect this is described below.
  8. The inputhook itself is a callable that receives an `InputHookContext`. This
  9. callable should run the other event loop, and return when the main loop has
  10. stuff to do. There are two ways to detect when to return:
  11. - Call the `input_is_ready` method periodically. Quit when this returns `True`.
  12. - Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
  13. becomes readable. (But don't read from it.)
  14. Note that this is not the same as checking for `sys.stdin.fileno()`. The
  15. eventloop of prompt-toolkit allows thread-based executors, for example for
  16. asynchronous autocompletion. When the completion for instance is ready, we
  17. also want prompt-toolkit to gain control again in order to display that.
  18. """
  19. from __future__ import annotations
  20. import asyncio
  21. import os
  22. import select
  23. import selectors
  24. import sys
  25. import threading
  26. from asyncio import AbstractEventLoop, get_running_loop
  27. from selectors import BaseSelector, SelectorKey
  28. from typing import TYPE_CHECKING, Any, Callable, Mapping
  29. __all__ = [
  30. "new_eventloop_with_inputhook",
  31. "set_eventloop_with_inputhook",
  32. "InputHookSelector",
  33. "InputHookContext",
  34. "InputHook",
  35. ]
  36. if TYPE_CHECKING:
  37. from _typeshed import FileDescriptorLike
  38. from typing_extensions import TypeAlias
  39. _EventMask = int
  40. class InputHookContext:
  41. """
  42. Given as a parameter to the inputhook.
  43. """
  44. def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
  45. self._fileno = fileno
  46. self.input_is_ready = input_is_ready
  47. def fileno(self) -> int:
  48. return self._fileno
# Type of an inputhook: a callable that receives an `InputHookContext` and
# returns nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]
  50. def new_eventloop_with_inputhook(
  51. inputhook: Callable[[InputHookContext], None],
  52. ) -> AbstractEventLoop:
  53. """
  54. Create a new event loop with the given inputhook.
  55. """
  56. selector = InputHookSelector(selectors.DefaultSelector(), inputhook)
  57. loop = asyncio.SelectorEventLoop(selector)
  58. return loop
  59. def set_eventloop_with_inputhook(
  60. inputhook: Callable[[InputHookContext], None],
  61. ) -> AbstractEventLoop:
  62. """
  63. Create a new event loop with the given inputhook, and activate it.
  64. """
  65. # Deprecated!
  66. loop = new_eventloop_with_inputhook(inputhook)
  67. asyncio.set_event_loop(loop)
  68. return loop
  69. class InputHookSelector(BaseSelector):
  70. """
  71. Usage:
  72. selector = selectors.SelectSelector()
  73. loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
  74. asyncio.set_event_loop(loop)
  75. """
  76. def __init__(
  77. self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
  78. ) -> None:
  79. self.selector = selector
  80. self.inputhook = inputhook
  81. self._r, self._w = os.pipe()
  82. def register(
  83. self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
  84. ) -> SelectorKey:
  85. return self.selector.register(fileobj, events, data=data)
  86. def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
  87. return self.selector.unregister(fileobj)
  88. def modify(
  89. self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
  90. ) -> SelectorKey:
  91. return self.selector.modify(fileobj, events, data=None)
  92. def select(
  93. self, timeout: float | None = None
  94. ) -> list[tuple[SelectorKey, _EventMask]]:
  95. # If there are tasks in the current event loop,
  96. # don't run the input hook.
  97. if len(getattr(get_running_loop(), "_ready", [])) > 0:
  98. return self.selector.select(timeout=timeout)
  99. ready = False
  100. result = None
  101. # Run selector in other thread.
  102. def run_selector() -> None:
  103. nonlocal ready, result
  104. result = self.selector.select(timeout=timeout)
  105. os.write(self._w, b"x")
  106. ready = True
  107. th = threading.Thread(target=run_selector)
  108. th.start()
  109. def input_is_ready() -> bool:
  110. return ready
  111. # Call inputhook.
  112. # The inputhook function is supposed to return when our selector
  113. # becomes ready. The inputhook can do that by registering the fd in its
  114. # own loop, or by checking the `input_is_ready` function regularly.
  115. self.inputhook(InputHookContext(self._r, input_is_ready))
  116. # Flush the read end of the pipe.
  117. try:
  118. # Before calling 'os.read', call select.select. This is required
  119. # when the gevent monkey patch has been applied. 'os.read' is never
  120. # monkey patched and won't be cooperative, so that would block all
  121. # other select() calls otherwise.
  122. # See: http://www.gevent.org/gevent.os.html
  123. # Note: On Windows, this is apparently not an issue.
  124. # However, if we would ever want to add a select call, it
  125. # should use `windll.kernel32.WaitForMultipleObjects`,
  126. # because `select.select` can't wait for a pipe on Windows.
  127. if sys.platform != "win32":
  128. select.select([self._r], [], [], None)
  129. os.read(self._r, 1024)
  130. except OSError:
  131. # This happens when the window resizes and a SIGWINCH was received.
  132. # We get 'Error: [Errno 4] Interrupted system call'
  133. # Just ignore.
  134. pass
  135. # Wait for the real selector to be done.
  136. th.join()
  137. assert result is not None
  138. return result
  139. def close(self) -> None:
  140. """
  141. Clean up resources.
  142. """
  143. if self._r:
  144. os.close(self._r)
  145. os.close(self._w)
  146. self._r = self._w = -1
  147. self.selector.close()
  148. def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
  149. return self.selector.get_map()