console.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. # -*- coding: utf-8 -*-
  2. """
  3. werkzeug.debug.console
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. Interactive console support.
  6. :copyright: 2007 Pallets
  7. :license: BSD-3-Clause
  8. """
  9. import code
  10. import sys
  11. from types import CodeType
  12. from ..local import Local
  13. from ..utils import escape
  14. from .repr import debug_repr
  15. from .repr import dump
  16. from .repr import helper
  17. _local = Local()
  18. class HTMLStringO(object):
  19. """A StringO version that HTML escapes on write."""
  20. def __init__(self):
  21. self._buffer = []
  22. def isatty(self):
  23. return False
  24. def close(self):
  25. pass
  26. def flush(self):
  27. pass
  28. def seek(self, n, mode=0):
  29. pass
  30. def readline(self):
  31. if len(self._buffer) == 0:
  32. return ""
  33. ret = self._buffer[0]
  34. del self._buffer[0]
  35. return ret
  36. def reset(self):
  37. val = "".join(self._buffer)
  38. del self._buffer[:]
  39. return val
  40. def _write(self, x):
  41. if isinstance(x, bytes):
  42. x = x.decode("utf-8", "replace")
  43. self._buffer.append(x)
  44. def write(self, x):
  45. self._write(escape(x))
  46. def writelines(self, x):
  47. self._write(escape("".join(x)))
  48. class ThreadedStream(object):
  49. """Thread-local wrapper for sys.stdout for the interactive console."""
  50. @staticmethod
  51. def push():
  52. if not isinstance(sys.stdout, ThreadedStream):
  53. sys.stdout = ThreadedStream()
  54. _local.stream = HTMLStringO()
  55. @staticmethod
  56. def fetch():
  57. try:
  58. stream = _local.stream
  59. except AttributeError:
  60. return ""
  61. return stream.reset()
  62. @staticmethod
  63. def displayhook(obj):
  64. try:
  65. stream = _local.stream
  66. except AttributeError:
  67. return _displayhook(obj)
  68. # stream._write bypasses escaping as debug_repr is
  69. # already generating HTML for us.
  70. if obj is not None:
  71. _local._current_ipy.locals["_"] = obj
  72. stream._write(debug_repr(obj))
  73. def __setattr__(self, name, value):
  74. raise AttributeError("read only attribute %s" % name)
  75. def __dir__(self):
  76. return dir(sys.__stdout__)
  77. def __getattribute__(self, name):
  78. if name == "__members__":
  79. return dir(sys.__stdout__)
  80. try:
  81. stream = _local.stream
  82. except AttributeError:
  83. stream = sys.__stdout__
  84. return getattr(stream, name)
  85. def __repr__(self):
  86. return repr(sys.__stdout__)
  87. # add the threaded stream as display hook
  88. _displayhook = sys.displayhook
  89. sys.displayhook = ThreadedStream.displayhook
  90. class _ConsoleLoader(object):
  91. def __init__(self):
  92. self._storage = {}
  93. def register(self, code, source):
  94. self._storage[id(code)] = source
  95. # register code objects of wrapped functions too.
  96. for var in code.co_consts:
  97. if isinstance(var, CodeType):
  98. self._storage[id(var)] = source
  99. def get_source_by_code(self, code):
  100. try:
  101. return self._storage[id(code)]
  102. except KeyError:
  103. pass
  104. def _wrap_compiler(console):
  105. compile = console.compile
  106. def func(source, filename, symbol):
  107. code = compile(source, filename, symbol)
  108. console.loader.register(code, source)
  109. return code
  110. console.compile = func
  111. class _InteractiveConsole(code.InteractiveInterpreter):
  112. def __init__(self, globals, locals):
  113. code.InteractiveInterpreter.__init__(self, locals)
  114. self.globals = dict(globals)
  115. self.globals["dump"] = dump
  116. self.globals["help"] = helper
  117. self.globals["__loader__"] = self.loader = _ConsoleLoader()
  118. self.more = False
  119. self.buffer = []
  120. _wrap_compiler(self)
  121. def runsource(self, source):
  122. source = source.rstrip() + "\n"
  123. ThreadedStream.push()
  124. prompt = "... " if self.more else ">>> "
  125. try:
  126. source_to_eval = "".join(self.buffer + [source])
  127. if code.InteractiveInterpreter.runsource(
  128. self, source_to_eval, "<debugger>", "single"
  129. ):
  130. self.more = True
  131. self.buffer.append(source)
  132. else:
  133. self.more = False
  134. del self.buffer[:]
  135. finally:
  136. output = ThreadedStream.fetch()
  137. return prompt + escape(source) + output
  138. def runcode(self, code):
  139. try:
  140. eval(code, self.globals, self.locals)
  141. except Exception:
  142. self.showtraceback()
  143. def showtraceback(self):
  144. from .tbtools import get_current_traceback
  145. tb = get_current_traceback(skip=1)
  146. sys.stdout._write(tb.render_summary())
  147. def showsyntaxerror(self, filename=None):
  148. from .tbtools import get_current_traceback
  149. tb = get_current_traceback(skip=4)
  150. sys.stdout._write(tb.render_summary())
  151. def write(self, data):
  152. sys.stdout.write(data)
  153. class Console(object):
  154. """An interactive console."""
  155. def __init__(self, globals=None, locals=None):
  156. if locals is None:
  157. locals = {}
  158. if globals is None:
  159. globals = {}
  160. self._ipy = _InteractiveConsole(globals, locals)
  161. def eval(self, code):
  162. _local._current_ipy = self._ipy
  163. old_sys_stdout = sys.stdout
  164. try:
  165. return self._ipy.runsource(code)
  166. finally:
  167. sys.stdout = old_sys_stdout