win32.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886
  1. from __future__ import annotations
  2. import os
  3. import sys
  4. from abc import abstractmethod
  5. from asyncio import get_running_loop
  6. from contextlib import contextmanager
  7. from ..utils import SPHINX_AUTODOC_RUNNING
  8. assert sys.platform == "win32"
  9. # Do not import win32-specific stuff when generating documentation.
  10. # Otherwise RTD would be unable to generate docs for this module.
  11. if not SPHINX_AUTODOC_RUNNING:
  12. import msvcrt
  13. from ctypes import windll
  14. from ctypes import Array, byref, pointer
  15. from ctypes.wintypes import DWORD, HANDLE
  16. from typing import Callable, ContextManager, Iterable, Iterator, TextIO
  17. from prompt_toolkit.eventloop import run_in_executor_with_context
  18. from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles
  19. from prompt_toolkit.key_binding.key_processor import KeyPress
  20. from prompt_toolkit.keys import Keys
  21. from prompt_toolkit.mouse_events import MouseButton, MouseEventType
  22. from prompt_toolkit.win32_types import (
  23. INPUT_RECORD,
  24. KEY_EVENT_RECORD,
  25. MOUSE_EVENT_RECORD,
  26. STD_INPUT_HANDLE,
  27. EventTypes,
  28. )
  29. from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
  30. from .base import Input
  31. from .vt100_parser import Vt100Parser
  32. __all__ = [
  33. "Win32Input",
  34. "ConsoleInputReader",
  35. "raw_mode",
  36. "cooked_mode",
  37. "attach_win32_input",
  38. "detach_win32_input",
  39. ]
  40. # Win32 Constants for MOUSE_EVENT_RECORD.
  41. # See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str
  42. FROM_LEFT_1ST_BUTTON_PRESSED = 0x1
  43. RIGHTMOST_BUTTON_PRESSED = 0x2
  44. MOUSE_MOVED = 0x0001
  45. MOUSE_WHEELED = 0x0004
  46. # See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
  47. ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
  48. class _Win32InputBase(Input):
  49. """
  50. Base class for `Win32Input` and `Win32PipeInput`.
  51. """
  52. def __init__(self) -> None:
  53. self.win32_handles = _Win32Handles()
  54. @property
  55. @abstractmethod
  56. def handle(self) -> HANDLE:
  57. pass
  58. class Win32Input(_Win32InputBase):
  59. """
  60. `Input` class that reads from the Windows console.
  61. """
  62. def __init__(self, stdin: TextIO | None = None) -> None:
  63. super().__init__()
  64. self._use_virtual_terminal_input = _is_win_vt100_input_enabled()
  65. self.console_input_reader: Vt100ConsoleInputReader | ConsoleInputReader
  66. if self._use_virtual_terminal_input:
  67. self.console_input_reader = Vt100ConsoleInputReader()
  68. else:
  69. self.console_input_reader = ConsoleInputReader()
  70. def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
  71. """
  72. Return a context manager that makes this input active in the current
  73. event loop.
  74. """
  75. return attach_win32_input(self, input_ready_callback)
  76. def detach(self) -> ContextManager[None]:
  77. """
  78. Return a context manager that makes sure that this input is not active
  79. in the current event loop.
  80. """
  81. return detach_win32_input(self)
  82. def read_keys(self) -> list[KeyPress]:
  83. return list(self.console_input_reader.read())
  84. def flush(self) -> None:
  85. pass
  86. @property
  87. def closed(self) -> bool:
  88. return False
  89. def raw_mode(self) -> ContextManager[None]:
  90. return raw_mode(
  91. use_win10_virtual_terminal_input=self._use_virtual_terminal_input
  92. )
  93. def cooked_mode(self) -> ContextManager[None]:
  94. return cooked_mode()
  95. def fileno(self) -> int:
  96. # The windows console doesn't depend on the file handle, so
  97. # this is not used for the event loop (which uses the
  98. # handle instead). But it's used in `Application.run_system_command`
  99. # which opens a subprocess with a given stdin/stdout.
  100. return sys.stdin.fileno()
  101. def typeahead_hash(self) -> str:
  102. return "win32-input"
  103. def close(self) -> None:
  104. self.console_input_reader.close()
  105. @property
  106. def handle(self) -> HANDLE:
  107. return self.console_input_reader.handle
  108. class ConsoleInputReader:
  109. """
  110. :param recognize_paste: When True, try to discover paste actions and turn
  111. the event into a BracketedPaste.
  112. """
  113. # Keys with character data.
  114. mappings = {
  115. b"\x1b": Keys.Escape,
  116. b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
  117. b"\x01": Keys.ControlA, # Control-A (home)
  118. b"\x02": Keys.ControlB, # Control-B (emacs cursor left)
  119. b"\x03": Keys.ControlC, # Control-C (interrupt)
  120. b"\x04": Keys.ControlD, # Control-D (exit)
  121. b"\x05": Keys.ControlE, # Control-E (end)
  122. b"\x06": Keys.ControlF, # Control-F (cursor forward)
  123. b"\x07": Keys.ControlG, # Control-G
  124. b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
  125. b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
  126. b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
  127. b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
  128. b"\x0c": Keys.ControlL, # Control-L (clear; form feed)
  129. b"\x0d": Keys.ControlM, # Control-M (enter)
  130. b"\x0e": Keys.ControlN, # Control-N (14) (history forward)
  131. b"\x0f": Keys.ControlO, # Control-O (15)
  132. b"\x10": Keys.ControlP, # Control-P (16) (history back)
  133. b"\x11": Keys.ControlQ, # Control-Q
  134. b"\x12": Keys.ControlR, # Control-R (18) (reverse search)
  135. b"\x13": Keys.ControlS, # Control-S (19) (forward search)
  136. b"\x14": Keys.ControlT, # Control-T
  137. b"\x15": Keys.ControlU, # Control-U
  138. b"\x16": Keys.ControlV, # Control-V
  139. b"\x17": Keys.ControlW, # Control-W
  140. b"\x18": Keys.ControlX, # Control-X
  141. b"\x19": Keys.ControlY, # Control-Y (25)
  142. b"\x1a": Keys.ControlZ, # Control-Z
  143. b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-|
  144. b"\x1d": Keys.ControlSquareClose, # Control-]
  145. b"\x1e": Keys.ControlCircumflex, # Control-^
  146. b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
  147. b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.)
  148. }
  149. # Keys that don't carry character data.
  150. keycodes = {
  151. # Home/End
  152. 33: Keys.PageUp,
  153. 34: Keys.PageDown,
  154. 35: Keys.End,
  155. 36: Keys.Home,
  156. # Arrows
  157. 37: Keys.Left,
  158. 38: Keys.Up,
  159. 39: Keys.Right,
  160. 40: Keys.Down,
  161. 45: Keys.Insert,
  162. 46: Keys.Delete,
  163. # F-keys.
  164. 112: Keys.F1,
  165. 113: Keys.F2,
  166. 114: Keys.F3,
  167. 115: Keys.F4,
  168. 116: Keys.F5,
  169. 117: Keys.F6,
  170. 118: Keys.F7,
  171. 119: Keys.F8,
  172. 120: Keys.F9,
  173. 121: Keys.F10,
  174. 122: Keys.F11,
  175. 123: Keys.F12,
  176. }
  177. LEFT_ALT_PRESSED = 0x0002
  178. RIGHT_ALT_PRESSED = 0x0001
  179. SHIFT_PRESSED = 0x0010
  180. LEFT_CTRL_PRESSED = 0x0008
  181. RIGHT_CTRL_PRESSED = 0x0004
  182. def __init__(self, recognize_paste: bool = True) -> None:
  183. self._fdcon = None
  184. self.recognize_paste = recognize_paste
  185. # When stdin is a tty, use that handle, otherwise, create a handle from
  186. # CONIN$.
  187. self.handle: HANDLE
  188. if sys.stdin.isatty():
  189. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  190. else:
  191. self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
  192. self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
  193. def close(self) -> None:
  194. "Close fdcon."
  195. if self._fdcon is not None:
  196. os.close(self._fdcon)
  197. def read(self) -> Iterable[KeyPress]:
  198. """
  199. Return a list of `KeyPress` instances. It won't return anything when
  200. there was nothing to read. (This function doesn't block.)
  201. http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
  202. """
  203. max_count = 2048 # Max events to read at the same time.
  204. read = DWORD(0)
  205. arrtype = INPUT_RECORD * max_count
  206. input_records = arrtype()
  207. # Check whether there is some input to read. `ReadConsoleInputW` would
  208. # block otherwise.
  209. # (Actually, the event loop is responsible to make sure that this
  210. # function is only called when there is something to read, but for some
  211. # reason this happened in the asyncio_win32 loop, and it's better to be
  212. # safe anyway.)
  213. if not wait_for_handles([self.handle], timeout=0):
  214. return
  215. # Get next batch of input event.
  216. windll.kernel32.ReadConsoleInputW(
  217. self.handle, pointer(input_records), max_count, pointer(read)
  218. )
  219. # First, get all the keys from the input buffer, in order to determine
  220. # whether we should consider this a paste event or not.
  221. all_keys = list(self._get_keys(read, input_records))
  222. # Fill in 'data' for key presses.
  223. all_keys = [self._insert_key_data(key) for key in all_keys]
  224. # Correct non-bmp characters that are passed as separate surrogate codes
  225. all_keys = list(self._merge_paired_surrogates(all_keys))
  226. if self.recognize_paste and self._is_paste(all_keys):
  227. gen = iter(all_keys)
  228. k: KeyPress | None
  229. for k in gen:
  230. # Pasting: if the current key consists of text or \n, turn it
  231. # into a BracketedPaste.
  232. data = []
  233. while k and (
  234. not isinstance(k.key, Keys)
  235. or k.key in {Keys.ControlJ, Keys.ControlM}
  236. ):
  237. data.append(k.data)
  238. try:
  239. k = next(gen)
  240. except StopIteration:
  241. k = None
  242. if data:
  243. yield KeyPress(Keys.BracketedPaste, "".join(data))
  244. if k is not None:
  245. yield k
  246. else:
  247. yield from all_keys
  248. def _insert_key_data(self, key_press: KeyPress) -> KeyPress:
  249. """
  250. Insert KeyPress data, for vt100 compatibility.
  251. """
  252. if key_press.data:
  253. return key_press
  254. if isinstance(key_press.key, Keys):
  255. data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "")
  256. else:
  257. data = ""
  258. return KeyPress(key_press.key, data)
  259. def _get_keys(
  260. self, read: DWORD, input_records: Array[INPUT_RECORD]
  261. ) -> Iterator[KeyPress]:
  262. """
  263. Generator that yields `KeyPress` objects from the input records.
  264. """
  265. for i in range(read.value):
  266. ir = input_records[i]
  267. # Get the right EventType from the EVENT_RECORD.
  268. # (For some reason the Windows console application 'cmder'
  269. # [http://gooseberrycreative.com/cmder/] can return '0' for
  270. # ir.EventType. -- Just ignore that.)
  271. if ir.EventType in EventTypes:
  272. ev = getattr(ir.Event, EventTypes[ir.EventType])
  273. # Process if this is a key event. (We also have mouse, menu and
  274. # focus events.)
  275. if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
  276. yield from self._event_to_key_presses(ev)
  277. elif isinstance(ev, MOUSE_EVENT_RECORD):
  278. yield from self._handle_mouse(ev)
  279. @staticmethod
  280. def _merge_paired_surrogates(key_presses: list[KeyPress]) -> Iterator[KeyPress]:
  281. """
  282. Combines consecutive KeyPresses with high and low surrogates into
  283. single characters
  284. """
  285. buffered_high_surrogate = None
  286. for key in key_presses:
  287. is_text = not isinstance(key.key, Keys)
  288. is_high_surrogate = is_text and "\ud800" <= key.key <= "\udbff"
  289. is_low_surrogate = is_text and "\udc00" <= key.key <= "\udfff"
  290. if buffered_high_surrogate:
  291. if is_low_surrogate:
  292. # convert high surrogate + low surrogate to single character
  293. fullchar = (
  294. (buffered_high_surrogate.key + key.key)
  295. .encode("utf-16-le", "surrogatepass")
  296. .decode("utf-16-le")
  297. )
  298. key = KeyPress(fullchar, fullchar)
  299. else:
  300. yield buffered_high_surrogate
  301. buffered_high_surrogate = None
  302. if is_high_surrogate:
  303. buffered_high_surrogate = key
  304. else:
  305. yield key
  306. if buffered_high_surrogate:
  307. yield buffered_high_surrogate
  308. @staticmethod
  309. def _is_paste(keys: list[KeyPress]) -> bool:
  310. """
  311. Return `True` when we should consider this list of keys as a paste
  312. event. Pasted text on windows will be turned into a
  313. `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably
  314. the best possible way to detect pasting of text and handle that
  315. correctly.)
  316. """
  317. # Consider paste when it contains at least one newline and at least one
  318. # other character.
  319. text_count = 0
  320. newline_count = 0
  321. for k in keys:
  322. if not isinstance(k.key, Keys):
  323. text_count += 1
  324. if k.key == Keys.ControlM:
  325. newline_count += 1
  326. return newline_count >= 1 and text_count >= 1
  327. def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> list[KeyPress]:
  328. """
  329. For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
  330. """
  331. assert isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown
  332. result: KeyPress | None = None
  333. control_key_state = ev.ControlKeyState
  334. u_char = ev.uChar.UnicodeChar
  335. # Use surrogatepass because u_char may be an unmatched surrogate
  336. ascii_char = u_char.encode("utf-8", "surrogatepass")
  337. # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the
  338. # unicode code point truncated to 1 byte. See also:
  339. # https://github.com/ipython/ipython/issues/10004
  340. # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389
  341. if u_char == "\x00":
  342. if ev.VirtualKeyCode in self.keycodes:
  343. result = KeyPress(self.keycodes[ev.VirtualKeyCode], "")
  344. else:
  345. if ascii_char in self.mappings:
  346. if self.mappings[ascii_char] == Keys.ControlJ:
  347. u_char = (
  348. "\n" # Windows sends \n, turn into \r for unix compatibility.
  349. )
  350. result = KeyPress(self.mappings[ascii_char], u_char)
  351. else:
  352. result = KeyPress(u_char, u_char)
  353. # First we handle Shift-Control-Arrow/Home/End (need to do this first)
  354. if (
  355. (
  356. control_key_state & self.LEFT_CTRL_PRESSED
  357. or control_key_state & self.RIGHT_CTRL_PRESSED
  358. )
  359. and control_key_state & self.SHIFT_PRESSED
  360. and result
  361. ):
  362. mapping: dict[str, str] = {
  363. Keys.Left: Keys.ControlShiftLeft,
  364. Keys.Right: Keys.ControlShiftRight,
  365. Keys.Up: Keys.ControlShiftUp,
  366. Keys.Down: Keys.ControlShiftDown,
  367. Keys.Home: Keys.ControlShiftHome,
  368. Keys.End: Keys.ControlShiftEnd,
  369. Keys.Insert: Keys.ControlShiftInsert,
  370. Keys.PageUp: Keys.ControlShiftPageUp,
  371. Keys.PageDown: Keys.ControlShiftPageDown,
  372. }
  373. result.key = mapping.get(result.key, result.key)
  374. # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys.
  375. if (
  376. control_key_state & self.LEFT_CTRL_PRESSED
  377. or control_key_state & self.RIGHT_CTRL_PRESSED
  378. ) and result:
  379. mapping = {
  380. Keys.Left: Keys.ControlLeft,
  381. Keys.Right: Keys.ControlRight,
  382. Keys.Up: Keys.ControlUp,
  383. Keys.Down: Keys.ControlDown,
  384. Keys.Home: Keys.ControlHome,
  385. Keys.End: Keys.ControlEnd,
  386. Keys.Insert: Keys.ControlInsert,
  387. Keys.Delete: Keys.ControlDelete,
  388. Keys.PageUp: Keys.ControlPageUp,
  389. Keys.PageDown: Keys.ControlPageDown,
  390. }
  391. result.key = mapping.get(result.key, result.key)
  392. # Turn 'Tab' into 'BackTab' when shift was pressed.
  393. # Also handle other shift-key combination
  394. if control_key_state & self.SHIFT_PRESSED and result:
  395. mapping = {
  396. Keys.Tab: Keys.BackTab,
  397. Keys.Left: Keys.ShiftLeft,
  398. Keys.Right: Keys.ShiftRight,
  399. Keys.Up: Keys.ShiftUp,
  400. Keys.Down: Keys.ShiftDown,
  401. Keys.Home: Keys.ShiftHome,
  402. Keys.End: Keys.ShiftEnd,
  403. Keys.Insert: Keys.ShiftInsert,
  404. Keys.Delete: Keys.ShiftDelete,
  405. Keys.PageUp: Keys.ShiftPageUp,
  406. Keys.PageDown: Keys.ShiftPageDown,
  407. }
  408. result.key = mapping.get(result.key, result.key)
  409. # Turn 'Space' into 'ControlSpace' when control was pressed.
  410. if (
  411. (
  412. control_key_state & self.LEFT_CTRL_PRESSED
  413. or control_key_state & self.RIGHT_CTRL_PRESSED
  414. )
  415. and result
  416. and result.data == " "
  417. ):
  418. result = KeyPress(Keys.ControlSpace, " ")
  419. # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot
  420. # detect this combination. But it's really practical on Windows.)
  421. if (
  422. (
  423. control_key_state & self.LEFT_CTRL_PRESSED
  424. or control_key_state & self.RIGHT_CTRL_PRESSED
  425. )
  426. and result
  427. and result.key == Keys.ControlJ
  428. ):
  429. return [KeyPress(Keys.Escape, ""), result]
  430. # Return result. If alt was pressed, prefix the result with an
  431. # 'Escape' key, just like unix VT100 terminals do.
  432. # NOTE: Only replace the left alt with escape. The right alt key often
  433. # acts as altgr and is used in many non US keyboard layouts for
  434. # typing some special characters, like a backslash. We don't want
  435. # all backslashes to be prefixed with escape. (Esc-\ has a
  436. # meaning in E-macs, for instance.)
  437. if result:
  438. meta_pressed = control_key_state & self.LEFT_ALT_PRESSED
  439. if meta_pressed:
  440. return [KeyPress(Keys.Escape, ""), result]
  441. else:
  442. return [result]
  443. else:
  444. return []
  445. def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> list[KeyPress]:
  446. """
  447. Handle mouse events. Return a list of KeyPress instances.
  448. """
  449. event_flags = ev.EventFlags
  450. button_state = ev.ButtonState
  451. event_type: MouseEventType | None = None
  452. button: MouseButton = MouseButton.NONE
  453. # Scroll events.
  454. if event_flags & MOUSE_WHEELED:
  455. if button_state > 0:
  456. event_type = MouseEventType.SCROLL_UP
  457. else:
  458. event_type = MouseEventType.SCROLL_DOWN
  459. else:
  460. # Handle button state for non-scroll events.
  461. if button_state == FROM_LEFT_1ST_BUTTON_PRESSED:
  462. button = MouseButton.LEFT
  463. elif button_state == RIGHTMOST_BUTTON_PRESSED:
  464. button = MouseButton.RIGHT
  465. # Move events.
  466. if event_flags & MOUSE_MOVED:
  467. event_type = MouseEventType.MOUSE_MOVE
  468. # No key pressed anymore: mouse up.
  469. if event_type is None:
  470. if button_state > 0:
  471. # Some button pressed.
  472. event_type = MouseEventType.MOUSE_DOWN
  473. else:
  474. # No button pressed.
  475. event_type = MouseEventType.MOUSE_UP
  476. data = ";".join(
  477. [
  478. button.value,
  479. event_type.value,
  480. str(ev.MousePosition.X),
  481. str(ev.MousePosition.Y),
  482. ]
  483. )
  484. return [KeyPress(Keys.WindowsMouseEvent, data)]
  485. class Vt100ConsoleInputReader:
  486. """
  487. Similar to `ConsoleInputReader`, but for usage when
  488. `ENABLE_VIRTUAL_TERMINAL_INPUT` is enabled. This assumes that Windows sends
  489. us the right vt100 escape sequences and we parse those with our vt100
  490. parser.
  491. (Using this instead of `ConsoleInputReader` results in the "data" attribute
  492. from the `KeyPress` instances to be more correct in edge cases, because
  493. this responds to for instance the terminal being in application cursor keys
  494. mode.)
  495. """
  496. def __init__(self) -> None:
  497. self._fdcon = None
  498. self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
  499. self._vt100_parser = Vt100Parser(
  500. lambda key_press: self._buffer.append(key_press)
  501. )
  502. # When stdin is a tty, use that handle, otherwise, create a handle from
  503. # CONIN$.
  504. self.handle: HANDLE
  505. if sys.stdin.isatty():
  506. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  507. else:
  508. self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
  509. self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
  510. def close(self) -> None:
  511. "Close fdcon."
  512. if self._fdcon is not None:
  513. os.close(self._fdcon)
  514. def read(self) -> Iterable[KeyPress]:
  515. """
  516. Return a list of `KeyPress` instances. It won't return anything when
  517. there was nothing to read. (This function doesn't block.)
  518. http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
  519. """
  520. max_count = 2048 # Max events to read at the same time.
  521. read = DWORD(0)
  522. arrtype = INPUT_RECORD * max_count
  523. input_records = arrtype()
  524. # Check whether there is some input to read. `ReadConsoleInputW` would
  525. # block otherwise.
  526. # (Actually, the event loop is responsible to make sure that this
  527. # function is only called when there is something to read, but for some
  528. # reason this happened in the asyncio_win32 loop, and it's better to be
  529. # safe anyway.)
  530. if not wait_for_handles([self.handle], timeout=0):
  531. return []
  532. # Get next batch of input event.
  533. windll.kernel32.ReadConsoleInputW(
  534. self.handle, pointer(input_records), max_count, pointer(read)
  535. )
  536. # First, get all the keys from the input buffer, in order to determine
  537. # whether we should consider this a paste event or not.
  538. for key_data in self._get_keys(read, input_records):
  539. self._vt100_parser.feed(key_data)
  540. # Return result.
  541. result = self._buffer
  542. self._buffer = []
  543. return result
  544. def _get_keys(
  545. self, read: DWORD, input_records: Array[INPUT_RECORD]
  546. ) -> Iterator[str]:
  547. """
  548. Generator that yields `KeyPress` objects from the input records.
  549. """
  550. for i in range(read.value):
  551. ir = input_records[i]
  552. # Get the right EventType from the EVENT_RECORD.
  553. # (For some reason the Windows console application 'cmder'
  554. # [http://gooseberrycreative.com/cmder/] can return '0' for
  555. # ir.EventType. -- Just ignore that.)
  556. if ir.EventType in EventTypes:
  557. ev = getattr(ir.Event, EventTypes[ir.EventType])
  558. # Process if this is a key event. (We also have mouse, menu and
  559. # focus events.)
  560. if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
  561. u_char = ev.uChar.UnicodeChar
  562. if u_char != "\x00":
  563. yield u_char
  564. class _Win32Handles:
  565. """
  566. Utility to keep track of which handles are connectod to which callbacks.
  567. `add_win32_handle` starts a tiny event loop in another thread which waits
  568. for the Win32 handle to become ready. When this happens, the callback will
  569. be called in the current asyncio event loop using `call_soon_threadsafe`.
  570. `remove_win32_handle` will stop this tiny event loop.
  571. NOTE: We use this technique, so that we don't have to use the
  572. `ProactorEventLoop` on Windows and we can wait for things like stdin
  573. in a `SelectorEventLoop`. This is important, because our inputhook
  574. mechanism (used by IPython), only works with the `SelectorEventLoop`.
  575. """
  576. def __init__(self) -> None:
  577. self._handle_callbacks: dict[int, Callable[[], None]] = {}
  578. # Windows Events that are triggered when we have to stop watching this
  579. # handle.
  580. self._remove_events: dict[int, HANDLE] = {}
  581. def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None:
  582. """
  583. Add a Win32 handle to the event loop.
  584. """
  585. handle_value = handle.value
  586. if handle_value is None:
  587. raise ValueError("Invalid handle.")
  588. # Make sure to remove a previous registered handler first.
  589. self.remove_win32_handle(handle)
  590. loop = get_running_loop()
  591. self._handle_callbacks[handle_value] = callback
  592. # Create remove event.
  593. remove_event = create_win32_event()
  594. self._remove_events[handle_value] = remove_event
  595. # Add reader.
  596. def ready() -> None:
  597. # Tell the callback that input's ready.
  598. try:
  599. callback()
  600. finally:
  601. run_in_executor_with_context(wait, loop=loop)
  602. # Wait for the input to become ready.
  603. # (Use an executor for this, the Windows asyncio event loop doesn't
  604. # allow us to wait for handles like stdin.)
  605. def wait() -> None:
  606. # Wait until either the handle becomes ready, or the remove event
  607. # has been set.
  608. result = wait_for_handles([remove_event, handle])
  609. if result is remove_event:
  610. windll.kernel32.CloseHandle(remove_event)
  611. return
  612. else:
  613. loop.call_soon_threadsafe(ready)
  614. run_in_executor_with_context(wait, loop=loop)
  615. def remove_win32_handle(self, handle: HANDLE) -> Callable[[], None] | None:
  616. """
  617. Remove a Win32 handle from the event loop.
  618. Return either the registered handler or `None`.
  619. """
  620. if handle.value is None:
  621. return None # Ignore.
  622. # Trigger remove events, so that the reader knows to stop.
  623. try:
  624. event = self._remove_events.pop(handle.value)
  625. except KeyError:
  626. pass
  627. else:
  628. windll.kernel32.SetEvent(event)
  629. try:
  630. return self._handle_callbacks.pop(handle.value)
  631. except KeyError:
  632. return None
  633. @contextmanager
  634. def attach_win32_input(
  635. input: _Win32InputBase, callback: Callable[[], None]
  636. ) -> Iterator[None]:
  637. """
  638. Context manager that makes this input active in the current event loop.
  639. :param input: :class:`~prompt_toolkit.input.Input` object.
  640. :param input_ready_callback: Called when the input is ready to read.
  641. """
  642. win32_handles = input.win32_handles
  643. handle = input.handle
  644. if handle.value is None:
  645. raise ValueError("Invalid handle.")
  646. # Add reader.
  647. previous_callback = win32_handles.remove_win32_handle(handle)
  648. win32_handles.add_win32_handle(handle, callback)
  649. try:
  650. yield
  651. finally:
  652. win32_handles.remove_win32_handle(handle)
  653. if previous_callback:
  654. win32_handles.add_win32_handle(handle, previous_callback)
  655. @contextmanager
  656. def detach_win32_input(input: _Win32InputBase) -> Iterator[None]:
  657. win32_handles = input.win32_handles
  658. handle = input.handle
  659. if handle.value is None:
  660. raise ValueError("Invalid handle.")
  661. previous_callback = win32_handles.remove_win32_handle(handle)
  662. try:
  663. yield
  664. finally:
  665. if previous_callback:
  666. win32_handles.add_win32_handle(handle, previous_callback)
  667. class raw_mode:
  668. """
  669. ::
  670. with raw_mode(stdin):
  671. ''' the windows terminal is now in 'raw' mode. '''
  672. The ``fileno`` attribute is ignored. This is to be compatible with the
  673. `raw_input` method of `.vt100_input`.
  674. """
  675. def __init__(
  676. self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
  677. ) -> None:
  678. self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  679. self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
  680. def __enter__(self) -> None:
  681. # Remember original mode.
  682. original_mode = DWORD()
  683. windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode))
  684. self.original_mode = original_mode
  685. self._patch()
  686. def _patch(self) -> None:
  687. # Set raw
  688. ENABLE_ECHO_INPUT = 0x0004
  689. ENABLE_LINE_INPUT = 0x0002
  690. ENABLE_PROCESSED_INPUT = 0x0001
  691. new_mode = self.original_mode.value & ~(
  692. ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
  693. )
  694. if self.use_win10_virtual_terminal_input:
  695. new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
  696. windll.kernel32.SetConsoleMode(self.handle, new_mode)
  697. def __exit__(self, *a: object) -> None:
  698. # Restore original mode
  699. windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
  700. class cooked_mode(raw_mode):
  701. """
  702. ::
  703. with cooked_mode(stdin):
  704. ''' The pseudo-terminal stdin is now used in cooked mode. '''
  705. """
  706. def _patch(self) -> None:
  707. # Set cooked.
  708. ENABLE_ECHO_INPUT = 0x0004
  709. ENABLE_LINE_INPUT = 0x0002
  710. ENABLE_PROCESSED_INPUT = 0x0001
  711. windll.kernel32.SetConsoleMode(
  712. self.handle,
  713. self.original_mode.value
  714. | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
  715. )
  716. def _is_win_vt100_input_enabled() -> bool:
  717. """
  718. Returns True when we're running Windows and VT100 escape sequences are
  719. supported.
  720. """
  721. hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
  722. # Get original console mode.
  723. original_mode = DWORD(0)
  724. windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))
  725. try:
  726. # Try to enable VT100 sequences.
  727. result: int = windll.kernel32.SetConsoleMode(
  728. hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
  729. )
  730. return result == 1
  731. finally:
  732. windll.kernel32.SetConsoleMode(hconsole, original_mode)