streams.py 8.8 KB


  1. import io
  2. import sys
  3. import time
  4. from ctypes import byref, c_ulong
  5. from .buffer import get_buffer
  6. from .info import WINDOWS, PY2
  7. if PY2:
  8. from .file_object import FileObject
  9. if WINDOWS:
  10. from ctypes import WinDLL, get_last_error, set_last_error, WinError
  11. from msvcrt import get_osfhandle
  12. kernel32 = WinDLL("kernel32", use_last_error=True)
  13. ReadConsoleW = kernel32.ReadConsoleW
  14. WriteConsoleW = kernel32.WriteConsoleW
  15. GetConsoleMode = kernel32.GetConsoleMode
  16. ERROR_SUCCESS = 0
  17. ERROR_INVALID_HANDLE = 6
  18. ERROR_NOT_ENOUGH_MEMORY = 8
  19. ERROR_OPERATION_ABORTED = 995
  20. EOF = b"\x1a"
  21. MAX_BYTES_WRITTEN = 32767 # arbitrary because WriteConsoleW ability to write big buffers depends on heap usage
  22. class StandardStreamInfo:
  23. def __init__(self, name, standard_fileno):
  24. self.name = name
  25. self.fileno = standard_fileno
  26. self.handle = get_osfhandle(standard_fileno) if WINDOWS else None
  27. def __repr__(self):
  28. return "<{} '{}' fileno={} handle={}>".format(self.__class__.__name__, self.name, self.fileno, self.handle)
  29. @property
  30. def stream(self):
  31. return getattr(sys, self.name)
  32. def is_a_TTY(self):
  33. # the test used in input()
  34. try:
  35. get_fileno = self.stream.fileno
  36. except AttributeError: # e.g. StringIO in Python 2
  37. return False
  38. try:
  39. fileno = get_fileno()
  40. except io.UnsupportedOperation:
  41. return False
  42. else:
  43. return fileno == self.fileno and self.stream.isatty()
  44. def is_a_console(self):
  45. if self.handle is None:
  46. return False
  47. if GetConsoleMode(self.handle, byref(c_ulong())):
  48. return True
  49. else:
  50. last_error = get_last_error()
  51. if last_error == ERROR_INVALID_HANDLE:
  52. return False
  53. else:
  54. raise WinError(last_error)
  55. def should_be_fixed(self):
  56. if self.stream is None: # e.g. with IDLE
  57. return True
  58. return self.is_a_TTY() and self.is_a_console()
  59. STDIN = StandardStreamInfo("stdin", standard_fileno=0)
  60. STDOUT = StandardStreamInfo("stdout", standard_fileno=1)
  61. STDERR = StandardStreamInfo("stderr", standard_fileno=2)
  62. class _ReprMixin:
  63. def __repr__(self):
  64. modname = self.__class__.__module__
  65. if PY2:
  66. clsname = self.__class__.__name__
  67. else:
  68. clsname = self.__class__.__qualname__
  69. attributes = []
  70. for name in ["name", "encoding"]:
  71. try:
  72. value = getattr(self, name)
  73. except AttributeError:
  74. pass
  75. else:
  76. attributes.append("{}={}".format(name, repr(value)))
  77. return "<{}.{} {}>".format(modname, clsname, " ".join(attributes))
  78. class WindowsConsoleRawIOBase(_ReprMixin, io.RawIOBase):
  79. def __init__(self, name, handle, fileno):
  80. self.name = name
  81. self.handle = handle
  82. self.file_no = fileno
  83. def fileno(self):
  84. return self.file_no
  85. def isatty(self):
  86. # PY3 # super().isatty() # for close check in default implementation
  87. super(WindowsConsoleRawIOBase, self).isatty()
  88. return True
  89. class WindowsConsoleRawReader(WindowsConsoleRawIOBase):
  90. def readable(self):
  91. return True
  92. def readinto(self, b):
  93. bytes_to_be_read = len(b)
  94. if not bytes_to_be_read:
  95. return 0
  96. elif bytes_to_be_read % 2:
  97. raise ValueError("cannot read odd number of bytes from UTF-16-LE encoded console")
  98. buffer = get_buffer(b, writable=True)
  99. code_units_to_be_read = bytes_to_be_read // 2
  100. code_units_read = c_ulong()
  101. set_last_error(ERROR_SUCCESS)
  102. ReadConsoleW(self.handle, buffer, code_units_to_be_read, byref(code_units_read), None)
  103. last_error = get_last_error()
  104. if last_error == ERROR_OPERATION_ABORTED:
  105. time.sleep(0.1) # wait for KeyboardInterrupt
  106. if last_error != ERROR_SUCCESS:
  107. raise WinError(last_error)
  108. if buffer[0] == EOF:
  109. return 0
  110. else:
  111. return 2 * code_units_read.value # bytes read
  112. class WindowsConsoleRawWriter(WindowsConsoleRawIOBase):
  113. def writable(self):
  114. return True
  115. def write(self, b):
  116. bytes_to_be_written = len(b)
  117. buffer = get_buffer(b)
  118. code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
  119. code_units_written = c_ulong()
  120. if code_units_to_be_written == 0 != bytes_to_be_written:
  121. raise ValueError("two-byte code units expected, just one byte given")
  122. if not WriteConsoleW(self.handle, buffer, code_units_to_be_written, byref(code_units_written), None):
  123. exc = WinError(get_last_error())
  124. if exc.winerror == ERROR_NOT_ENOUGH_MEMORY:
  125. exc.strerror += " Try to lower `win_unicode_console.streams.MAX_BYTES_WRITTEN`."
  126. raise exc
  127. return 2 * code_units_written.value # bytes written
  128. class _TextStreamWrapperMixin(_ReprMixin):
  129. def __init__(self, base):
  130. self.base = base
  131. @property
  132. def encoding(self):
  133. return self.base.encoding
  134. @property
  135. def errors(self):
  136. return self.base.errors
  137. @property
  138. def line_buffering(self):
  139. return self.base.line_buffering
  140. def seekable(self):
  141. return self.base.seekable()
  142. def readable(self):
  143. return self.base.readable()
  144. def writable(self):
  145. return self.base.writable()
  146. def flush(self):
  147. self.base.flush()
  148. def close(self):
  149. self.base.close()
  150. @property
  151. def closed(self):
  152. return self.base.closed
  153. @property
  154. def name(self):
  155. return self.base.name
  156. def fileno(self):
  157. return self.base.fileno()
  158. def isatty(self):
  159. return self.base.isatty()
  160. def write(self, s):
  161. return self.base.write(s)
  162. def tell(self):
  163. return self.base.tell()
  164. def truncate(self, pos=None):
  165. return self.base.truncate(pos)
  166. def seek(self, cookie, whence=0):
  167. return self.base.seek(cookie, whence)
  168. def read(self, size=None):
  169. return self.base.read(size)
  170. def __next__(self):
  171. return next(self.base)
  172. def readline(self, size=-1):
  173. return self.base.readline(size)
  174. @property
  175. def newlines(self):
  176. return self.base.newlines
  177. class TextStreamWrapper(_TextStreamWrapperMixin, io.TextIOBase):
  178. pass
  179. class TextTranscodingWrapper(TextStreamWrapper):
  180. encoding = None # disable the descriptor
  181. def __init__(self, base, encoding):
  182. # PY3 # super().__init__(base)
  183. super(TextTranscodingWrapper, self).__init__(base)
  184. self.encoding = encoding
  185. class StrStreamWrapper(TextStreamWrapper):
  186. def write(self, s):
  187. if isinstance(s, bytes):
  188. s = s.decode(self.encoding)
  189. self.base.write(s)
  190. if PY2:
  191. class FileobjWrapper(_TextStreamWrapperMixin, file):
  192. def __init__(self, base, f):
  193. super(FileobjWrapper, self).__init__(base)
  194. fileobj = self._fileobj = FileObject.from_file(self)
  195. fileobj.set_encoding(base.encoding)
  196. fileobj.copy_file_pointer(f)
  197. fileobj.readable = base.readable()
  198. fileobj.writable = base.writable()
  199. # needed for the right interpretation of unicode literals in interactive mode when win_unicode_console is enabled in sitecustomize since Py_Initialize changes encoding afterwards
  200. def _reset_encoding(self):
  201. self._fileobj.set_encoding(self.base.encoding)
  202. def readline(self, size=-1):
  203. self._reset_encoding()
  204. return self.base.readline(size)
  205. if WINDOWS:
  206. stdin_raw = WindowsConsoleRawReader("<stdin>", STDIN.handle, STDIN.fileno)
  207. stdout_raw = WindowsConsoleRawWriter("<stdout>", STDOUT.handle, STDOUT.fileno)
  208. stderr_raw = WindowsConsoleRawWriter("<stderr>", STDERR.handle, STDERR.fileno)
  209. stdin_text = io.TextIOWrapper(io.BufferedReader(stdin_raw), encoding="utf-16-le", line_buffering=True)
  210. stdout_text = io.TextIOWrapper(io.BufferedWriter(stdout_raw), encoding="utf-16-le", line_buffering=True)
  211. stderr_text = io.TextIOWrapper(io.BufferedWriter(stderr_raw), encoding="utf-16-le", line_buffering=True)
  212. stdin_text_transcoded = TextTranscodingWrapper(stdin_text, encoding="utf-8")
  213. stdout_text_transcoded = TextTranscodingWrapper(stdout_text, encoding="utf-8")
  214. stderr_text_transcoded = TextTranscodingWrapper(stderr_text, encoding="utf-8")
  215. stdout_text_str = StrStreamWrapper(stdout_text_transcoded)
  216. stderr_text_str = StrStreamWrapper(stderr_text_transcoded)
  217. if PY2:
  218. stdin_text_fileobj = FileobjWrapper(stdin_text_transcoded, sys.__stdin__)
  219. stdout_text_str_fileobj = FileobjWrapper(stdout_text_str, sys.__stdout__)
  220. def disable():
  221. sys.stdin.flush()
  222. sys.stdout.flush()
  223. sys.stderr.flush()
  224. sys.stdin = sys.__stdin__
  225. sys.stdout = sys.__stdout__
  226. sys.stderr = sys.__stderr__
  227. # PY3 # def enable(*, stdin=Ellipsis, stdout=Ellipsis, stderr=Ellipsis):
  228. def enable(stdin=Ellipsis, stdout=Ellipsis, stderr=Ellipsis):
  229. if not WINDOWS:
  230. return
  231. # defaults
  232. if PY2:
  233. if stdin is Ellipsis:
  234. stdin = stdin_text_fileobj
  235. if stdout is Ellipsis:
  236. stdout = stdout_text_str
  237. if stderr is Ellipsis:
  238. stderr = stderr_text_str
  239. else: # transcoding because Python tokenizer cannot handle UTF-16
  240. if stdin is Ellipsis:
  241. stdin = stdin_text_transcoded
  242. if stdout is Ellipsis:
  243. stdout = stdout_text_transcoded
  244. if stderr is Ellipsis:
  245. stderr = stderr_text_transcoded
  246. if stdin is not None and STDIN.should_be_fixed():
  247. sys.stdin = stdin
  248. if stdout is not None and STDOUT.should_be_fixed():
  249. sys.stdout.flush()
  250. sys.stdout = stdout
  251. if stderr is not None and STDERR.should_be_fixed():
  252. sys.stderr.flush()
  253. sys.stderr = stderr
  254. # PY3 # def enable_only(*, stdin=None, stdout=None, stderr=None):
  255. def enable_only(stdin=None, stdout=None, stderr=None):
  256. enable(stdin=stdin, stdout=stdout, stderr=stderr)