123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- from __future__ import annotations
- import sys
- assert sys.platform != "win32"
- import contextlib
- import io
- import termios
- import tty
- from asyncio import AbstractEventLoop, get_running_loop
- from typing import Callable, ContextManager, Generator, TextIO
- from ..key_binding import KeyPress
- from .base import Input
- from .posix_utils import PosixStdinReader
- from .vt100_parser import Vt100Parser
- __all__ = [
- "Vt100Input",
- "raw_mode",
- "cooked_mode",
- ]
- class Vt100Input(Input):
- """
- Vt100 input for Posix systems.
- (This uses a posix file descriptor that can be registered in the event loop.)
- """
- # For the error messages. Only display "Input is not a terminal" once per
- # file descriptor.
- _fds_not_a_terminal: set[int] = set()
- def __init__(self, stdin: TextIO) -> None:
- # Test whether the given input object has a file descriptor.
- # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
- try:
- # This should not raise, but can return 0.
- stdin.fileno()
- except io.UnsupportedOperation as e:
- if "idlelib.run" in sys.modules:
- raise io.UnsupportedOperation(
- "Stdin is not a terminal. Running from Idle is not supported."
- ) from e
- else:
- raise io.UnsupportedOperation("Stdin is not a terminal.") from e
- # Even when we have a file descriptor, it doesn't mean it's a TTY.
- # Normally, this requires a real TTY device, but people instantiate
- # this class often during unit tests as well. They use for instance
- # pexpect to pipe data into an application. For convenience, we print
- # an error message and go on.
- isatty = stdin.isatty()
- fd = stdin.fileno()
- if not isatty and fd not in Vt100Input._fds_not_a_terminal:
- msg = "Warning: Input is not a terminal (fd=%r).\n"
- sys.stderr.write(msg % fd)
- sys.stderr.flush()
- Vt100Input._fds_not_a_terminal.add(fd)
- #
- self.stdin = stdin
- # Create a backup of the fileno(). We want this to work even if the
- # underlying file is closed, so that `typeahead_hash()` keeps working.
- self._fileno = stdin.fileno()
- self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
- self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding)
- self.vt100_parser = Vt100Parser(
- lambda key_press: self._buffer.append(key_press)
- )
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
- """
- Return a context manager that makes this input active in the current
- event loop.
- """
- return _attached_input(self, input_ready_callback)
- def detach(self) -> ContextManager[None]:
- """
- Return a context manager that makes sure that this input is not active
- in the current event loop.
- """
- return _detached_input(self)
- def read_keys(self) -> list[KeyPress]:
- "Read list of KeyPress."
- # Read text from stdin.
- data = self.stdin_reader.read()
- # Pass it through our vt100 parser.
- self.vt100_parser.feed(data)
- # Return result.
- result = self._buffer
- self._buffer = []
- return result
- def flush_keys(self) -> list[KeyPress]:
- """
- Flush pending keys and return them.
- (Used for flushing the 'escape' key.)
- """
- # Flush all pending keys. (This is most important to flush the vt100
- # 'Escape' key early when nothing else follows.)
- self.vt100_parser.flush()
- # Return result.
- result = self._buffer
- self._buffer = []
- return result
- @property
- def closed(self) -> bool:
- return self.stdin_reader.closed
- def raw_mode(self) -> ContextManager[None]:
- return raw_mode(self.stdin.fileno())
- def cooked_mode(self) -> ContextManager[None]:
- return cooked_mode(self.stdin.fileno())
- def fileno(self) -> int:
- return self.stdin.fileno()
- def typeahead_hash(self) -> str:
- return f"fd-{self._fileno}"
- _current_callbacks: dict[
- tuple[AbstractEventLoop, int], Callable[[], None] | None
- ] = {} # (loop, fd) -> current callback
- @contextlib.contextmanager
- def _attached_input(
- input: Vt100Input, callback: Callable[[], None]
- ) -> Generator[None, None, None]:
- """
- Context manager that makes this input active in the current event loop.
- :param input: :class:`~prompt_toolkit.input.Input` object.
- :param callback: Called when the input is ready to read.
- """
- loop = get_running_loop()
- fd = input.fileno()
- previous = _current_callbacks.get((loop, fd))
- def callback_wrapper() -> None:
- """Wrapper around the callback that already removes the reader when
- the input is closed. Otherwise, we keep continuously calling this
- callback, until we leave the context manager (which can happen a bit
- later). This fixes issues when piping /dev/null into a prompt_toolkit
- application."""
- if input.closed:
- loop.remove_reader(fd)
- callback()
- try:
- loop.add_reader(fd, callback_wrapper)
- except PermissionError:
- # For `EPollSelector`, adding /dev/null to the event loop will raise
- # `PermissionError` (that doesn't happen for `SelectSelector`
- # apparently). Whenever we get a `PermissionError`, we can raise
- # `EOFError`, because there's not more to be read anyway. `EOFError` is
- # an exception that people expect in
- # `prompt_toolkit.application.Application.run()`.
- # To reproduce, do: `ptpython 0< /dev/null 1< /dev/null`
- raise EOFError
- _current_callbacks[loop, fd] = callback
- try:
- yield
- finally:
- loop.remove_reader(fd)
- if previous:
- loop.add_reader(fd, previous)
- _current_callbacks[loop, fd] = previous
- else:
- del _current_callbacks[loop, fd]
- @contextlib.contextmanager
- def _detached_input(input: Vt100Input) -> Generator[None, None, None]:
- loop = get_running_loop()
- fd = input.fileno()
- previous = _current_callbacks.get((loop, fd))
- if previous:
- loop.remove_reader(fd)
- _current_callbacks[loop, fd] = None
- try:
- yield
- finally:
- if previous:
- loop.add_reader(fd, previous)
- _current_callbacks[loop, fd] = previous
- class raw_mode:
- """
- ::
- with raw_mode(stdin):
- ''' the pseudo-terminal stdin is now used in raw mode '''
- We ignore errors when executing `tcgetattr` fails.
- """
- # There are several reasons for ignoring errors:
- # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would
- # execute this code (In a Python REPL, for instance):
- #
- # import os; f = open(os.devnull); os.dup2(f.fileno(), 0)
- #
- # The result is that the eventloop will stop correctly, because it has
- # to logic to quit when stdin is closed. However, we should not fail at
- # this point. See:
- # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393
- # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392
- # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated.
- # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165
- def __init__(self, fileno: int) -> None:
- self.fileno = fileno
- self.attrs_before: list[int | list[bytes | int]] | None
- try:
- self.attrs_before = termios.tcgetattr(fileno)
- except termios.error:
- # Ignore attribute errors.
- self.attrs_before = None
- def __enter__(self) -> None:
- # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this:
- try:
- newattr = termios.tcgetattr(self.fileno)
- except termios.error:
- pass
- else:
- newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
- newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
- # VMIN defines the number of characters read at a time in
- # non-canonical mode. It seems to default to 1 on Linux, but on
- # Solaris and derived operating systems it defaults to 4. (This is
- # because the VMIN slot is the same as the VEOF slot, which
- # defaults to ASCII EOT = Ctrl-D = 4.)
- newattr[tty.CC][termios.VMIN] = 1
- termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
- @classmethod
- def _patch_lflag(cls, attrs: int) -> int:
- return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
- @classmethod
- def _patch_iflag(cls, attrs: int) -> int:
- return attrs & ~(
- # Disable XON/XOFF flow control on output and input.
- # (Don't capture Ctrl-S and Ctrl-Q.)
- # Like executing: "stty -ixon."
- termios.IXON
- | termios.IXOFF
- |
- # Don't translate carriage return into newline on input.
- termios.ICRNL
- | termios.INLCR
- | termios.IGNCR
- )
- def __exit__(self, *a: object) -> None:
- if self.attrs_before is not None:
- try:
- termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
- except termios.error:
- pass
- # # Put the terminal in application mode.
- # self._stdout.write('\x1b[?1h')
- class cooked_mode(raw_mode):
- """
- The opposite of ``raw_mode``, used when we need cooked mode inside a
- `raw_mode` block. Used in `Application.run_in_terminal`.::
- with cooked_mode(stdin):
- ''' the pseudo-terminal stdin is now used in cooked mode. '''
- """
- @classmethod
- def _patch_lflag(cls, attrs: int) -> int:
- return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
- @classmethod
- def _patch_iflag(cls, attrs: int) -> int:
- # Turn the ICRNL flag back on. (Without this, calling `input()` in
- # run_in_terminal doesn't work and displays ^M instead. Ptpython
- # evaluates commands using `run_in_terminal`, so it's important that
- # they translate ^M back into ^J.)
- return attrs | termios.ICRNL
|