vt100_parser.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. """
  2. Parser for VT100 input stream.
  3. """
  4. from __future__ import annotations
  5. import re
  6. from typing import Callable, Dict, Generator
  7. from ..key_binding.key_processor import KeyPress
  8. from ..keys import Keys
  9. from .ansi_escape_sequences import ANSI_SEQUENCES
  10. __all__ = [
  11. "Vt100Parser",
  12. ]
  13. # Regex matching any CPR response
  14. # (Note that we use '\Z' instead of '$', because '$' could include a trailing
  15. # newline.)
  16. _cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")
  17. # Mouse events:
  18. # Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
  19. _mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
  20. # Regex matching any valid prefix of a CPR response.
  21. # (Note that it doesn't contain the last character, the 'R'. The prefix has to
  22. # be shorter.)
  23. _cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")
  24. _mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
  25. class _Flush:
  26. """Helper object to indicate flush operation to the parser."""
  27. pass
  28. class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
  29. """
  30. Dictionary that maps input sequences to a boolean indicating whether there is
  31. any key that start with this characters.
  32. """
  33. def __missing__(self, prefix: str) -> bool:
  34. # (hard coded) If this could be a prefix of a CPR response, return
  35. # True.
  36. if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(
  37. prefix
  38. ):
  39. result = True
  40. else:
  41. # If this could be a prefix of anything else, also return True.
  42. result = any(
  43. v
  44. for k, v in ANSI_SEQUENCES.items()
  45. if k.startswith(prefix) and k != prefix
  46. )
  47. self[prefix] = result
  48. return result
  49. _IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
  50. class Vt100Parser:
  51. """
  52. Parser for VT100 input stream.
  53. Data can be fed through the `feed` method and the given callback will be
  54. called with KeyPress objects.
  55. ::
  56. def callback(key):
  57. pass
  58. i = Vt100Parser(callback)
  59. i.feed('data\x01...')
  60. :attr feed_key_callback: Function that will be called when a key is parsed.
  61. """
  62. # Lookup table of ANSI escape sequences for a VT100 terminal
  63. # Hint: in order to know what sequences your terminal writes to stdin, run
  64. # "od -c" and start typing.
  65. def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
  66. self.feed_key_callback = feed_key_callback
  67. self.reset()
  68. def reset(self, request: bool = False) -> None:
  69. self._in_bracketed_paste = False
  70. self._start_parser()
  71. def _start_parser(self) -> None:
  72. """
  73. Start the parser coroutine.
  74. """
  75. self._input_parser = self._input_parser_generator()
  76. self._input_parser.send(None) # type: ignore
  77. def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
  78. """
  79. Return the key (or keys) that maps to this prefix.
  80. """
  81. # (hard coded) If we match a CPR response, return Keys.CPRResponse.
  82. # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
  83. # integer variables.)
  84. if _cpr_response_re.match(prefix):
  85. return Keys.CPRResponse
  86. elif _mouse_event_re.match(prefix):
  87. return Keys.Vt100MouseEvent
  88. # Otherwise, use the mappings.
  89. try:
  90. return ANSI_SEQUENCES[prefix]
  91. except KeyError:
  92. return None
  93. def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
  94. """
  95. Coroutine (state machine) for the input parser.
  96. """
  97. prefix = ""
  98. retry = False
  99. flush = False
  100. while True:
  101. flush = False
  102. if retry:
  103. retry = False
  104. else:
  105. # Get next character.
  106. c = yield
  107. if isinstance(c, _Flush):
  108. flush = True
  109. else:
  110. prefix += c
  111. # If we have some data, check for matches.
  112. if prefix:
  113. is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
  114. match = self._get_match(prefix)
  115. # Exact matches found, call handlers..
  116. if (flush or not is_prefix_of_longer_match) and match:
  117. self._call_handler(match, prefix)
  118. prefix = ""
  119. # No exact match found.
  120. elif (flush or not is_prefix_of_longer_match) and not match:
  121. found = False
  122. retry = True
  123. # Loop over the input, try the longest match first and
  124. # shift.
  125. for i in range(len(prefix), 0, -1):
  126. match = self._get_match(prefix[:i])
  127. if match:
  128. self._call_handler(match, prefix[:i])
  129. prefix = prefix[i:]
  130. found = True
  131. if not found:
  132. self._call_handler(prefix[0], prefix[0])
  133. prefix = prefix[1:]
  134. def _call_handler(
  135. self, key: str | Keys | tuple[Keys, ...], insert_text: str
  136. ) -> None:
  137. """
  138. Callback to handler.
  139. """
  140. if isinstance(key, tuple):
  141. # Received ANSI sequence that corresponds with multiple keys
  142. # (probably alt+something). Handle keys individually, but only pass
  143. # data payload to first KeyPress (so that we won't insert it
  144. # multiple times).
  145. for i, k in enumerate(key):
  146. self._call_handler(k, insert_text if i == 0 else "")
  147. else:
  148. if key == Keys.BracketedPaste:
  149. self._in_bracketed_paste = True
  150. self._paste_buffer = ""
  151. else:
  152. self.feed_key_callback(KeyPress(key, insert_text))
  153. def feed(self, data: str) -> None:
  154. """
  155. Feed the input stream.
  156. :param data: Input string (unicode).
  157. """
  158. # Handle bracketed paste. (We bypass the parser that matches all other
  159. # key presses and keep reading input until we see the end mark.)
  160. # This is much faster then parsing character by character.
  161. if self._in_bracketed_paste:
  162. self._paste_buffer += data
  163. end_mark = "\x1b[201~"
  164. if end_mark in self._paste_buffer:
  165. end_index = self._paste_buffer.index(end_mark)
  166. # Feed content to key bindings.
  167. paste_content = self._paste_buffer[:end_index]
  168. self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
  169. # Quit bracketed paste mode and handle remaining input.
  170. self._in_bracketed_paste = False
  171. remaining = self._paste_buffer[end_index + len(end_mark) :]
  172. self._paste_buffer = ""
  173. self.feed(remaining)
  174. # Handle normal input character by character.
  175. else:
  176. for i, c in enumerate(data):
  177. if self._in_bracketed_paste:
  178. # Quit loop and process from this position when the parser
  179. # entered bracketed paste.
  180. self.feed(data[i:])
  181. break
  182. else:
  183. self._input_parser.send(c)
  184. def flush(self) -> None:
  185. """
  186. Flush the buffer of the input stream.
  187. This will allow us to handle the escape key (or maybe meta) sooner.
  188. The input received by the escape key is actually the same as the first
  189. characters of e.g. Arrow-Up, so without knowing what follows the escape
  190. sequence, we don't know whether escape has been pressed, or whether
  191. it's something else. This flush function should be called after a
  192. timeout, and processes everything that's still in the buffer as-is, so
  193. without assuming any characters will follow.
  194. """
  195. self._input_parser.send(_Flush())
  196. def feed_and_flush(self, data: str) -> None:
  197. """
  198. Wrapper around ``feed`` and ``flush``.
  199. """
  200. self.feed(data)
  201. self.flush()