123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- """
- Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
- the asyncio event loop.
- The way this works is by using a custom 'selector' that runs the other event
- loop until the real selector is ready.
- It's the responsibility of this input hook to return when there is input ready;
- the two ways to detect that are listed below.
- The inputhook itself is a callable that receives an `InputHookContext`. This
- callable should run the other event loop, and return when the main loop has
- stuff to do. There are two ways to detect when to return:
- - Call the `input_is_ready` method periodically. Quit when this returns `True`.
- - Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
- becomes readable. (But don't read from it.)
- Note that this is not the same as checking for `sys.stdin.fileno()`. The
- eventloop of prompt-toolkit allows thread-based executors, for example for
- asynchronous autocompletion. When the completion for instance is ready, we
- also want prompt-toolkit to gain control again in order to display that.
- """
- from __future__ import annotations
- import asyncio
- import os
- import select
- import selectors
- import sys
- import threading
- from asyncio import AbstractEventLoop, get_running_loop
- from selectors import BaseSelector, SelectorKey
- from typing import TYPE_CHECKING, Any, Callable, Mapping
# Names exported by `from <this module> import *`.
__all__ = [
    "new_eventloop_with_inputhook",
    "set_eventloop_with_inputhook",
    "InputHookSelector",
    "InputHookContext",
    "InputHook",
]

# Names below are referenced from annotations only. Annotations are not
# evaluated at runtime (`from __future__ import annotations`), so these
# imports are skipped outside of static type checking.
if TYPE_CHECKING:
    from _typeshed import FileDescriptorLike
    from typing_extensions import TypeAlias

    # NOTE(review): the original indentation was lost; `_EventMask` is used
    # only in annotations, so it is assumed to belong under this guard.
    # Event bitmask passed to `register`/`modify` and returned by `select`.
    _EventMask = int
class InputHookContext:
    """
    Context object that is passed to the inputhook callable.

    :param fileno: File descriptor exposed through :meth:`fileno`.
    :param input_is_ready: Zero-argument callable returning a bool; stored
        as-is on the instance so the inputhook can call it.
    """

    def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
        self.input_is_ready = input_is_ready
        self._fileno = fileno

    def fileno(self) -> int:
        """Return the file descriptor this context was created with."""
        return self._fileno
# Type of an inputhook: a callable that receives an `InputHookContext` and
# returns nothing.
InputHook: TypeAlias = Callable[[InputHookContext], None]
def new_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build a new asyncio event loop that runs the given inputhook.

    The platform default selector is wrapped in an `InputHookSelector`, and
    a `SelectorEventLoop` is created on top of that wrapper. The loop is
    returned without being installed as the current event loop.

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The newly created event loop.
    """
    return asyncio.SelectorEventLoop(
        InputHookSelector(selectors.DefaultSelector(), inputhook)
    )
def set_eventloop_with_inputhook(
    inputhook: Callable[[InputHookContext], None],
) -> AbstractEventLoop:
    """
    Build a new event loop for the given inputhook and install it.

    The loop comes from `new_eventloop_with_inputhook` and is made current
    with `asyncio.set_event_loop`.

    :param inputhook: Callable that receives an `InputHookContext`.
    :return: The event loop that was created and activated.
    """
    # Deprecated!
    new_loop = new_eventloop_with_inputhook(inputhook)
    asyncio.set_event_loop(new_loop)
    return new_loop
class InputHookSelector(BaseSelector):
    """
    Selector wrapper that runs an inputhook while the real selector waits.

    `register`, `unregister`, `modify`, `get_map` and `close` delegate to the
    wrapped selector. `select` runs the wrapped selector in a helper thread
    and calls the inputhook in the calling thread; a pipe signals the
    inputhook that the wrapped selector has finished.

    Usage:

        selector = selectors.SelectSelector()
        loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
        asyncio.set_event_loop(loop)
    """

    def __init__(
        self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
    ) -> None:
        """
        :param selector: The real selector that all operations delegate to.
        :param inputhook: Callable invoked with an `InputHookContext` each
            time `select` has to wait.
        """
        self.selector = selector
        self.inputhook = inputhook
        # The helper thread writes to `_w` once the wrapped selector returns;
        # `_r` is the descriptor handed to the inputhook.
        self._r, self._w = os.pipe()

    def register(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """Register `fileobj` on the wrapped selector and return its key."""
        return self.selector.register(fileobj, events, data=data)

    def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
        """Unregister `fileobj` from the wrapped selector and return its key."""
        return self.selector.unregister(fileobj)

    def modify(
        self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
    ) -> SelectorKey:
        """
        Change the events and/or attached data of a registered `fileobj`.

        The caller's `data` is forwarded to the wrapped selector unchanged.
        """
        # Forward the caller's data; passing `None` here would silently drop
        # the payload attached to the key.
        return self.selector.modify(fileobj, events, data=data)

    def select(
        self, timeout: float | None = None
    ) -> list[tuple[SelectorKey, _EventMask]]:
        """
        Wait for the wrapped selector while the inputhook runs.

        :param timeout: Passed through to the wrapped selector's `select`.
        :return: The result of the wrapped selector's `select`.
        :raises Exception: Any exception raised by the wrapped selector in
            the helper thread is re-raised in the calling thread.
        """
        # If there are tasks in the current event loop,
        # don't run the input hook.
        if len(getattr(get_running_loop(), "_ready", [])) > 0:
            return self.selector.select(timeout=timeout)

        ready = False
        result: list[tuple[SelectorKey, _EventMask]] | None = None
        error: Exception | None = None

        # Run selector in other thread.
        def run_selector() -> None:
            nonlocal ready, result, error
            try:
                result = self.selector.select(timeout=timeout)
            except Exception as exc:
                # Hand the failure over to the calling thread.
                error = exc
            finally:
                # Always signal completion, even on failure; otherwise the
                # inputhook would never learn that it has to return.
                ready = True
                os.write(self._w, b"x")

        th = threading.Thread(target=run_selector)
        th.start()

        def input_is_ready() -> bool:
            return ready

        # Call inputhook.
        # The inputhook function is supposed to return when our selector
        # becomes ready. The inputhook can do that by registering the fd in its
        # own loop, or by checking the `input_is_ready` function regularly.
        self.inputhook(InputHookContext(self._r, input_is_ready))

        # Flush the read end of the pipe.
        try:
            # Before calling 'os.read', call select.select. This is required
            # when the gevent monkey patch has been applied. 'os.read' is never
            # monkey patched and won't be cooperative, so that would block all
            # other select() calls otherwise.
            # See: http://www.gevent.org/gevent.os.html

            # Note: On Windows, this is apparently not an issue.
            #       However, if we would ever want to add a select call, it
            #       should use `windll.kernel32.WaitForMultipleObjects`,
            #       because `select.select` can't wait for a pipe on Windows.
            if sys.platform != "win32":
                select.select([self._r], [], [], None)

            os.read(self._r, 1024)
        except OSError:
            # This happens when the window resizes and a SIGWINCH was received.
            # We get 'Error: [Errno 4] Interrupted system call'
            # Just ignore.
            pass

        # Wait for the real selector to be done.
        th.join()

        if error is not None:
            raise error

        assert result is not None
        return result

    def close(self) -> None:
        """
        Clean up resources.

        Closes both ends of the pipe (once) and the wrapped selector. Calling
        this method again does not try to close the pipe a second time.
        """
        # `-1` marks "already closed". Compare explicitly: `-1` is truthy and
        # a valid descriptor may be `0`.
        if self._r >= 0:
            os.close(self._r)
            os.close(self._w)

        self._r = self._w = -1
        self.selector.close()

    def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
        """Return the wrapped selector's mapping of file objects to keys."""
        return self.selector.get_map()
|