vt100.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. from __future__ import annotations
  2. import sys
  3. assert sys.platform != "win32"
  4. import contextlib
  5. import io
  6. import termios
  7. import tty
  8. from asyncio import AbstractEventLoop, get_running_loop
  9. from typing import Callable, ContextManager, Generator, TextIO
  10. from ..key_binding import KeyPress
  11. from .base import Input
  12. from .posix_utils import PosixStdinReader
  13. from .vt100_parser import Vt100Parser
# Names exported by `from <this module> import *`: the input class and the
# two terminal-mode context managers defined below.
__all__ = [
    "Vt100Input",
    "raw_mode",
    "cooked_mode",
]
  19. class Vt100Input(Input):
  20. """
  21. Vt100 input for Posix systems.
  22. (This uses a posix file descriptor that can be registered in the event loop.)
  23. """
  24. # For the error messages. Only display "Input is not a terminal" once per
  25. # file descriptor.
  26. _fds_not_a_terminal: set[int] = set()
  27. def __init__(self, stdin: TextIO) -> None:
  28. # Test whether the given input object has a file descriptor.
  29. # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
  30. try:
  31. # This should not raise, but can return 0.
  32. stdin.fileno()
  33. except io.UnsupportedOperation as e:
  34. if "idlelib.run" in sys.modules:
  35. raise io.UnsupportedOperation(
  36. "Stdin is not a terminal. Running from Idle is not supported."
  37. ) from e
  38. else:
  39. raise io.UnsupportedOperation("Stdin is not a terminal.") from e
  40. # Even when we have a file descriptor, it doesn't mean it's a TTY.
  41. # Normally, this requires a real TTY device, but people instantiate
  42. # this class often during unit tests as well. They use for instance
  43. # pexpect to pipe data into an application. For convenience, we print
  44. # an error message and go on.
  45. isatty = stdin.isatty()
  46. fd = stdin.fileno()
  47. if not isatty and fd not in Vt100Input._fds_not_a_terminal:
  48. msg = "Warning: Input is not a terminal (fd=%r).\n"
  49. sys.stderr.write(msg % fd)
  50. sys.stderr.flush()
  51. Vt100Input._fds_not_a_terminal.add(fd)
  52. #
  53. self.stdin = stdin
  54. # Create a backup of the fileno(). We want this to work even if the
  55. # underlying file is closed, so that `typeahead_hash()` keeps working.
  56. self._fileno = stdin.fileno()
  57. self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
  58. self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding)
  59. self.vt100_parser = Vt100Parser(
  60. lambda key_press: self._buffer.append(key_press)
  61. )
  62. def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
  63. """
  64. Return a context manager that makes this input active in the current
  65. event loop.
  66. """
  67. return _attached_input(self, input_ready_callback)
  68. def detach(self) -> ContextManager[None]:
  69. """
  70. Return a context manager that makes sure that this input is not active
  71. in the current event loop.
  72. """
  73. return _detached_input(self)
  74. def read_keys(self) -> list[KeyPress]:
  75. "Read list of KeyPress."
  76. # Read text from stdin.
  77. data = self.stdin_reader.read()
  78. # Pass it through our vt100 parser.
  79. self.vt100_parser.feed(data)
  80. # Return result.
  81. result = self._buffer
  82. self._buffer = []
  83. return result
  84. def flush_keys(self) -> list[KeyPress]:
  85. """
  86. Flush pending keys and return them.
  87. (Used for flushing the 'escape' key.)
  88. """
  89. # Flush all pending keys. (This is most important to flush the vt100
  90. # 'Escape' key early when nothing else follows.)
  91. self.vt100_parser.flush()
  92. # Return result.
  93. result = self._buffer
  94. self._buffer = []
  95. return result
  96. @property
  97. def closed(self) -> bool:
  98. return self.stdin_reader.closed
  99. def raw_mode(self) -> ContextManager[None]:
  100. return raw_mode(self.stdin.fileno())
  101. def cooked_mode(self) -> ContextManager[None]:
  102. return cooked_mode(self.stdin.fileno())
  103. def fileno(self) -> int:
  104. return self.stdin.fileno()
  105. def typeahead_hash(self) -> str:
  106. return f"fd-{self._fileno}"
  107. _current_callbacks: dict[
  108. tuple[AbstractEventLoop, int], Callable[[], None] | None
  109. ] = {} # (loop, fd) -> current callback
  110. @contextlib.contextmanager
  111. def _attached_input(
  112. input: Vt100Input, callback: Callable[[], None]
  113. ) -> Generator[None, None, None]:
  114. """
  115. Context manager that makes this input active in the current event loop.
  116. :param input: :class:`~prompt_toolkit.input.Input` object.
  117. :param callback: Called when the input is ready to read.
  118. """
  119. loop = get_running_loop()
  120. fd = input.fileno()
  121. previous = _current_callbacks.get((loop, fd))
  122. def callback_wrapper() -> None:
  123. """Wrapper around the callback that already removes the reader when
  124. the input is closed. Otherwise, we keep continuously calling this
  125. callback, until we leave the context manager (which can happen a bit
  126. later). This fixes issues when piping /dev/null into a prompt_toolkit
  127. application."""
  128. if input.closed:
  129. loop.remove_reader(fd)
  130. callback()
  131. try:
  132. loop.add_reader(fd, callback_wrapper)
  133. except PermissionError:
  134. # For `EPollSelector`, adding /dev/null to the event loop will raise
  135. # `PermissionError` (that doesn't happen for `SelectSelector`
  136. # apparently). Whenever we get a `PermissionError`, we can raise
  137. # `EOFError`, because there's not more to be read anyway. `EOFError` is
  138. # an exception that people expect in
  139. # `prompt_toolkit.application.Application.run()`.
  140. # To reproduce, do: `ptpython 0< /dev/null 1< /dev/null`
  141. raise EOFError
  142. _current_callbacks[loop, fd] = callback
  143. try:
  144. yield
  145. finally:
  146. loop.remove_reader(fd)
  147. if previous:
  148. loop.add_reader(fd, previous)
  149. _current_callbacks[loop, fd] = previous
  150. else:
  151. del _current_callbacks[loop, fd]
  152. @contextlib.contextmanager
  153. def _detached_input(input: Vt100Input) -> Generator[None, None, None]:
  154. loop = get_running_loop()
  155. fd = input.fileno()
  156. previous = _current_callbacks.get((loop, fd))
  157. if previous:
  158. loop.remove_reader(fd)
  159. _current_callbacks[loop, fd] = None
  160. try:
  161. yield
  162. finally:
  163. if previous:
  164. loop.add_reader(fd, previous)
  165. _current_callbacks[loop, fd] = previous
  166. class raw_mode:
  167. """
  168. ::
  169. with raw_mode(stdin):
  170. ''' the pseudo-terminal stdin is now used in raw mode '''
  171. We ignore errors when executing `tcgetattr` fails.
  172. """
  173. # There are several reasons for ignoring errors:
  174. # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would
  175. # execute this code (In a Python REPL, for instance):
  176. #
  177. # import os; f = open(os.devnull); os.dup2(f.fileno(), 0)
  178. #
  179. # The result is that the eventloop will stop correctly, because it has
  180. # to logic to quit when stdin is closed. However, we should not fail at
  181. # this point. See:
  182. # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393
  183. # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392
  184. # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated.
  185. # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165
  186. def __init__(self, fileno: int) -> None:
  187. self.fileno = fileno
  188. self.attrs_before: list[int | list[bytes | int]] | None
  189. try:
  190. self.attrs_before = termios.tcgetattr(fileno)
  191. except termios.error:
  192. # Ignore attribute errors.
  193. self.attrs_before = None
  194. def __enter__(self) -> None:
  195. # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this:
  196. try:
  197. newattr = termios.tcgetattr(self.fileno)
  198. except termios.error:
  199. pass
  200. else:
  201. newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
  202. newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
  203. # VMIN defines the number of characters read at a time in
  204. # non-canonical mode. It seems to default to 1 on Linux, but on
  205. # Solaris and derived operating systems it defaults to 4. (This is
  206. # because the VMIN slot is the same as the VEOF slot, which
  207. # defaults to ASCII EOT = Ctrl-D = 4.)
  208. newattr[tty.CC][termios.VMIN] = 1
  209. termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
  210. @classmethod
  211. def _patch_lflag(cls, attrs: int) -> int:
  212. return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
  213. @classmethod
  214. def _patch_iflag(cls, attrs: int) -> int:
  215. return attrs & ~(
  216. # Disable XON/XOFF flow control on output and input.
  217. # (Don't capture Ctrl-S and Ctrl-Q.)
  218. # Like executing: "stty -ixon."
  219. termios.IXON
  220. | termios.IXOFF
  221. |
  222. # Don't translate carriage return into newline on input.
  223. termios.ICRNL
  224. | termios.INLCR
  225. | termios.IGNCR
  226. )
  227. def __exit__(self, *a: object) -> None:
  228. if self.attrs_before is not None:
  229. try:
  230. termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
  231. except termios.error:
  232. pass
  233. # # Put the terminal in application mode.
  234. # self._stdout.write('\x1b[?1h')
  235. class cooked_mode(raw_mode):
  236. """
  237. The opposite of ``raw_mode``, used when we need cooked mode inside a
  238. `raw_mode` block. Used in `Application.run_in_terminal`.::
  239. with cooked_mode(stdin):
  240. ''' the pseudo-terminal stdin is now used in cooked mode. '''
  241. """
  242. @classmethod
  243. def _patch_lflag(cls, attrs: int) -> int:
  244. return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
  245. @classmethod
  246. def _patch_iflag(cls, attrs: int) -> int:
  247. # Turn the ICRNL flag back on. (Without this, calling `input()` in
  248. # run_in_terminal doesn't work and displays ^M instead. Ptpython
  249. # evaluates commands using `run_in_terminal`, so it's important that
  250. # they translate ^M back into ^J.)
  251. return attrs | termios.ICRNL