123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- # -*- test-case-name: twisted.conch.test.test_helper -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Partial in-memory terminal emulator
- @author: Jp Calderone
- """
- from __future__ import print_function
- import re, string
- from zope.interface import implementer
- from incremental import Version
- from twisted.internet import defer, protocol, reactor
- from twisted.python import log, _textattributes
- from twisted.python.compat import iterbytes
- from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
- from twisted.conch.insults import insults
- FOREGROUND = 30
- BACKGROUND = 40
- BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
- class _FormattingState(_textattributes._FormattingStateMixin):
- """
- Represents the formatting state/attributes of a single character.
- Character set, intensity, underlinedness, blinkitude, video
- reversal, as well as foreground and background colors made up a
- character's attributes.
- """
- compareAttributes = (
- 'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground',
- 'background', '_subtracting')
- def __init__(self, charset=insults.G0, bold=False, underline=False,
- blink=False, reverseVideo=False, foreground=WHITE,
- background=BLACK, _subtracting=False):
- self.charset = charset
- self.bold = bold
- self.underline = underline
- self.blink = blink
- self.reverseVideo = reverseVideo
- self.foreground = foreground
- self.background = background
- self._subtracting = _subtracting
- @deprecated(Version('Twisted', 13, 1, 0))
- def wantOne(self, **kw):
- """
- Add a character attribute to a copy of this formatting state.
- @param **kw: An optional attribute name and value can be provided with
- a keyword argument.
- @return: A formatting state instance with the new attribute.
- @see: L{DefaultFormattingState._withAttribute}.
- """
- k, v = kw.popitem()
- return self._withAttribute(k, v)
- def toVT102(self):
- # Spit out a vt102 control sequence that will set up
- # all the attributes set here. Except charset.
- attrs = []
- if self._subtracting:
- attrs.append(0)
- if self.bold:
- attrs.append(insults.BOLD)
- if self.underline:
- attrs.append(insults.UNDERLINE)
- if self.blink:
- attrs.append(insults.BLINK)
- if self.reverseVideo:
- attrs.append(insults.REVERSE_VIDEO)
- if self.foreground != WHITE:
- attrs.append(FOREGROUND + self.foreground)
- if self.background != BLACK:
- attrs.append(BACKGROUND + self.background)
- if attrs:
- return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
- return ''
- CharacterAttribute = _FormattingState
- deprecatedModuleAttribute(
- Version('Twisted', 13, 1, 0),
- 'Use twisted.conch.insults.text.assembleFormattedText instead.',
- 'twisted.conch.insults.helper',
- 'CharacterAttribute')
- # XXX - need to support scroll regions and scroll history
- @implementer(insults.ITerminalTransport)
- class TerminalBuffer(protocol.Protocol):
- """
- An in-memory terminal emulator.
- """
- for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
- b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
- b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
- b'F10', b'F11', b'F12'):
- execBytes = keyID + b" = object()"
- execStr = execBytes.decode("ascii")
- exec(execStr)
- TAB = b'\t'
- BACKSPACE = b'\x7f'
- width = 80
- height = 24
- fill = b' '
- void = object()
- def getCharacter(self, x, y):
- return self.lines[y][x]
- def connectionMade(self):
- self.reset()
- def write(self, data):
- """
- Add the given printable bytes to the terminal.
- Line feeds in L{bytes} will be replaced with carriage return / line
- feed pairs.
- """
- for b in iterbytes(data.replace(b'\n', b'\r\n')):
- self.insertAtCursor(b)
- def _currentFormattingState(self):
- return _FormattingState(self.activeCharset, **self.graphicRendition)
- def insertAtCursor(self, b):
- """
- Add one byte to the terminal at the cursor and make consequent state
- updates.
- If b is a carriage return, move the cursor to the beginning of the
- current row.
- If b is a line feed, move the cursor to the next row or scroll down if
- the cursor is already in the last row.
- Otherwise, if b is printable, put it at the cursor position (inserting
- or overwriting as dictated by the current mode) and move the cursor.
- """
- if b == b'\r':
- self.x = 0
- elif b == b'\n':
- self._scrollDown()
- elif b in string.printable.encode("ascii"):
- if self.x >= self.width:
- self.nextLine()
- ch = (b, self._currentFormattingState())
- if self.modes.get(insults.modes.IRM):
- self.lines[self.y][self.x:self.x] = [ch]
- self.lines[self.y].pop()
- else:
- self.lines[self.y][self.x] = ch
- self.x += 1
- def _emptyLine(self, width):
- return [(self.void, self._currentFormattingState())
- for i in range(width)]
- def _scrollDown(self):
- self.y += 1
- if self.y >= self.height:
- self.y -= 1
- del self.lines[0]
- self.lines.append(self._emptyLine(self.width))
- def _scrollUp(self):
- self.y -= 1
- if self.y < 0:
- self.y = 0
- del self.lines[-1]
- self.lines.insert(0, self._emptyLine(self.width))
- def cursorUp(self, n=1):
- self.y = max(0, self.y - n)
- def cursorDown(self, n=1):
- self.y = min(self.height - 1, self.y + n)
- def cursorBackward(self, n=1):
- self.x = max(0, self.x - n)
- def cursorForward(self, n=1):
- self.x = min(self.width, self.x + n)
- def cursorPosition(self, column, line):
- self.x = column
- self.y = line
- def cursorHome(self):
- self.x = self.home.x
- self.y = self.home.y
- def index(self):
- self._scrollDown()
- def reverseIndex(self):
- self._scrollUp()
- def nextLine(self):
- """
- Update the cursor position attributes and scroll down if appropriate.
- """
- self.x = 0
- self._scrollDown()
- def saveCursor(self):
- self._savedCursor = (self.x, self.y)
- def restoreCursor(self):
- self.x, self.y = self._savedCursor
- del self._savedCursor
- def setModes(self, modes):
- for m in modes:
- self.modes[m] = True
- def resetModes(self, modes):
- for m in modes:
- try:
- del self.modes[m]
- except KeyError:
- pass
- def setPrivateModes(self, modes):
- """
- Enable the given modes.
- Track which modes have been enabled so that the implementations of
- other L{insults.ITerminalTransport} methods can be properly implemented
- to respect these settings.
- @see: L{resetPrivateModes}
- @see: L{insults.ITerminalTransport.setPrivateModes}
- """
- for m in modes:
- self.privateModes[m] = True
- def resetPrivateModes(self, modes):
- """
- Disable the given modes.
- @see: L{setPrivateModes}
- @see: L{insults.ITerminalTransport.resetPrivateModes}
- """
- for m in modes:
- try:
- del self.privateModes[m]
- except KeyError:
- pass
- def applicationKeypadMode(self):
- self.keypadMode = 'app'
- def numericKeypadMode(self):
- self.keypadMode = 'num'
- def selectCharacterSet(self, charSet, which):
- self.charsets[which] = charSet
- def shiftIn(self):
- self.activeCharset = insults.G0
- def shiftOut(self):
- self.activeCharset = insults.G1
- def singleShift2(self):
- oldActiveCharset = self.activeCharset
- self.activeCharset = insults.G2
- f = self.insertAtCursor
- def insertAtCursor(b):
- f(b)
- del self.insertAtCursor
- self.activeCharset = oldActiveCharset
- self.insertAtCursor = insertAtCursor
- def singleShift3(self):
- oldActiveCharset = self.activeCharset
- self.activeCharset = insults.G3
- f = self.insertAtCursor
- def insertAtCursor(b):
- f(b)
- del self.insertAtCursor
- self.activeCharset = oldActiveCharset
- self.insertAtCursor = insertAtCursor
- def selectGraphicRendition(self, *attributes):
- for a in attributes:
- if a == insults.NORMAL:
- self.graphicRendition = {
- 'bold': False,
- 'underline': False,
- 'blink': False,
- 'reverseVideo': False,
- 'foreground': WHITE,
- 'background': BLACK}
- elif a == insults.BOLD:
- self.graphicRendition['bold'] = True
- elif a == insults.UNDERLINE:
- self.graphicRendition['underline'] = True
- elif a == insults.BLINK:
- self.graphicRendition['blink'] = True
- elif a == insults.REVERSE_VIDEO:
- self.graphicRendition['reverseVideo'] = True
- else:
- try:
- v = int(a)
- except ValueError:
- log.msg("Unknown graphic rendition attribute: " + repr(a))
- else:
- if FOREGROUND <= v <= FOREGROUND + N_COLORS:
- self.graphicRendition['foreground'] = v - FOREGROUND
- elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
- self.graphicRendition['background'] = v - BACKGROUND
- else:
- log.msg("Unknown graphic rendition attribute: " + repr(a))
- def eraseLine(self):
- self.lines[self.y] = self._emptyLine(self.width)
- def eraseToLineEnd(self):
- width = self.width - self.x
- self.lines[self.y][self.x:] = self._emptyLine(width)
- def eraseToLineBeginning(self):
- self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
- def eraseDisplay(self):
- self.lines = [self._emptyLine(self.width) for i in range(self.height)]
- def eraseToDisplayEnd(self):
- self.eraseToLineEnd()
- height = self.height - self.y - 1
- self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
- def eraseToDisplayBeginning(self):
- self.eraseToLineBeginning()
- self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
- def deleteCharacter(self, n=1):
- del self.lines[self.y][self.x:self.x+n]
- self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
- def insertLine(self, n=1):
- self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
- del self.lines[self.height:]
- def deleteLine(self, n=1):
- del self.lines[self.y:self.y+n]
- self.lines.extend([self._emptyLine(self.width) for i in range(n)])
- def reportCursorPosition(self):
- return (self.x, self.y)
- def reset(self):
- self.home = insults.Vector(0, 0)
- self.x = self.y = 0
- self.modes = {}
- self.privateModes = {}
- self.setPrivateModes([insults.privateModes.AUTO_WRAP,
- insults.privateModes.CURSOR_MODE])
- self.numericKeypad = 'app'
- self.activeCharset = insults.G0
- self.graphicRendition = {
- 'bold': False,
- 'underline': False,
- 'blink': False,
- 'reverseVideo': False,
- 'foreground': WHITE,
- 'background': BLACK}
- self.charsets = {
- insults.G0: insults.CS_US,
- insults.G1: insults.CS_US,
- insults.G2: insults.CS_ALTERNATE,
- insults.G3: insults.CS_ALTERNATE_SPECIAL}
- self.eraseDisplay()
- def unhandledControlSequence(self, buf):
- print('Could not handle', repr(buf))
- def __bytes__(self):
- lines = []
- for L in self.lines:
- buf = []
- length = 0
- for (ch, attr) in L:
- if ch is not self.void:
- buf.append(ch)
- length = len(buf)
- else:
- buf.append(self.fill)
- lines.append(b''.join(buf[:length]))
- return b'\n'.join(lines)
- class ExpectationTimeout(Exception):
- pass
- class ExpectableBuffer(TerminalBuffer):
- _mark = 0
- def connectionMade(self):
- TerminalBuffer.connectionMade(self)
- self._expecting = []
- def write(self, data):
- TerminalBuffer.write(self, data)
- self._checkExpected()
- def cursorHome(self):
- TerminalBuffer.cursorHome(self)
- self._mark = 0
- def _timeoutExpected(self, d):
- d.errback(ExpectationTimeout())
- self._checkExpected()
- def _checkExpected(self):
- s = self.__bytes__()[self._mark:]
- while self._expecting:
- expr, timer, deferred = self._expecting[0]
- if timer and not timer.active():
- del self._expecting[0]
- continue
- for match in expr.finditer(s):
- if timer:
- timer.cancel()
- del self._expecting[0]
- self._mark += match.end()
- s = s[match.end():]
- deferred.callback(match)
- break
- else:
- return
- def expect(self, expression, timeout=None, scheduler=reactor):
- d = defer.Deferred()
- timer = None
- if timeout:
- timer = scheduler.callLater(timeout, self._timeoutExpected, d)
- self._expecting.append((re.compile(expression), timer, d))
- self._checkExpected()
- return d
- __all__ = [
- 'CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']
|