123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- """
- Parser for VT100 input stream.
- """
- from __future__ import annotations
- import re
- from typing import Callable, Dict, Generator
- from ..key_binding.key_processor import KeyPress
- from ..keys import Keys
- from .ansi_escape_sequences import ANSI_SEQUENCES
- __all__ = [
- "Vt100Parser",
- ]
- # Regex matching any CPR response
- # (Note that we use '\Z' instead of '$', because '$' could include a trailing
- # newline.)
- _cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")
- # Mouse events:
- # Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
- _mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
- # Regex matching any valid prefix of a CPR response.
- # (Note that it doesn't contain the last character, the 'R'. The prefix has to
- # be shorter.)
- _cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")
- _mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
- class _Flush:
- """Helper object to indicate flush operation to the parser."""
- pass
- class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
- """
- Dictionary that maps input sequences to a boolean indicating whether there is
- any key that start with this characters.
- """
- def __missing__(self, prefix: str) -> bool:
- # (hard coded) If this could be a prefix of a CPR response, return
- # True.
- if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(
- prefix
- ):
- result = True
- else:
- # If this could be a prefix of anything else, also return True.
- result = any(
- v
- for k, v in ANSI_SEQUENCES.items()
- if k.startswith(prefix) and k != prefix
- )
- self[prefix] = result
- return result
- _IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
- class Vt100Parser:
- """
- Parser for VT100 input stream.
- Data can be fed through the `feed` method and the given callback will be
- called with KeyPress objects.
- ::
- def callback(key):
- pass
- i = Vt100Parser(callback)
- i.feed('data\x01...')
- :attr feed_key_callback: Function that will be called when a key is parsed.
- """
- # Lookup table of ANSI escape sequences for a VT100 terminal
- # Hint: in order to know what sequences your terminal writes to stdin, run
- # "od -c" and start typing.
- def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
- self.feed_key_callback = feed_key_callback
- self.reset()
- def reset(self, request: bool = False) -> None:
- self._in_bracketed_paste = False
- self._start_parser()
- def _start_parser(self) -> None:
- """
- Start the parser coroutine.
- """
- self._input_parser = self._input_parser_generator()
- self._input_parser.send(None) # type: ignore
- def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
- """
- Return the key (or keys) that maps to this prefix.
- """
- # (hard coded) If we match a CPR response, return Keys.CPRResponse.
- # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
- # integer variables.)
- if _cpr_response_re.match(prefix):
- return Keys.CPRResponse
- elif _mouse_event_re.match(prefix):
- return Keys.Vt100MouseEvent
- # Otherwise, use the mappings.
- try:
- return ANSI_SEQUENCES[prefix]
- except KeyError:
- return None
- def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
- """
- Coroutine (state machine) for the input parser.
- """
- prefix = ""
- retry = False
- flush = False
- while True:
- flush = False
- if retry:
- retry = False
- else:
- # Get next character.
- c = yield
- if isinstance(c, _Flush):
- flush = True
- else:
- prefix += c
- # If we have some data, check for matches.
- if prefix:
- is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
- match = self._get_match(prefix)
- # Exact matches found, call handlers..
- if (flush or not is_prefix_of_longer_match) and match:
- self._call_handler(match, prefix)
- prefix = ""
- # No exact match found.
- elif (flush or not is_prefix_of_longer_match) and not match:
- found = False
- retry = True
- # Loop over the input, try the longest match first and
- # shift.
- for i in range(len(prefix), 0, -1):
- match = self._get_match(prefix[:i])
- if match:
- self._call_handler(match, prefix[:i])
- prefix = prefix[i:]
- found = True
- if not found:
- self._call_handler(prefix[0], prefix[0])
- prefix = prefix[1:]
- def _call_handler(
- self, key: str | Keys | tuple[Keys, ...], insert_text: str
- ) -> None:
- """
- Callback to handler.
- """
- if isinstance(key, tuple):
- # Received ANSI sequence that corresponds with multiple keys
- # (probably alt+something). Handle keys individually, but only pass
- # data payload to first KeyPress (so that we won't insert it
- # multiple times).
- for i, k in enumerate(key):
- self._call_handler(k, insert_text if i == 0 else "")
- else:
- if key == Keys.BracketedPaste:
- self._in_bracketed_paste = True
- self._paste_buffer = ""
- else:
- self.feed_key_callback(KeyPress(key, insert_text))
- def feed(self, data: str) -> None:
- """
- Feed the input stream.
- :param data: Input string (unicode).
- """
- # Handle bracketed paste. (We bypass the parser that matches all other
- # key presses and keep reading input until we see the end mark.)
- # This is much faster then parsing character by character.
- if self._in_bracketed_paste:
- self._paste_buffer += data
- end_mark = "\x1b[201~"
- if end_mark in self._paste_buffer:
- end_index = self._paste_buffer.index(end_mark)
- # Feed content to key bindings.
- paste_content = self._paste_buffer[:end_index]
- self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
- # Quit bracketed paste mode and handle remaining input.
- self._in_bracketed_paste = False
- remaining = self._paste_buffer[end_index + len(end_mark) :]
- self._paste_buffer = ""
- self.feed(remaining)
- # Handle normal input character by character.
- else:
- for i, c in enumerate(data):
- if self._in_bracketed_paste:
- # Quit loop and process from this position when the parser
- # entered bracketed paste.
- self.feed(data[i:])
- break
- else:
- self._input_parser.send(c)
- def flush(self) -> None:
- """
- Flush the buffer of the input stream.
- This will allow us to handle the escape key (or maybe meta) sooner.
- The input received by the escape key is actually the same as the first
- characters of e.g. Arrow-Up, so without knowing what follows the escape
- sequence, we don't know whether escape has been pressed, or whether
- it's something else. This flush function should be called after a
- timeout, and processes everything that's still in the buffer as-is, so
- without assuming any characters will follow.
- """
- self._input_parser.send(_Flush())
- def feed_and_flush(self, data: str) -> None:
- """
- Wrapper around ``feed`` and ``flush``.
- """
- self.feed(data)
- self.flush()
|