helper.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. # -*- test-case-name: twisted.conch.test.test_helper -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Partial in-memory terminal emulator
  6. @author: Jp Calderone
  7. """
  8. from __future__ import print_function
  9. import re, string
  10. from zope.interface import implementer
  11. from incremental import Version
  12. from twisted.internet import defer, protocol, reactor
  13. from twisted.python import log, _textattributes
  14. from twisted.python.compat import iterbytes
  15. from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
  16. from twisted.conch.insults import insults
  17. FOREGROUND = 30
  18. BACKGROUND = 40
  19. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
  20. class _FormattingState(_textattributes._FormattingStateMixin):
  21. """
  22. Represents the formatting state/attributes of a single character.
  23. Character set, intensity, underlinedness, blinkitude, video
  24. reversal, as well as foreground and background colors made up a
  25. character's attributes.
  26. """
  27. compareAttributes = (
  28. 'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground',
  29. 'background', '_subtracting')
  30. def __init__(self, charset=insults.G0, bold=False, underline=False,
  31. blink=False, reverseVideo=False, foreground=WHITE,
  32. background=BLACK, _subtracting=False):
  33. self.charset = charset
  34. self.bold = bold
  35. self.underline = underline
  36. self.blink = blink
  37. self.reverseVideo = reverseVideo
  38. self.foreground = foreground
  39. self.background = background
  40. self._subtracting = _subtracting
  41. @deprecated(Version('Twisted', 13, 1, 0))
  42. def wantOne(self, **kw):
  43. """
  44. Add a character attribute to a copy of this formatting state.
  45. @param **kw: An optional attribute name and value can be provided with
  46. a keyword argument.
  47. @return: A formatting state instance with the new attribute.
  48. @see: L{DefaultFormattingState._withAttribute}.
  49. """
  50. k, v = kw.popitem()
  51. return self._withAttribute(k, v)
  52. def toVT102(self):
  53. # Spit out a vt102 control sequence that will set up
  54. # all the attributes set here. Except charset.
  55. attrs = []
  56. if self._subtracting:
  57. attrs.append(0)
  58. if self.bold:
  59. attrs.append(insults.BOLD)
  60. if self.underline:
  61. attrs.append(insults.UNDERLINE)
  62. if self.blink:
  63. attrs.append(insults.BLINK)
  64. if self.reverseVideo:
  65. attrs.append(insults.REVERSE_VIDEO)
  66. if self.foreground != WHITE:
  67. attrs.append(FOREGROUND + self.foreground)
  68. if self.background != BLACK:
  69. attrs.append(BACKGROUND + self.background)
  70. if attrs:
  71. return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
  72. return ''
# Public alias kept for callers that still import the old name.  The call
# below registers the name 'CharacterAttribute' of module
# 'twisted.conch.insults.helper' as deprecated since Twisted 13.1.0, with a
# message naming the replacement API.
CharacterAttribute = _FormattingState

deprecatedModuleAttribute(
    Version('Twisted', 13, 1, 0),
    'Use twisted.conch.insults.text.assembleFormattedText instead.',
    'twisted.conch.insults.helper',
    'CharacterAttribute')
  79. # XXX - need to support scroll regions and scroll history
  80. @implementer(insults.ITerminalTransport)
  81. class TerminalBuffer(protocol.Protocol):
  82. """
  83. An in-memory terminal emulator.
  84. """
  85. for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
  86. b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
  87. b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
  88. b'F10', b'F11', b'F12'):
  89. execBytes = keyID + b" = object()"
  90. execStr = execBytes.decode("ascii")
  91. exec(execStr)
  92. TAB = b'\t'
  93. BACKSPACE = b'\x7f'
  94. width = 80
  95. height = 24
  96. fill = b' '
  97. void = object()
  98. def getCharacter(self, x, y):
  99. return self.lines[y][x]
  100. def connectionMade(self):
  101. self.reset()
  102. def write(self, data):
  103. """
  104. Add the given printable bytes to the terminal.
  105. Line feeds in L{bytes} will be replaced with carriage return / line
  106. feed pairs.
  107. """
  108. for b in iterbytes(data.replace(b'\n', b'\r\n')):
  109. self.insertAtCursor(b)
  110. def _currentFormattingState(self):
  111. return _FormattingState(self.activeCharset, **self.graphicRendition)
  112. def insertAtCursor(self, b):
  113. """
  114. Add one byte to the terminal at the cursor and make consequent state
  115. updates.
  116. If b is a carriage return, move the cursor to the beginning of the
  117. current row.
  118. If b is a line feed, move the cursor to the next row or scroll down if
  119. the cursor is already in the last row.
  120. Otherwise, if b is printable, put it at the cursor position (inserting
  121. or overwriting as dictated by the current mode) and move the cursor.
  122. """
  123. if b == b'\r':
  124. self.x = 0
  125. elif b == b'\n':
  126. self._scrollDown()
  127. elif b in string.printable.encode("ascii"):
  128. if self.x >= self.width:
  129. self.nextLine()
  130. ch = (b, self._currentFormattingState())
  131. if self.modes.get(insults.modes.IRM):
  132. self.lines[self.y][self.x:self.x] = [ch]
  133. self.lines[self.y].pop()
  134. else:
  135. self.lines[self.y][self.x] = ch
  136. self.x += 1
  137. def _emptyLine(self, width):
  138. return [(self.void, self._currentFormattingState())
  139. for i in range(width)]
  140. def _scrollDown(self):
  141. self.y += 1
  142. if self.y >= self.height:
  143. self.y -= 1
  144. del self.lines[0]
  145. self.lines.append(self._emptyLine(self.width))
  146. def _scrollUp(self):
  147. self.y -= 1
  148. if self.y < 0:
  149. self.y = 0
  150. del self.lines[-1]
  151. self.lines.insert(0, self._emptyLine(self.width))
  152. def cursorUp(self, n=1):
  153. self.y = max(0, self.y - n)
  154. def cursorDown(self, n=1):
  155. self.y = min(self.height - 1, self.y + n)
  156. def cursorBackward(self, n=1):
  157. self.x = max(0, self.x - n)
  158. def cursorForward(self, n=1):
  159. self.x = min(self.width, self.x + n)
  160. def cursorPosition(self, column, line):
  161. self.x = column
  162. self.y = line
  163. def cursorHome(self):
  164. self.x = self.home.x
  165. self.y = self.home.y
  166. def index(self):
  167. self._scrollDown()
  168. def reverseIndex(self):
  169. self._scrollUp()
  170. def nextLine(self):
  171. """
  172. Update the cursor position attributes and scroll down if appropriate.
  173. """
  174. self.x = 0
  175. self._scrollDown()
  176. def saveCursor(self):
  177. self._savedCursor = (self.x, self.y)
  178. def restoreCursor(self):
  179. self.x, self.y = self._savedCursor
  180. del self._savedCursor
  181. def setModes(self, modes):
  182. for m in modes:
  183. self.modes[m] = True
  184. def resetModes(self, modes):
  185. for m in modes:
  186. try:
  187. del self.modes[m]
  188. except KeyError:
  189. pass
  190. def setPrivateModes(self, modes):
  191. """
  192. Enable the given modes.
  193. Track which modes have been enabled so that the implementations of
  194. other L{insults.ITerminalTransport} methods can be properly implemented
  195. to respect these settings.
  196. @see: L{resetPrivateModes}
  197. @see: L{insults.ITerminalTransport.setPrivateModes}
  198. """
  199. for m in modes:
  200. self.privateModes[m] = True
  201. def resetPrivateModes(self, modes):
  202. """
  203. Disable the given modes.
  204. @see: L{setPrivateModes}
  205. @see: L{insults.ITerminalTransport.resetPrivateModes}
  206. """
  207. for m in modes:
  208. try:
  209. del self.privateModes[m]
  210. except KeyError:
  211. pass
  212. def applicationKeypadMode(self):
  213. self.keypadMode = 'app'
  214. def numericKeypadMode(self):
  215. self.keypadMode = 'num'
  216. def selectCharacterSet(self, charSet, which):
  217. self.charsets[which] = charSet
  218. def shiftIn(self):
  219. self.activeCharset = insults.G0
  220. def shiftOut(self):
  221. self.activeCharset = insults.G1
  222. def singleShift2(self):
  223. oldActiveCharset = self.activeCharset
  224. self.activeCharset = insults.G2
  225. f = self.insertAtCursor
  226. def insertAtCursor(b):
  227. f(b)
  228. del self.insertAtCursor
  229. self.activeCharset = oldActiveCharset
  230. self.insertAtCursor = insertAtCursor
  231. def singleShift3(self):
  232. oldActiveCharset = self.activeCharset
  233. self.activeCharset = insults.G3
  234. f = self.insertAtCursor
  235. def insertAtCursor(b):
  236. f(b)
  237. del self.insertAtCursor
  238. self.activeCharset = oldActiveCharset
  239. self.insertAtCursor = insertAtCursor
  240. def selectGraphicRendition(self, *attributes):
  241. for a in attributes:
  242. if a == insults.NORMAL:
  243. self.graphicRendition = {
  244. 'bold': False,
  245. 'underline': False,
  246. 'blink': False,
  247. 'reverseVideo': False,
  248. 'foreground': WHITE,
  249. 'background': BLACK}
  250. elif a == insults.BOLD:
  251. self.graphicRendition['bold'] = True
  252. elif a == insults.UNDERLINE:
  253. self.graphicRendition['underline'] = True
  254. elif a == insults.BLINK:
  255. self.graphicRendition['blink'] = True
  256. elif a == insults.REVERSE_VIDEO:
  257. self.graphicRendition['reverseVideo'] = True
  258. else:
  259. try:
  260. v = int(a)
  261. except ValueError:
  262. log.msg("Unknown graphic rendition attribute: " + repr(a))
  263. else:
  264. if FOREGROUND <= v <= FOREGROUND + N_COLORS:
  265. self.graphicRendition['foreground'] = v - FOREGROUND
  266. elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
  267. self.graphicRendition['background'] = v - BACKGROUND
  268. else:
  269. log.msg("Unknown graphic rendition attribute: " + repr(a))
  270. def eraseLine(self):
  271. self.lines[self.y] = self._emptyLine(self.width)
  272. def eraseToLineEnd(self):
  273. width = self.width - self.x
  274. self.lines[self.y][self.x:] = self._emptyLine(width)
  275. def eraseToLineBeginning(self):
  276. self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
  277. def eraseDisplay(self):
  278. self.lines = [self._emptyLine(self.width) for i in range(self.height)]
  279. def eraseToDisplayEnd(self):
  280. self.eraseToLineEnd()
  281. height = self.height - self.y - 1
  282. self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
  283. def eraseToDisplayBeginning(self):
  284. self.eraseToLineBeginning()
  285. self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
  286. def deleteCharacter(self, n=1):
  287. del self.lines[self.y][self.x:self.x+n]
  288. self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
  289. def insertLine(self, n=1):
  290. self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
  291. del self.lines[self.height:]
  292. def deleteLine(self, n=1):
  293. del self.lines[self.y:self.y+n]
  294. self.lines.extend([self._emptyLine(self.width) for i in range(n)])
  295. def reportCursorPosition(self):
  296. return (self.x, self.y)
  297. def reset(self):
  298. self.home = insults.Vector(0, 0)
  299. self.x = self.y = 0
  300. self.modes = {}
  301. self.privateModes = {}
  302. self.setPrivateModes([insults.privateModes.AUTO_WRAP,
  303. insults.privateModes.CURSOR_MODE])
  304. self.numericKeypad = 'app'
  305. self.activeCharset = insults.G0
  306. self.graphicRendition = {
  307. 'bold': False,
  308. 'underline': False,
  309. 'blink': False,
  310. 'reverseVideo': False,
  311. 'foreground': WHITE,
  312. 'background': BLACK}
  313. self.charsets = {
  314. insults.G0: insults.CS_US,
  315. insults.G1: insults.CS_US,
  316. insults.G2: insults.CS_ALTERNATE,
  317. insults.G3: insults.CS_ALTERNATE_SPECIAL}
  318. self.eraseDisplay()
  319. def unhandledControlSequence(self, buf):
  320. print('Could not handle', repr(buf))
  321. def __bytes__(self):
  322. lines = []
  323. for L in self.lines:
  324. buf = []
  325. length = 0
  326. for (ch, attr) in L:
  327. if ch is not self.void:
  328. buf.append(ch)
  329. length = len(buf)
  330. else:
  331. buf.append(self.fill)
  332. lines.append(b''.join(buf[:length]))
  333. return b'\n'.join(lines)
  334. class ExpectationTimeout(Exception):
  335. pass
  336. class ExpectableBuffer(TerminalBuffer):
  337. _mark = 0
  338. def connectionMade(self):
  339. TerminalBuffer.connectionMade(self)
  340. self._expecting = []
  341. def write(self, data):
  342. TerminalBuffer.write(self, data)
  343. self._checkExpected()
  344. def cursorHome(self):
  345. TerminalBuffer.cursorHome(self)
  346. self._mark = 0
  347. def _timeoutExpected(self, d):
  348. d.errback(ExpectationTimeout())
  349. self._checkExpected()
  350. def _checkExpected(self):
  351. s = self.__bytes__()[self._mark:]
  352. while self._expecting:
  353. expr, timer, deferred = self._expecting[0]
  354. if timer and not timer.active():
  355. del self._expecting[0]
  356. continue
  357. for match in expr.finditer(s):
  358. if timer:
  359. timer.cancel()
  360. del self._expecting[0]
  361. self._mark += match.end()
  362. s = s[match.end():]
  363. deferred.callback(match)
  364. break
  365. else:
  366. return
  367. def expect(self, expression, timeout=None, scheduler=reactor):
  368. d = defer.Deferred()
  369. timer = None
  370. if timeout:
  371. timer = scheduler.callLater(timeout, self._timeoutExpected, d)
  372. self._expecting.append((re.compile(expression), timer, d))
  373. self._checkExpected()
  374. return d
  375. __all__ = [
  376. 'CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']