# manhole.py 12 KB
  1. # -*- test-case-name: twisted.conch.test.test_manhole -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Line-input oriented interactive interpreter loop.
  6. Provides classes for handling Python source input and arbitrary output
  7. interactively from a Twisted application. Also included is syntax coloring
  8. code with support for VT102 terminals, control code handling (^C, ^D, ^Q),
  9. and reasonable handling of Deferreds.
  10. @author: Jp Calderone
  11. """
  12. import code
  13. import sys
  14. import tokenize
  15. from io import BytesIO
  16. from traceback import format_exception
  17. from types import TracebackType
  18. from typing import Type
  19. from twisted.conch import recvline
  20. from twisted.internet import defer
  21. from twisted.python.htmlizer import TokenPrinter
  22. from twisted.python.monkey import MonkeyPatcher
  23. class FileWrapper:
  24. """
  25. Minimal write-file-like object.
  26. Writes are translated into addOutput calls on an object passed to
  27. __init__. Newlines are also converted from network to local style.
  28. """
  29. softspace = 0
  30. state = "normal"
  31. def __init__(self, o):
  32. self.o = o
  33. def flush(self):
  34. pass
  35. def write(self, data):
  36. self.o.addOutput(data.replace("\r\n", "\n"))
  37. def writelines(self, lines):
  38. self.write("".join(lines))
  39. class ManholeInterpreter(code.InteractiveInterpreter):
  40. """
  41. Interactive Interpreter with special output and Deferred support.
  42. Aside from the features provided by L{code.InteractiveInterpreter}, this
  43. class captures sys.stdout output and redirects it to the appropriate
  44. location (the Manhole protocol instance). It also treats Deferreds
  45. which reach the top-level specially: each is formatted to the user with
  46. a unique identifier and a new callback and errback added to it, each of
  47. which will format the unique identifier and the result with which the
  48. Deferred fires and then pass it on to the next participant in the
  49. callback chain.
  50. """
  51. numDeferreds = 0
  52. def __init__(self, handler, locals=None, filename="<console>"):
  53. code.InteractiveInterpreter.__init__(self, locals)
  54. self._pendingDeferreds = {}
  55. self.handler = handler
  56. self.filename = filename
  57. self.resetBuffer()
  58. self.monkeyPatcher = MonkeyPatcher()
  59. self.monkeyPatcher.addPatch(sys, "displayhook", self.displayhook)
  60. self.monkeyPatcher.addPatch(sys, "excepthook", self.excepthook)
  61. self.monkeyPatcher.addPatch(sys, "stdout", FileWrapper(self.handler))
  62. def resetBuffer(self):
  63. """
  64. Reset the input buffer.
  65. """
  66. self.buffer = []
  67. def push(self, line):
  68. """
  69. Push a line to the interpreter.
  70. The line should not have a trailing newline; it may have
  71. internal newlines. The line is appended to a buffer and the
  72. interpreter's runsource() method is called with the
  73. concatenated contents of the buffer as source. If this
  74. indicates that the command was executed or invalid, the buffer
  75. is reset; otherwise, the command is incomplete, and the buffer
  76. is left as it was after the line was appended. The return
  77. value is 1 if more input is required, 0 if the line was dealt
  78. with in some way (this is the same as runsource()).
  79. @param line: line of text
  80. @type line: L{bytes}
  81. @return: L{bool} from L{code.InteractiveInterpreter.runsource}
  82. """
  83. self.buffer.append(line)
  84. source = b"\n".join(self.buffer)
  85. source = source.decode("utf-8")
  86. more = self.runsource(source, self.filename)
  87. if not more:
  88. self.resetBuffer()
  89. return more
  90. def runcode(self, *a, **kw):
  91. with self.monkeyPatcher:
  92. code.InteractiveInterpreter.runcode(self, *a, **kw)
  93. def excepthook(
  94. self,
  95. excType: Type[BaseException],
  96. excValue: BaseException,
  97. excTraceback: TracebackType,
  98. ) -> None:
  99. """
  100. Format exception tracebacks and write them to the output handler.
  101. """
  102. code_obj = excTraceback.tb_frame.f_code
  103. if code_obj.co_filename == code.__file__ and code_obj.co_name == "runcode":
  104. traceback = excTraceback.tb_next
  105. else:
  106. # Workaround for https://github.com/python/cpython/issues/122478,
  107. # present e.g. in Python 3.12.6:
  108. traceback = excTraceback
  109. lines = format_exception(excType, excValue, traceback)
  110. self.write("".join(lines))
  111. def displayhook(self, obj):
  112. self.locals["_"] = obj
  113. if isinstance(obj, defer.Deferred):
  114. # XXX Ick, where is my "hasFired()" interface?
  115. if hasattr(obj, "result"):
  116. self.write(repr(obj))
  117. elif id(obj) in self._pendingDeferreds:
  118. self.write("<Deferred #%d>" % (self._pendingDeferreds[id(obj)][0],))
  119. else:
  120. d = self._pendingDeferreds
  121. k = self.numDeferreds
  122. d[id(obj)] = (k, obj)
  123. self.numDeferreds += 1
  124. obj.addCallbacks(
  125. self._cbDisplayDeferred,
  126. self._ebDisplayDeferred,
  127. callbackArgs=(k, obj),
  128. errbackArgs=(k, obj),
  129. )
  130. self.write("<Deferred #%d>" % (k,))
  131. elif obj is not None:
  132. self.write(repr(obj))
  133. def _cbDisplayDeferred(self, result, k, obj):
  134. self.write("Deferred #%d called back: %r" % (k, result), True)
  135. del self._pendingDeferreds[id(obj)]
  136. return result
  137. def _ebDisplayDeferred(self, failure, k, obj):
  138. self.write("Deferred #%d failed: %r" % (k, failure.getErrorMessage()), True)
  139. del self._pendingDeferreds[id(obj)]
  140. return failure
  141. def write(self, data, isAsync=None):
  142. self.handler.addOutput(data, isAsync)
# Single-byte control keystrokes; Manhole.connectionMade installs a key handler
# for each of them.
CTRL_C = b"\x03"  # ^C -> Manhole.handle_INT
CTRL_D = b"\x04"  # ^D -> Manhole.handle_EOF
CTRL_BACKSLASH = b"\x1c"  # ^\ -> Manhole.handle_QUIT
CTRL_L = b"\x0c"  # ^L (form feed) -> Manhole.handle_FF
CTRL_A = b"\x01"  # ^A -> handle_HOME
CTRL_E = b"\x05"  # ^E -> handle_END
  149. class Manhole(recvline.HistoricRecvLine):
  150. r"""
  151. Mediator between a fancy line source and an interactive interpreter.
  152. This accepts lines from its transport and passes them on to a
  153. L{ManholeInterpreter}. Control commands (^C, ^D, ^\) are also handled
  154. with something approximating their normal terminal-mode behavior. It
  155. can optionally be constructed with a dict which will be used as the
  156. local namespace for any code executed.
  157. """
  158. namespace = None
  159. def __init__(self, namespace=None):
  160. recvline.HistoricRecvLine.__init__(self)
  161. if namespace is not None:
  162. self.namespace = namespace.copy()
  163. def connectionMade(self):
  164. recvline.HistoricRecvLine.connectionMade(self)
  165. self.interpreter = ManholeInterpreter(self, self.namespace)
  166. self.keyHandlers[CTRL_C] = self.handle_INT
  167. self.keyHandlers[CTRL_D] = self.handle_EOF
  168. self.keyHandlers[CTRL_L] = self.handle_FF
  169. self.keyHandlers[CTRL_A] = self.handle_HOME
  170. self.keyHandlers[CTRL_E] = self.handle_END
  171. self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT
  172. def handle_INT(self):
  173. """
  174. Handle ^C as an interrupt keystroke by resetting the current input
  175. variables to their initial state.
  176. """
  177. self.pn = 0
  178. self.lineBuffer = []
  179. self.lineBufferIndex = 0
  180. self.interpreter.resetBuffer()
  181. self.terminal.nextLine()
  182. self.terminal.write(b"KeyboardInterrupt")
  183. self.terminal.nextLine()
  184. self.terminal.write(self.ps[self.pn])
  185. def handle_EOF(self):
  186. if self.lineBuffer:
  187. self.terminal.write(b"\a")
  188. else:
  189. self.handle_QUIT()
  190. def handle_FF(self):
  191. """
  192. Handle a 'form feed' byte - generally used to request a screen
  193. refresh/redraw.
  194. """
  195. self.terminal.eraseDisplay()
  196. self.terminal.cursorHome()
  197. self.drawInputLine()
  198. def handle_QUIT(self):
  199. self.terminal.loseConnection()
  200. def _needsNewline(self):
  201. w = self.terminal.lastWrite
  202. return not w.endswith(b"\n") and not w.endswith(b"\x1bE")
  203. def addOutput(self, data, isAsync=None):
  204. if isAsync:
  205. self.terminal.eraseLine()
  206. self.terminal.cursorBackward(len(self.lineBuffer) + len(self.ps[self.pn]))
  207. self.terminal.write(data)
  208. if isAsync:
  209. if self._needsNewline():
  210. self.terminal.nextLine()
  211. self.terminal.write(self.ps[self.pn])
  212. if self.lineBuffer:
  213. oldBuffer = self.lineBuffer
  214. self.lineBuffer = []
  215. self.lineBufferIndex = 0
  216. self._deliverBuffer(oldBuffer)
  217. def lineReceived(self, line):
  218. more = self.interpreter.push(line)
  219. self.pn = bool(more)
  220. if self._needsNewline():
  221. self.terminal.nextLine()
  222. self.terminal.write(self.ps[self.pn])
  223. class VT102Writer:
  224. """
  225. Colorizer for Python tokens.
  226. A series of tokens are written to instances of this object. Each is
  227. colored in a particular way. The final line of the result of this is
  228. generally added to the output.
  229. """
  230. typeToColor = {
  231. "identifier": b"\x1b[31m",
  232. "keyword": b"\x1b[32m",
  233. "parameter": b"\x1b[33m",
  234. "variable": b"\x1b[1;33m",
  235. "string": b"\x1b[35m",
  236. "number": b"\x1b[36m",
  237. "op": b"\x1b[37m",
  238. }
  239. normalColor = b"\x1b[0m"
  240. def __init__(self):
  241. self.written = []
  242. def color(self, type):
  243. r = self.typeToColor.get(type, b"")
  244. return r
  245. def write(self, token, type=None):
  246. if token and token != b"\r":
  247. c = self.color(type)
  248. if c:
  249. self.written.append(c)
  250. self.written.append(token)
  251. if c:
  252. self.written.append(self.normalColor)
  253. def __bytes__(self):
  254. s = b"".join(self.written)
  255. return s.strip(b"\n").splitlines()[-1]
  256. def lastColorizedLine(source):
  257. """
  258. Tokenize and colorize the given Python source.
  259. Returns a VT102-format colorized version of the last line of C{source}.
  260. @param source: Python source code
  261. @type source: L{str} or L{bytes}
  262. @return: L{bytes} of colorized source
  263. """
  264. if not isinstance(source, bytes):
  265. source = source.encode("utf-8")
  266. w = VT102Writer()
  267. p = TokenPrinter(w.write).printtoken
  268. s = BytesIO(source)
  269. for token in tokenize.tokenize(s.readline):
  270. (tokenType, string, start, end, line) = token
  271. p(tokenType, string, start, end, line)
  272. return bytes(w)
  273. class ColoredManhole(Manhole):
  274. """
  275. A REPL which syntax colors input as users type it.
  276. """
  277. def getSource(self):
  278. """
  279. Return a string containing the currently entered source.
  280. This is only the code which will be considered for execution
  281. next.
  282. """
  283. return b"\n".join(self.interpreter.buffer) + b"\n" + b"".join(self.lineBuffer)
  284. def characterReceived(self, ch, moreCharactersComing):
  285. if self.mode == "insert":
  286. self.lineBuffer.insert(self.lineBufferIndex, ch)
  287. else:
  288. self.lineBuffer[self.lineBufferIndex : self.lineBufferIndex + 1] = [ch]
  289. self.lineBufferIndex += 1
  290. if moreCharactersComing:
  291. # Skip it all, we'll get called with another character in
  292. # like 2 femtoseconds.
  293. return
  294. if ch == b" ":
  295. # Don't bother to try to color whitespace
  296. self.terminal.write(ch)
  297. return
  298. source = self.getSource()
  299. # Try to write some junk
  300. try:
  301. coloredLine = lastColorizedLine(source)
  302. except tokenize.TokenError:
  303. # We couldn't do it. Strange. Oh well, just add the character.
  304. self.terminal.write(ch)
  305. else:
  306. # Success! Clear the source on this line.
  307. self.terminal.eraseLine()
  308. self.terminal.cursorBackward(
  309. len(self.lineBuffer) + len(self.ps[self.pn]) - 1
  310. )
  311. # And write a new, colorized one.
  312. self.terminal.write(self.ps[self.pn] + coloredLine)
  313. # And move the cursor to where it belongs
  314. n = len(self.lineBuffer) - self.lineBufferIndex
  315. if n:
  316. self.terminal.cursorBackward(n)