123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- """
- patch_stdout
- ============
- This implements a context manager that ensures that print statements within
- it won't destroy the user interface. The context manager will replace
- `sys.stdout` by something that draws the output above the current prompt,
- rather than overwriting the UI.
- Usage::
- with patch_stdout(application):
- ...
- application.run()
- ...
- Multiple applications can run in the body of the context manager, one after the
- other.
- """
- from __future__ import annotations
- import asyncio
- import queue
- import sys
- import threading
- import time
- from contextlib import contextmanager
- from typing import Generator, TextIO, cast
- from .application import get_app_session, run_in_terminal
- from .output import Output
- __all__ = [
- "patch_stdout",
- "StdoutProxy",
- ]
- @contextmanager
- def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
- """
- Replace `sys.stdout` by an :class:`_StdoutProxy` instance.
- Writing to this proxy will make sure that the text appears above the
- prompt, and that it doesn't destroy the output from the renderer. If no
- application is curring, the behavior should be identical to writing to
- `sys.stdout` directly.
- Warning: If a new event loop is installed using `asyncio.set_event_loop()`,
- then make sure that the context manager is applied after the event loop
- is changed. Printing to stdout will be scheduled in the event loop
- that's active when the context manager is created.
- :param raw: (`bool`) When True, vt100 terminal escape sequences are not
- removed/escaped.
- """
- with StdoutProxy(raw=raw) as proxy:
- original_stdout = sys.stdout
- original_stderr = sys.stderr
- # Enter.
- sys.stdout = cast(TextIO, proxy)
- sys.stderr = cast(TextIO, proxy)
- try:
- yield
- finally:
- sys.stdout = original_stdout
- sys.stderr = original_stderr
- class _Done:
- "Sentinel value for stopping the stdout proxy."
- class StdoutProxy:
- """
- File-like object, which prints everything written to it, output above the
- current application/prompt. This class is compatible with other file
- objects and can be used as a drop-in replacement for `sys.stdout` or can
- for instance be passed to `logging.StreamHandler`.
- The current application, above which we print, is determined by looking
- what application currently runs in the `AppSession` that is active during
- the creation of this instance.
- This class can be used as a context manager.
- In order to avoid having to repaint the prompt continuously for every
- little write, a short delay of `sleep_between_writes` seconds will be added
- between writes in order to bundle many smaller writes in a short timespan.
- """
- def __init__(
- self,
- sleep_between_writes: float = 0.2,
- raw: bool = False,
- ) -> None:
- self.sleep_between_writes = sleep_between_writes
- self.raw = raw
- self._lock = threading.RLock()
- self._buffer: list[str] = []
- # Keep track of the curret app session.
- self.app_session = get_app_session()
- # See what output is active *right now*. We should do it at this point,
- # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
- # Otherwise, if `patch_stdout` is used, and no `Output` instance has
- # been created, then the default output creation code will see this
- # proxy object as `sys.stdout`, and get in a recursive loop trying to
- # access `StdoutProxy.isatty()` which will again retrieve the output.
- self._output: Output = self.app_session.output
- # Flush thread
- self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
- self._flush_thread = self._start_write_thread()
- self.closed = False
- def __enter__(self) -> StdoutProxy:
- return self
- def __exit__(self, *args: object) -> None:
- self.close()
- def close(self) -> None:
- """
- Stop `StdoutProxy` proxy.
- This will terminate the write thread, make sure everything is flushed
- and wait for the write thread to finish.
- """
- if not self.closed:
- self._flush_queue.put(_Done())
- self._flush_thread.join()
- self.closed = True
- def _start_write_thread(self) -> threading.Thread:
- thread = threading.Thread(
- target=self._write_thread,
- name="patch-stdout-flush-thread",
- daemon=True,
- )
- thread.start()
- return thread
- def _write_thread(self) -> None:
- done = False
- while not done:
- item = self._flush_queue.get()
- if isinstance(item, _Done):
- break
- # Don't bother calling when we got an empty string.
- if not item:
- continue
- text = []
- text.append(item)
- # Read the rest of the queue if more data was queued up.
- while True:
- try:
- item = self._flush_queue.get_nowait()
- except queue.Empty:
- break
- else:
- if isinstance(item, _Done):
- done = True
- else:
- text.append(item)
- app_loop = self._get_app_loop()
- self._write_and_flush(app_loop, "".join(text))
- # If an application was running that requires repainting, then wait
- # for a very short time, in order to bundle actual writes and avoid
- # having to repaint to often.
- if app_loop is not None:
- time.sleep(self.sleep_between_writes)
- def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
- """
- Return the event loop for the application currently running in our
- `AppSession`.
- """
- app = self.app_session.app
- if app is None:
- return None
- return app.loop
- def _write_and_flush(
- self, loop: asyncio.AbstractEventLoop | None, text: str
- ) -> None:
- """
- Write the given text to stdout and flush.
- If an application is running, use `run_in_terminal`.
- """
- def write_and_flush() -> None:
- # Ensure that autowrap is enabled before calling `write`.
- # XXX: On Windows, the `Windows10_Output` enables/disables VT
- # terminal processing for every flush. It turns out that this
- # causes autowrap to be reset (disabled) after each flush. So,
- # we have to enable it again before writing text.
- self._output.enable_autowrap()
- if self.raw:
- self._output.write_raw(text)
- else:
- self._output.write(text)
- self._output.flush()
- def write_and_flush_in_loop() -> None:
- # If an application is running, use `run_in_terminal`, otherwise
- # call it directly.
- run_in_terminal(write_and_flush, in_executor=False)
- if loop is None:
- # No loop, write immediately.
- write_and_flush()
- else:
- # Make sure `write_and_flush` is executed *in* the event loop, not
- # in another thread.
- loop.call_soon_threadsafe(write_and_flush_in_loop)
- def _write(self, data: str) -> None:
- """
- Note: print()-statements cause to multiple write calls.
- (write('line') and write('\n')). Of course we don't want to call
- `run_in_terminal` for every individual call, because that's too
- expensive, and as long as the newline hasn't been written, the
- text itself is again overwritten by the rendering of the input
- command line. Therefor, we have a little buffer which holds the
- text until a newline is written to stdout.
- """
- if "\n" in data:
- # When there is a newline in the data, write everything before the
- # newline, including the newline itself.
- before, after = data.rsplit("\n", 1)
- to_write = self._buffer + [before, "\n"]
- self._buffer = [after]
- text = "".join(to_write)
- self._flush_queue.put(text)
- else:
- # Otherwise, cache in buffer.
- self._buffer.append(data)
- def _flush(self) -> None:
- text = "".join(self._buffer)
- self._buffer = []
- self._flush_queue.put(text)
- def write(self, data: str) -> int:
- with self._lock:
- self._write(data)
- return len(data) # Pretend everything was written.
- def flush(self) -> None:
- """
- Flush buffered output.
- """
- with self._lock:
- self._flush()
- @property
- def original_stdout(self) -> TextIO:
- return self._output.stdout or sys.__stdout__
- # Attributes for compatibility with sys.__stdout__:
- def fileno(self) -> int:
- return self._output.fileno()
- def isatty(self) -> bool:
- stdout = self._output.stdout
- if stdout is None:
- return False
- return stdout.isatty()
- @property
- def encoding(self) -> str:
- return self._output.encoding()
- @property
- def errors(self) -> str:
- return "strict"
|