patch_stdout.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. """
  2. patch_stdout
  3. ============
  4. This implements a context manager that ensures that print statements within
  5. it won't destroy the user interface. The context manager will replace
  6. `sys.stdout` by something that draws the output above the current prompt,
  7. rather than overwriting the UI.
  8. Usage::
  9. with patch_stdout(application):
  10. ...
  11. application.run()
  12. ...
  13. Multiple applications can run in the body of the context manager, one after the
  14. other.
  15. """
  16. from __future__ import annotations
  17. import asyncio
  18. import queue
  19. import sys
  20. import threading
  21. import time
  22. from contextlib import contextmanager
  23. from typing import Generator, TextIO, cast
  24. from .application import get_app_session, run_in_terminal
  25. from .output import Output
  26. __all__ = [
  27. "patch_stdout",
  28. "StdoutProxy",
  29. ]
  30. @contextmanager
  31. def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
  32. """
  33. Replace `sys.stdout` by an :class:`_StdoutProxy` instance.
  34. Writing to this proxy will make sure that the text appears above the
  35. prompt, and that it doesn't destroy the output from the renderer. If no
  36. application is curring, the behavior should be identical to writing to
  37. `sys.stdout` directly.
  38. Warning: If a new event loop is installed using `asyncio.set_event_loop()`,
  39. then make sure that the context manager is applied after the event loop
  40. is changed. Printing to stdout will be scheduled in the event loop
  41. that's active when the context manager is created.
  42. :param raw: (`bool`) When True, vt100 terminal escape sequences are not
  43. removed/escaped.
  44. """
  45. with StdoutProxy(raw=raw) as proxy:
  46. original_stdout = sys.stdout
  47. original_stderr = sys.stderr
  48. # Enter.
  49. sys.stdout = cast(TextIO, proxy)
  50. sys.stderr = cast(TextIO, proxy)
  51. try:
  52. yield
  53. finally:
  54. sys.stdout = original_stdout
  55. sys.stderr = original_stderr
  56. class _Done:
  57. "Sentinel value for stopping the stdout proxy."
  58. class StdoutProxy:
  59. """
  60. File-like object, which prints everything written to it, output above the
  61. current application/prompt. This class is compatible with other file
  62. objects and can be used as a drop-in replacement for `sys.stdout` or can
  63. for instance be passed to `logging.StreamHandler`.
  64. The current application, above which we print, is determined by looking
  65. what application currently runs in the `AppSession` that is active during
  66. the creation of this instance.
  67. This class can be used as a context manager.
  68. In order to avoid having to repaint the prompt continuously for every
  69. little write, a short delay of `sleep_between_writes` seconds will be added
  70. between writes in order to bundle many smaller writes in a short timespan.
  71. """
  72. def __init__(
  73. self,
  74. sleep_between_writes: float = 0.2,
  75. raw: bool = False,
  76. ) -> None:
  77. self.sleep_between_writes = sleep_between_writes
  78. self.raw = raw
  79. self._lock = threading.RLock()
  80. self._buffer: list[str] = []
  81. # Keep track of the curret app session.
  82. self.app_session = get_app_session()
  83. # See what output is active *right now*. We should do it at this point,
  84. # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
  85. # Otherwise, if `patch_stdout` is used, and no `Output` instance has
  86. # been created, then the default output creation code will see this
  87. # proxy object as `sys.stdout`, and get in a recursive loop trying to
  88. # access `StdoutProxy.isatty()` which will again retrieve the output.
  89. self._output: Output = self.app_session.output
  90. # Flush thread
  91. self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
  92. self._flush_thread = self._start_write_thread()
  93. self.closed = False
  94. def __enter__(self) -> StdoutProxy:
  95. return self
  96. def __exit__(self, *args: object) -> None:
  97. self.close()
  98. def close(self) -> None:
  99. """
  100. Stop `StdoutProxy` proxy.
  101. This will terminate the write thread, make sure everything is flushed
  102. and wait for the write thread to finish.
  103. """
  104. if not self.closed:
  105. self._flush_queue.put(_Done())
  106. self._flush_thread.join()
  107. self.closed = True
  108. def _start_write_thread(self) -> threading.Thread:
  109. thread = threading.Thread(
  110. target=self._write_thread,
  111. name="patch-stdout-flush-thread",
  112. daemon=True,
  113. )
  114. thread.start()
  115. return thread
  116. def _write_thread(self) -> None:
  117. done = False
  118. while not done:
  119. item = self._flush_queue.get()
  120. if isinstance(item, _Done):
  121. break
  122. # Don't bother calling when we got an empty string.
  123. if not item:
  124. continue
  125. text = []
  126. text.append(item)
  127. # Read the rest of the queue if more data was queued up.
  128. while True:
  129. try:
  130. item = self._flush_queue.get_nowait()
  131. except queue.Empty:
  132. break
  133. else:
  134. if isinstance(item, _Done):
  135. done = True
  136. else:
  137. text.append(item)
  138. app_loop = self._get_app_loop()
  139. self._write_and_flush(app_loop, "".join(text))
  140. # If an application was running that requires repainting, then wait
  141. # for a very short time, in order to bundle actual writes and avoid
  142. # having to repaint to often.
  143. if app_loop is not None:
  144. time.sleep(self.sleep_between_writes)
  145. def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
  146. """
  147. Return the event loop for the application currently running in our
  148. `AppSession`.
  149. """
  150. app = self.app_session.app
  151. if app is None:
  152. return None
  153. return app.loop
  154. def _write_and_flush(
  155. self, loop: asyncio.AbstractEventLoop | None, text: str
  156. ) -> None:
  157. """
  158. Write the given text to stdout and flush.
  159. If an application is running, use `run_in_terminal`.
  160. """
  161. def write_and_flush() -> None:
  162. # Ensure that autowrap is enabled before calling `write`.
  163. # XXX: On Windows, the `Windows10_Output` enables/disables VT
  164. # terminal processing for every flush. It turns out that this
  165. # causes autowrap to be reset (disabled) after each flush. So,
  166. # we have to enable it again before writing text.
  167. self._output.enable_autowrap()
  168. if self.raw:
  169. self._output.write_raw(text)
  170. else:
  171. self._output.write(text)
  172. self._output.flush()
  173. def write_and_flush_in_loop() -> None:
  174. # If an application is running, use `run_in_terminal`, otherwise
  175. # call it directly.
  176. run_in_terminal(write_and_flush, in_executor=False)
  177. if loop is None:
  178. # No loop, write immediately.
  179. write_and_flush()
  180. else:
  181. # Make sure `write_and_flush` is executed *in* the event loop, not
  182. # in another thread.
  183. loop.call_soon_threadsafe(write_and_flush_in_loop)
  184. def _write(self, data: str) -> None:
  185. """
  186. Note: print()-statements cause to multiple write calls.
  187. (write('line') and write('\n')). Of course we don't want to call
  188. `run_in_terminal` for every individual call, because that's too
  189. expensive, and as long as the newline hasn't been written, the
  190. text itself is again overwritten by the rendering of the input
  191. command line. Therefor, we have a little buffer which holds the
  192. text until a newline is written to stdout.
  193. """
  194. if "\n" in data:
  195. # When there is a newline in the data, write everything before the
  196. # newline, including the newline itself.
  197. before, after = data.rsplit("\n", 1)
  198. to_write = self._buffer + [before, "\n"]
  199. self._buffer = [after]
  200. text = "".join(to_write)
  201. self._flush_queue.put(text)
  202. else:
  203. # Otherwise, cache in buffer.
  204. self._buffer.append(data)
  205. def _flush(self) -> None:
  206. text = "".join(self._buffer)
  207. self._buffer = []
  208. self._flush_queue.put(text)
  209. def write(self, data: str) -> int:
  210. with self._lock:
  211. self._write(data)
  212. return len(data) # Pretend everything was written.
  213. def flush(self) -> None:
  214. """
  215. Flush buffered output.
  216. """
  217. with self._lock:
  218. self._flush()
  219. @property
  220. def original_stdout(self) -> TextIO:
  221. return self._output.stdout or sys.__stdout__
  222. # Attributes for compatibility with sys.__stdout__:
  223. def fileno(self) -> int:
  224. return self._output.fileno()
  225. def isatty(self) -> bool:
  226. stdout = self._output.stdout
  227. if stdout is None:
  228. return False
  229. return stdout.isatty()
  230. @property
  231. def encoding(self) -> str:
  232. return self._output.encoding()
  233. @property
  234. def errors(self) -> str:
  235. return "strict"