12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289 |
- # -*- test-case-name: twisted.conch.test.test_insults -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- VT102 and VT220 terminal manipulation.
- @author: Jp Calderone
- """
- from zope.interface import implementer, Interface
- from twisted.internet import protocol, defer, interfaces as iinternet
- from twisted.python.compat import intToBytes, iterbytes, networkString
class ITerminalProtocol(Interface):
    """
    Receiver of the events produced by a terminal: connection set-up and
    tear-down, keystrokes, size reports and unrecognised control sequences.
    """

    def makeConnection(transport):
        """
        Called with an L{ITerminalTransport} when a connection is established.

        @param transport: The L{ITerminalTransport} provider for the new
            connection.
        """

    def keystrokeReceived(keyID, modifier):
        """
        A keystroke was received.

        This method is invoked exactly once per keystroke.

        @param keyID: An identifier for the key.  Printable characters are
            represented by themselves.  Control keys, such as arrows and
            function keys, are represented with symbolic constants on
            L{ServerProtocol}.
        @param modifier: The modifier accompanying the key, or L{None}.
        """

    def terminalSize(width, height):
        """
        Called to indicate the size of the terminal.

        If this method is never called, a terminal of 80x24 should be
        assumed.  Real terminals might never trigger this call.

        @param width: The number of columns.
        @param height: The number of lines.
        """

    def unhandledControlSequence(seq):
        """
        Called when an unsupported control sequence is received.

        @type seq: L{str}
        @param seq: The whole control sequence which could not be interpreted.
        """

    def connectionLost(reason):
        """
        Called when the connection has been lost.

        @param reason: A Failure describing why the connection was lost.
        """
@implementer(ITerminalProtocol)
class TerminalProtocol(object):
    """
    Base L{ITerminalProtocol} implementation whose event handlers all do
    nothing; subclasses override the ones they care about.

    @ivar terminal: The object passed to L{makeConnection}.
    """

    def makeConnection(self, terminal):
        """
        Remember C{terminal} and then invoke L{connectionMade}.

        @param terminal: The transport for this connection.  It is expected
            to provide L{ITerminalTransport}, but this is not enforced.
        """
        self.terminal = terminal
        self.connectionMade()

    def connectionMade(self):
        """
        Called after a connection has been established.
        """

    def keystrokeReceived(self, keyID, modifier):
        """
        Ignore the keystroke.
        """

    def terminalSize(self, width, height):
        """
        Ignore the size report.
        """

    def unhandledControlSequence(self, seq):
        """
        Ignore the unrecognised control sequence.
        """

    def connectionLost(self, reason):
        """
        Ignore the loss of the connection.
        """
class ITerminalTransport(iinternet.ITransport):
    """
    A transport which, in addition to plain writes, offers the cursor,
    mode, character-set, erase and editing operations of a VT102-style
    terminal.
    """

    def cursorUp(n=1):
        """
        Move the cursor up n lines.
        """

    def cursorDown(n=1):
        """
        Move the cursor down n lines.
        """

    def cursorForward(n=1):
        """
        Move the cursor right n columns.
        """

    def cursorBackward(n=1):
        """
        Move the cursor left n columns.
        """

    def cursorPosition(column, line):
        """
        Move the cursor to the given line and column.
        """

    def cursorHome():
        """
        Move the cursor home.
        """

    def index():
        """
        Move the cursor down one line, scrolling if that is necessary.
        """

    def reverseIndex():
        """
        Move the cursor up one line, scrolling if that is necessary.
        """

    def nextLine():
        """
        Move the cursor to the first position on the next line, scrolling
        if that is necessary.
        """

    def saveCursor():
        """
        Save the cursor position, character attribute, character set, and
        origin mode selection.
        """

    def restoreCursor():
        """
        Restore the previously saved cursor position, character attribute,
        character set, and origin mode selection.

        If no cursor state was previously saved, move the cursor to the
        home position.
        """

    def setModes(modes):
        """
        Set the given modes on the terminal.
        """

    def resetModes(mode):
        """
        Reset the given modes on the terminal.
        """

    def setPrivateModes(modes):
        """
        Set the given DEC private modes on the terminal.
        """

    def resetPrivateModes(modes):
        """
        Reset the given DEC private modes on the terminal.
        """

    def applicationKeypadMode():
        """
        Cause keypad to generate control functions.

        Cursor key mode selects the type of characters generated by cursor
        keys.
        """

    def numericKeypadMode():
        """
        Cause keypad to generate normal characters.
        """

    def selectCharacterSet(charSet, which):
        """
        Select a character set.

        @param charSet: One of CS_US, CS_UK, CS_DRAWING, CS_ALTERNATE, or
            CS_ALTERNATE_SPECIAL.
        @param which: One of G0 or G1.
        """

    def shiftIn():
        """
        Activate the G0 character set.
        """

    def shiftOut():
        """
        Activate the G1 character set.
        """

    def singleShift2():
        """
        Shift to the G2 character set for a single character.
        """

    def singleShift3():
        """
        Shift to the G3 character set for a single character.
        """

    def selectGraphicRendition(*attributes):
        """
        Enable one or more character attributes.

        Arguments should be one or more of UNDERLINE, REVERSE_VIDEO, BLINK,
        or BOLD.  NORMAL may also be specified to disable all character
        attributes.
        """

    def horizontalTabulationSet():
        """
        Set a tab stop at the current cursor position.
        """

    def tabulationClear():
        """
        Clear the tab stop at the current cursor position.
        """

    def tabulationClearAll():
        """
        Clear all tab stops.
        """

    def doubleHeightLine(top=True):
        """
        Make the current line the top or bottom half of a double-height,
        double-width line.

        @param top: If true, the current line is the top half.  Otherwise,
            it is the bottom half.
        """

    def singleWidthLine():
        """
        Make the current line a single-width, single-height line.
        """

    def doubleWidthLine():
        """
        Make the current line a double-width line.
        """

    def eraseToLineEnd():
        """
        Erase from the cursor to the end of line, including cursor position.
        """

    def eraseToLineBeginning():
        """
        Erase from the cursor to the beginning of the line, including the
        cursor position.
        """

    def eraseLine():
        """
        Erase the entire cursor line.
        """

    def eraseToDisplayEnd():
        """
        Erase from the cursor to the end of the display, including the
        cursor position.
        """

    def eraseToDisplayBeginning():
        """
        Erase from the cursor to the beginning of the display, including the
        cursor position.
        """

    def eraseDisplay():
        """
        Erase the entire display.
        """

    def deleteCharacter(n=1):
        """
        Delete n characters starting at the cursor position.

        Characters to the right of deleted characters are shifted to the
        left.
        """

    def insertLine(n=1):
        """
        Insert n lines at the cursor position.

        Lines below the cursor are shifted down.  Lines moved past the
        bottom margin are lost.  This command is ignored when the cursor is
        outside the scroll region.
        """

    def deleteLine(n=1):
        """
        Delete n lines starting at the cursor position.

        Lines below the cursor are shifted up.  This command is ignored
        when the cursor is outside the scroll region.
        """

    def reportCursorPosition():
        """
        Return a Deferred that fires with a two-tuple of (x, y) indicating
        the cursor position.
        """

    def reset():
        """
        Reset the terminal to its initial state.
        """

    def unhandledControlSequence(seq):
        """
        Called when an unsupported control sequence is received.

        @type seq: L{str}
        @param seq: The whole control sequence which could not be interpreted.
        """
# The escape byte (0x1b).
CSI = b"\x1b"

# Maps a control sequence's final byte to the name of the parser method that
# handles it, for final bytes which are not valid Python identifiers.  Bytes
# absent from this table are used as the method name directly.
CST = {
    b"~": b"tilde",
}
class modes:
    """
    ECMA 48 standardized modes
    """

    # Keyboard action mode.  The original author warns that setting this
    # mode makes the keyboard unusable.
    KEYBOARD_ACTION = 2
    KAM = KEYBOARD_ACTION

    # Insertion/replacement mode.
    #
    # Set: character insertion is enabled.  New display characters move old
    # display characters to the right; characters moved past the right
    # margin are lost.
    #
    # Reset: replacement mode (character insertion disabled).  New display
    # characters replace old display characters at the cursor position; the
    # old character is erased.
    INSERTION_REPLACEMENT = 4
    IRM = INSERTION_REPLACEMENT

    # Linefeed/newline mode, also called the new line option.
    #
    # Set: a received linefeed, form feed, or vertical tab moves the cursor
    # to the first column of the next line, and RETURN transmits both a
    # carriage return and a linefeed.
    #
    # Reset: a received linefeed, form feed, or vertical tab moves the
    # cursor to the next line in the current column, and RETURN transmits a
    # carriage return.
    LINEFEED_NEWLINE = 20
    LNM = LINEFEED_NEWLINE
class privateModes:
    """
    ANSI-Compatible Private Modes
    """

    (ERROR,
     CURSOR_KEY,
     ANSI_VT52,
     COLUMN,
     SCROLL,
     SCREEN,
     ORIGIN,
     AUTO_WRAP,
     AUTO_REPEAT) = range(9)

    PRINTER_FORM_FEED = 18
    PRINTER_EXTENT = 19

    # Toggle cursor visibility (reset hides it)
    CURSOR_MODE = 25
# Character sets, as accepted by selectCharacterSet's charSet argument.
CS_US = b"CS_US"
CS_UK = b"CS_UK"
CS_DRAWING = b"CS_DRAWING"
CS_ALTERNATE = b"CS_ALTERNATE"
CS_ALTERNATE_SPECIAL = b"CS_ALTERNATE_SPECIAL"

# Groupings: these behave like variables which can be bound to character
# sets.  G0 and G1 are the values accepted by selectCharacterSet's which
# argument.
G0 = b"G0"
G1 = b"G1"

# G2 and G3 cannot be changed, but they can be shifted to.
G2 = b"G2"
G3 = b"G3"

# Character attributes, as accepted by selectGraphicRendition.
NORMAL = 0
BOLD = 1
UNDERLINE = 4
BLINK = 5
REVERSE_VIDEO = 7
class Vector:
    """
    A mutable pair of coordinates.

    @ivar x: The horizontal component.
    @ivar y: The vertical component.
    """

    def __init__(self, x, y):
        self.x, self.y = x, y
def log(s):
    """
    Append the string form of C{s}, followed by a newline, to the file
    named C{log} in the current working directory.

    @param s: Any object; it is converted with L{str}.
    """
    with open("log", "a") as logFile:
        logFile.write("%s\n" % (s,))
# XXX TODO - These attributes are really part of the
# ITerminalTransport interface, I think.
_KEY_NAMES = (
    "UP_ARROW", "DOWN_ARROW", "RIGHT_ARROW", "LEFT_ARROW",
    "HOME", "INSERT", "DELETE", "END", "PGUP", "PGDN", "NUMPAD_MIDDLE",
    "F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9",
    "F10", "F11", "F12",
    "ALT", "SHIFT", "CONTROL",
)


class _const(object):
    """
    A named symbolic constant, rendered as its name in square brackets.

    @ivar name: A string naming this constant
    """

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return "[" + self.name + "]"

    def __bytes__(self):
        # Same text as __repr__, encoded as ASCII.
        return ("[" + self.name + "]").encode("ascii")


# One bytes identifier per entry of _KEY_NAMES, in the same order.
FUNCTION_KEYS = [
    _const(_name).__bytes__()
    for _name in _KEY_NAMES
]
@implementer(ITerminalTransport)
class ServerProtocol(protocol.Protocol):
    """
    Protocol for the server end of a terminal connection.

    Bytes arriving from the terminal are parsed into keystrokes and control
    sequences and delivered to an L{ITerminalProtocol}.  The
    L{ITerminalTransport} methods write the matching control sequences to
    the underlying transport.

    @ivar protocolFactory: Callable producing the L{ITerminalProtocol}.
    @ivar terminalProtocol: The L{ITerminalProtocol} created in
        L{connectionMade}, or L{None}.
    @ivar lastWrite: The most recent non-empty data passed to L{write}.
    @ivar state: Name of the current input parser state.
    @ivar termSize: Assumed terminal size; a L{Vector} defaulting to 80x24.
    @ivar cursorPos: Locally tracked cursor position; a L{Vector}.
    @ivar _cursorReports: Deferreds awaiting a cursor position report, in
        request order.
    """

    protocolFactory = None
    terminalProtocol = None

    TAB = b"\t"
    BACKSPACE = b"\x7f"
    ##

    lastWrite = b""

    state = b"data"

    # Class-level defaults only.  __init__ gives every instance its own
    # copies, because the cursor methods mutate cursorPos in place.
    termSize = Vector(80, 24)
    cursorPos = Vector(0, 0)
    scrollRegion = None

    # Factory who instantiated me
    factory = None

    def __init__(self, protocolFactory=None, *a, **kw):
        """
        @param protocolFactory: A callable which will be invoked with
            *a, **kw and should return an ITerminalProtocol implementor.
            This will be invoked when a connection to this ServerProtocol
            is established.

        @param a: Any positional arguments to pass to protocolFactory.
        @param kw: Any keyword arguments to pass to protocolFactory.
        """
        if protocolFactory is not None:
            self.protocolFactory = protocolFactory
        self.protocolArgs = a
        self.protocolKwArgs = kw

        # Copy the (possibly subclass-overridden) class defaults so that one
        # connection moving its cursor cannot affect any other connection.
        self.termSize = Vector(self.termSize.x, self.termSize.y)
        self.cursorPos = Vector(self.cursorPos.x, self.cursorPos.y)

        self._cursorReports = []

    def connectionMade(self):
        """
        Build the terminal protocol with C{protocolFactory}, hand it this
        protocol's C{factory} and connect it to this transport.

        Nothing happens when no C{protocolFactory} is set.
        """
        if self.protocolFactory is not None:
            self.terminalProtocol = self.protocolFactory(
                *self.protocolArgs, **self.protocolKwArgs)

            try:
                factory = self.factory
            except AttributeError:
                pass
            else:
                self.terminalProtocol.factory = factory

            self.terminalProtocol.makeConnection(self)

    def dataReceived(self, data):
        """
        Feed bytes from the terminal through the input state machine.

        Plain bytes become keystrokes.  An escape byte starts a sequence
        which is dispatched to one of the C{_handle*} methods once its
        final byte arrives.

        @param data: The received bytes.
        @raise ValueError: If C{self.state} holds an unknown state name.
        """
        for ch in iterbytes(data):
            if self.state == b"data":
                if ch == b"\x1b":
                    self.state = b"escaped"
                else:
                    self.terminalProtocol.keystrokeReceived(ch, None)
            elif self.state == b"escaped":
                if ch == b"[":
                    self.state = b"bracket-escaped"
                    self.escBuf = []
                elif ch == b"O":
                    self.state = b"low-function-escaped"
                else:
                    self.state = b"data"
                    self._handleShortControlSequence(ch)
            elif self.state == b"bracket-escaped":
                if ch == b"O":
                    self.state = b"low-function-escaped"
                elif ch.isalpha() or ch == b"~":
                    # A letter or tilde terminates the sequence.
                    self._handleControlSequence(b"".join(self.escBuf) + ch)
                    del self.escBuf
                    self.state = b"data"
                else:
                    self.escBuf.append(ch)
            elif self.state == b"low-function-escaped":
                self._handleLowFunctionControlSequence(ch)
                self.state = b"data"
            else:
                raise ValueError("Illegal state")

    def _handleShortControlSequence(self, ch):
        """
        Report an escape followed by a single byte as that byte pressed
        with the ALT modifier.
        """
        self.terminalProtocol.keystrokeReceived(ch, self.ALT)

    def _handleControlSequence(self, buf):
        """
        Dispatch a complete bracket sequence to the parser method named
        after its final byte (translated through L{CST}).

        @param buf: The sequence's parameter bytes followed by its final
            byte, without the leading escape and bracket.
        """
        buf = b"\x1b[" + buf
        f = getattr(self.controlSequenceParser,
                    CST.get(buf[-1:], buf[-1:]).decode("ascii"),
                    None)
        if f is None:
            self.unhandledControlSequence(buf)
        else:
            f(self, self.terminalProtocol, buf[:-1])

    def unhandledControlSequence(self, buf):
        """
        Pass an uninterpretable control sequence on to the terminal
        protocol.
        """
        self.terminalProtocol.unhandledControlSequence(buf)

    def _handleLowFunctionControlSequence(self, ch):
        """
        Translate the final byte of a low function key sequence into F1
        through F4, or report the sequence as unhandled.
        """
        functionKeys = {b"P": self.F1, b"Q": self.F2,
                        b"R": self.F3, b"S": self.F4}
        keyID = functionKeys.get(ch)
        if keyID is not None:
            self.terminalProtocol.keystrokeReceived(keyID, None)
        else:
            self.terminalProtocol.unhandledControlSequence(b"\x1b[O" + ch)

    class ControlSequenceParser:
        """
        Handlers for bracket sequences received from the terminal.

        Each method is named after the sequence's final byte and is called
        with the L{ServerProtocol}, the terminal protocol, and the sequence
        up to but excluding its final byte.
        """

        def _makeKey(final, keyName):
            """
            Build the handler for a parameterless sequence which stands for
            the key named C{keyName} on the protocol.
            """
            def handleKey(self, proto, handler, buf):
                if buf == b"\x1b[":
                    handler.keystrokeReceived(getattr(proto, keyName), None)
                else:
                    handler.unhandledControlSequence(buf + final)
            return handleKey

        A = _makeKey(b"A", "UP_ARROW")
        B = _makeKey(b"B", "DOWN_ARROW")
        C = _makeKey(b"C", "RIGHT_ARROW")
        D = _makeKey(b"D", "LEFT_ARROW")
        E = _makeKey(b"E", "NUMPAD_MIDDLE")
        F = _makeKey(b"F", "END")
        H = _makeKey(b"H", "HOME")
        del _makeKey

        def R(self, proto, handler, buf):
            """
            Handle a cursor position report by firing the oldest pending
            Deferred from C{proto._cursorReports} with C{(x, y)}.

            The report carries one-based line and column, in that order.
            It is unhandled when no report was requested or when it is
            malformed.
            """
            if not proto._cursorReports:
                handler.unhandledControlSequence(buf + b"R")
            elif buf.startswith(b"\x1b["):
                report = buf[2:]
                parts = report.split(b";")
                if len(parts) != 2:
                    handler.unhandledControlSequence(buf + b"R")
                else:
                    Pl, Pc = parts
                    try:
                        Pl, Pc = int(Pl), int(Pc)
                    except ValueError:
                        handler.unhandledControlSequence(buf + b"R")
                    else:
                        d = proto._cursorReports.pop(0)
                        d.callback((Pc - 1, Pl - 1))
            else:
                handler.unhandledControlSequence(buf + b"R")

        def Z(self, proto, handler, buf):
            """
            Handle the parameterless sequence which stands for TAB pressed
            with SHIFT.
            """
            if buf == b"\x1b[":
                handler.keystrokeReceived(proto.TAB, proto.SHIFT)
            else:
                handler.unhandledControlSequence(buf + b"Z")

        def tilde(self, proto, handler, buf):
            """
            Handle a numbered key sequence, translating the number to the
            matching editing or function key.
            """
            keys = {1: proto.HOME, 2: proto.INSERT, 3: proto.DELETE,
                    4: proto.END, 5: proto.PGUP, 6: proto.PGDN,

                    15: proto.F5, 17: proto.F6, 18: proto.F7,
                    19: proto.F8, 20: proto.F9, 21: proto.F10,
                    23: proto.F11, 24: proto.F12}

            if buf.startswith(b"\x1b["):
                ch = buf[2:]
                try:
                    v = int(ch)
                except ValueError:
                    handler.unhandledControlSequence(buf + b"~")
                else:
                    symbolic = keys.get(v)
                    if symbolic is not None:
                        handler.keystrokeReceived(symbolic, None)
                    else:
                        handler.unhandledControlSequence(buf + b"~")
            else:
                handler.unhandledControlSequence(buf + b"~")

    controlSequenceParser = ControlSequenceParser()

    # ITerminalTransport
    def cursorUp(self, n=1):
        """
        Move the cursor up C{n} lines; the tracked position stops at the
        first line.
        """
        assert n >= 1
        self.cursorPos.y = max(self.cursorPos.y - n, 0)
        self.write(b"\x1b[" + intToBytes(n) + b"A")

    def cursorDown(self, n=1):
        """
        Move the cursor down C{n} lines; the tracked position stops at the
        last line of C{termSize}.
        """
        assert n >= 1
        self.cursorPos.y = min(self.cursorPos.y + n, self.termSize.y - 1)
        self.write(b"\x1b[" + intToBytes(n) + b"B")

    def cursorForward(self, n=1):
        """
        Move the cursor right C{n} columns; the tracked position stops at
        the last column of C{termSize}.
        """
        assert n >= 1
        self.cursorPos.x = min(self.cursorPos.x + n, self.termSize.x - 1)
        self.write(b"\x1b[" + intToBytes(n) + b"C")

    def cursorBackward(self, n=1):
        """
        Move the cursor left C{n} columns; the tracked position stops at
        the first column.
        """
        assert n >= 1
        self.cursorPos.x = max(self.cursorPos.x - n, 0)
        self.write(b"\x1b[" + intToBytes(n) + b"D")

    def cursorPosition(self, column, line):
        """
        Move the cursor to the zero-based C{column} and C{line}.

        The sequence carries the one-based line first, then the column.
        """
        # NOTE(review): unlike the other cursor methods this one does not
        # update self.cursorPos - confirm whether that is intentional.
        self.write(b"\x1b[" +
                   intToBytes(line + 1) +
                   b";" +
                   intToBytes(column + 1) +
                   b"H")

    def cursorHome(self):
        """
        Move the cursor to the home position.
        """
        self.cursorPos.x = self.cursorPos.y = 0
        self.write(b"\x1b[H")

    def index(self):
        """
        Move the cursor down one line.
        """
        # ECMA48 5th Edition removes this
        self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
        self.write(b"\x1bD")

    def reverseIndex(self):
        """
        Move the cursor up one line.
        """
        self.cursorPos.y = max(self.cursorPos.y - 1, 0)
        self.write(b"\x1bM")

    def nextLine(self):
        """
        Move the cursor to the first column of the next line.
        """
        self.cursorPos.x = 0
        self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
        self.write(b"\n")

    def saveCursor(self):
        """
        Ask the terminal to save its cursor state and remember a copy of
        the tracked position.
        """
        self._savedCursorPos = Vector(self.cursorPos.x, self.cursorPos.y)
        self.write(b"\x1b7")

    def restoreCursor(self):
        """
        Ask the terminal to restore its saved cursor state and bring the
        tracked position back in line with it.

        If no position was saved, the tracked position becomes the home
        position, as L{ITerminalTransport.restoreCursor} documents.  The
        saved position is kept, so restoring more than once is allowed.
        """
        saved = getattr(self, "_savedCursorPos", None)
        if saved is None:
            self.cursorPos = Vector(0, 0)
        else:
            # Copy, so that later in-place cursor movement does not alter
            # the saved position.
            self.cursorPos = Vector(saved.x, saved.y)
        self.write(b"\x1b8")

    def setModes(self, modes):
        """
        Set each of the given modes.

        @param modes: An iterable of integer mode numbers.
        """
        # XXX Support ANSI-Compatible private modes
        modesBytes = b";".join([intToBytes(mode) for mode in modes])
        self.write(b"\x1b[" + modesBytes + b"h")

    def setPrivateModes(self, modes):
        """
        Set each of the given DEC private modes.

        @param modes: An iterable of integer mode numbers.
        """
        modesBytes = b";".join([intToBytes(mode) for mode in modes])
        self.write(b"\x1b[?" + modesBytes + b"h")

    def resetModes(self, modes):
        """
        Reset each of the given modes.

        @param modes: An iterable of integer mode numbers.
        """
        # XXX Support ANSI-Compatible private modes
        modesBytes = b";".join([intToBytes(mode) for mode in modes])
        self.write(b"\x1b[" + modesBytes + b"l")

    def resetPrivateModes(self, modes):
        """
        Reset each of the given DEC private modes.

        @param modes: An iterable of integer mode numbers.
        """
        modesBytes = b";".join([intToBytes(mode) for mode in modes])
        self.write(b"\x1b[?" + modesBytes + b"l")

    def applicationKeypadMode(self):
        """
        Switch the keypad to application mode.
        """
        self.write(b"\x1b=")

    def numericKeypadMode(self):
        """
        Switch the keypad to numeric mode.
        """
        self.write(b"\x1b>")

    def selectCharacterSet(self, charSet, which):
        """
        Bind a character set to G0 or G1.

        @param charSet: One of CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE or
            CS_ALTERNATE_SPECIAL.
        @param which: G0 or G1.
        @raise ValueError: If either argument is not one of those values.
        """
        # XXX Rewrite these as dict lookups
        if which == G0:
            which = b"("
        elif which == G1:
            which = b")"
        else:
            raise ValueError("`which' argument to selectCharacterSet must be G0 or G1")
        if charSet == CS_UK:
            charSet = b"A"
        elif charSet == CS_US:
            charSet = b"B"
        elif charSet == CS_DRAWING:
            charSet = b"0"
        elif charSet == CS_ALTERNATE:
            charSet = b"1"
        elif charSet == CS_ALTERNATE_SPECIAL:
            charSet = b"2"
        else:
            raise ValueError("Invalid `charSet' argument to selectCharacterSet")
        self.write(b"\x1b" + which + charSet)

    def shiftIn(self):
        """
        Activate the G0 character set.
        """
        # NOTE(review): ASCII SI is 0x0F, not 0x15.  The value is kept
        # because ClientProtocol.dataReceived matches this same byte;
        # confirm and change both together.
        self.write(b"\x15")

    def shiftOut(self):
        """
        Activate the G1 character set.
        """
        # NOTE(review): ASCII SO is 0x0E, not 0x14.  The value is kept
        # because ClientProtocol.dataReceived matches this same byte;
        # confirm and change both together.
        self.write(b"\x14")

    def singleShift2(self):
        """
        Shift to the G2 character set for a single character.
        """
        self.write(b"\x1bN")

    def singleShift3(self):
        """
        Shift to the G3 character set for a single character.
        """
        self.write(b"\x1bO")

    def selectGraphicRendition(self, *attributes):
        """
        Enable the given character attributes.

        @param attributes: Each is either an integer such as the NORMAL,
            BOLD, UNDERLINE, BLINK and REVERSE_VIDEO constants, or a native
            string holding the attribute's parameter text.
        """
        attrs = []
        for a in attributes:
            if isinstance(a, int):
                # The documented attribute constants are integers, which
                # networkString rejects.
                attrs.append(intToBytes(a))
            else:
                attrs.append(networkString(a))
        self.write(b"\x1b[" +
                   b";".join(attrs) +
                   b"m")

    def horizontalTabulationSet(self):
        """
        Set a tab stop at the current cursor position.
        """
        self.write(b"\x1bH")

    def tabulationClear(self):
        """
        Clear the tab stop at the current cursor position.
        """
        # Tabulation clear ends in "g"; "q" is the load-LEDs function.
        self.write(b"\x1b[g")

    def tabulationClearAll(self):
        """
        Clear all tab stops.
        """
        # Tabulation clear ends in "g"; "q" is the load-LEDs function.
        self.write(b"\x1b[3g")

    def doubleHeightLine(self, top=True):
        """
        Make the current line the top (C{top} true) or bottom half of a
        double-height, double-width line.
        """
        if top:
            self.write(b"\x1b#3")
        else:
            self.write(b"\x1b#4")

    def singleWidthLine(self):
        """
        Make the current line single-width and single-height.
        """
        self.write(b"\x1b#5")

    def doubleWidthLine(self):
        """
        Make the current line double-width.
        """
        self.write(b"\x1b#6")

    def eraseToLineEnd(self):
        """
        Erase from the cursor to the end of the line.
        """
        self.write(b"\x1b[K")

    def eraseToLineBeginning(self):
        """
        Erase from the cursor to the beginning of the line.
        """
        self.write(b"\x1b[1K")

    def eraseLine(self):
        """
        Erase the entire cursor line.
        """
        self.write(b"\x1b[2K")

    def eraseToDisplayEnd(self):
        """
        Erase from the cursor to the end of the display.
        """
        self.write(b"\x1b[J")

    def eraseToDisplayBeginning(self):
        """
        Erase from the cursor to the beginning of the display.
        """
        self.write(b"\x1b[1J")

    def eraseDisplay(self):
        """
        Erase the entire display.
        """
        self.write(b"\x1b[2J")

    def deleteCharacter(self, n=1):
        """
        Delete C{n} characters starting at the cursor position.
        """
        self.write(b"\x1b[" + intToBytes(n) + b"P")

    def insertLine(self, n=1):
        """
        Insert C{n} lines at the cursor position.
        """
        self.write(b"\x1b[" + intToBytes(n) + b"L")

    def deleteLine(self, n=1):
        """
        Delete C{n} lines starting at the cursor position.
        """
        self.write(b"\x1b[" + intToBytes(n) + b"M")

    def setScrollRegion(self, first=None, last=None):
        """
        Set the scroll region.

        @param first: The first line number, or L{None} to omit it from
            the sequence.
        @param last: The last line number, or L{None} to omit it from the
            sequence.
        """
        if first is not None:
            first = intToBytes(first)
        else:
            first = b""
        if last is not None:
            last = intToBytes(last)
        else:
            last = b""
        self.write(b"\x1b[" + first + b";" + last + b"r")

    def resetScrollRegion(self):
        """
        Set the scroll region with both boundaries omitted.
        """
        self.setScrollRegion()

    def reportCursorPosition(self):
        """
        Ask the terminal for its cursor position.

        @return: A Deferred which the C{R} handler of
            L{ControlSequenceParser} fires with C{(x, y)} when the report
            arrives.
        """
        d = defer.Deferred()
        self._cursorReports.append(d)
        self.write(b"\x1b[6n")
        return d

    def reset(self):
        """
        Reset the terminal, home the tracked cursor position and discard
        any saved cursor position.
        """
        self.cursorPos.x = self.cursorPos.y = 0
        try:
            del self._savedCursorPos
        except AttributeError:
            pass
        self.write(b"\x1bc")

    # ITransport
    def write(self, data):
        """
        Write C{data} to the underlying transport with every line feed
        replaced by carriage return plus line feed.

        Text is encoded as UTF-8 first.  Empty data is ignored.
        """
        if data:
            if not isinstance(data, bytes):
                data = data.encode("utf-8")
            self.lastWrite = data
            self.transport.write(b"\r\n".join(data.split(b"\n")))

    def writeSequence(self, data):
        """
        Write the concatenation of the given byte strings.
        """
        self.write(b"".join(data))

    def loseConnection(self):
        """
        Reset the terminal and then close the underlying transport.
        """
        self.reset()
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """
        Notify the terminal protocol, if there is one, and then drop the
        reference to it even if the notification raises.
        """
        if self.terminalProtocol is not None:
            try:
                self.terminalProtocol.connectionLost(reason)
            finally:
                self.terminalProtocol = None
# Add symbolic names for function keys: every name in _KEY_NAMES becomes a
# ServerProtocol class attribute holding the FUNCTION_KEYS entry at the same
# index (for example ServerProtocol.UP_ARROW).
for name, const in zip(_KEY_NAMES, FUNCTION_KEYS):
    setattr(ServerProtocol, name, const)
class ClientProtocol(protocol.Protocol):
    """
    Protocol for the client end of a terminal connection.

    Bytes arriving from a terminal server are parsed, and the control
    sequences found in them are turned into method calls on
    C{self.terminal}.  All other bytes are passed to its C{write}.

    @ivar terminalFactory: Callable producing the terminal object.
    @ivar terminal: The object created in L{connectionMade}, or L{None}.
    @ivar state: Name of the current input parser state.
    @ivar _escBuf: Parameter bytes of the bracket sequence being read.
    """

    terminalFactory = None
    terminal = None

    state = b"data"

    _escBuf = None

    # Escape followed by one of these bytes is a complete sequence; the
    # value names the parameterless terminal method to call.
    _shorts = {
        b"D": b"index",
        b"M": b"reverseIndex",
        b"E": b"nextLine",
        b"7": b"saveCursor",
        b"8": b"restoreCursor",
        b"=": b"applicationKeypadMode",
        b">": b"numericKeypadMode",
        b"N": b"singleShift2",
        b"O": b"singleShift3",
        b"H": b"horizontalTabulationSet",
        b"c": b"reset"}

    # Escape followed by one of these bytes starts a longer sequence; the
    # value is the parser state to enter.
    _longs = {
        b"[": b"bracket-escape",
        b"(": b"select-g0",
        b")": b"select-g1",
        b"#": b"select-height-width"}

    # Final byte of a character-set selection to character-set constant.
    _charsets = {
        b"A": CS_UK,
        b"B": CS_US,
        b"0": CS_DRAWING,
        b"1": CS_ALTERNATE,
        b"2": CS_ALTERNATE_SPECIAL}

    # Single control bytes acted on in the data state; the value names the
    # terminal method to call with no arguments.
    # NOTE(review): ASCII SO/SI are 0x0E/0x0F, not 0x14/0x15.  The values
    # are kept because ServerProtocol.shiftOut/shiftIn write these same
    # bytes; confirm and change both together.
    _controlBytes = {
        b"\x14": "shiftOut",
        b"\x15": "shiftIn",
        b"\x08": "cursorBackward"}

    # Factory who instantiated me
    factory = None

    def __init__(self, terminalFactory=None, *a, **kw):
        """
        @param terminalFactory: A callable which will be invoked with
            *a, **kw and should return an ITerminalTransport provider.
            This will be invoked when this ClientProtocol establishes a
            connection.

        @param a: Any positional arguments to pass to terminalFactory.
        @param kw: Any keyword arguments to pass to terminalFactory.
        """
        if terminalFactory is not None:
            self.terminalFactory = terminalFactory
        self.terminalArgs = a
        self.terminalKwArgs = kw

    def connectionMade(self):
        """
        Build the terminal with C{terminalFactory}, hand it this protocol's
        C{factory} and connect it to this protocol.

        Nothing happens when no C{terminalFactory} is set.
        """
        if self.terminalFactory is not None:
            self.terminal = self.terminalFactory(
                *self.terminalArgs, **self.terminalKwArgs)
            self.terminal.factory = self.factory
            self.terminal.makeConnection(self)

    def connectionLost(self, reason):
        """
        Notify the terminal, if there is one, and then drop the reference
        to it even if the notification raises.
        """
        if self.terminal is not None:
            try:
                self.terminal.connectionLost(reason)
            finally:
                del self.terminal

    def dataReceived(self, data):
        """
        Parse the given data from a terminal server, dispatching to event
        handlers defined by C{self.terminal}.

        @param data: The received bytes.
        @raise ValueError: If C{self.state} holds an unknown state name.
        """
        toWrite = []

        def flush():
            # Deliver buffered plain bytes before any control function so
            # that output order is preserved.
            if toWrite:
                self.terminal.write(b"".join(toWrite))
                del toWrite[:]

        for b in iterbytes(data):
            if self.state == b"data":
                if b == b"\x1b":
                    flush()
                    self.state = b"escaped"
                elif b in self._controlBytes:
                    flush()
                    getattr(self.terminal, self._controlBytes[b])()
                else:
                    toWrite.append(b)
            elif self.state == b"escaped":
                fName = self._shorts.get(b)
                if fName is not None:
                    self.state = b"data"
                    getattr(self.terminal, fName.decode("ascii"))()
                else:
                    state = self._longs.get(b)
                    if state is not None:
                        self.state = state
                    else:
                        self.terminal.unhandledControlSequence(b"\x1b" + b)
                        self.state = b"data"
            elif self.state == b"bracket-escape":
                if self._escBuf is None:
                    self._escBuf = []
                if b.isalpha() or b == b"~":
                    # A letter or tilde terminates the sequence.
                    self._handleControlSequence(b"".join(self._escBuf), b)
                    del self._escBuf
                    self.state = b"data"
                else:
                    self._escBuf.append(b)
            elif self.state == b"select-g0":
                self.terminal.selectCharacterSet(self._charsets.get(b, b), G0)
                self.state = b"data"
            elif self.state == b"select-g1":
                self.terminal.selectCharacterSet(self._charsets.get(b, b), G1)
                self.state = b"data"
            elif self.state == b"select-height-width":
                self._handleHeightWidth(b)
                self.state = b"data"
            else:
                raise ValueError("Illegal state")
        flush()

    def _handleControlSequence(self, buf, terminal):
        """
        Dispatch a complete bracket sequence to the parser method named
        after its final byte (translated through L{CST}).

        @param buf: The sequence's parameter bytes.
        @param terminal: The sequence's final byte.
        """
        f = getattr(self.controlSequenceParser,
                    CST.get(terminal, terminal).decode("ascii"),
                    None)
        if f is None:
            self.terminal.unhandledControlSequence(b"\x1b[" + buf + terminal)
        else:
            f(self, self.terminal, buf)

    class ControlSequenceParser:
        """
        Handlers for bracket sequences received from a terminal server.

        Each method is named after the sequence's final byte and is called
        with the L{ClientProtocol}, the terminal, and the sequence's
        parameter bytes.
        """

        def _makeSimple(ch, fName):
            """
            Build the handler for a cursor movement taking one optional
            count.

            @param ch: The final byte of the sequence, as bytes.
            @param fName: The suffix of the C{cursor*} terminal method.
            """
            n = "cursor" + fName

            def simple(self, proto, handler, buf):
                if not buf:
                    getattr(handler, n)(1)
                else:
                    try:
                        m = int(buf)
                    except ValueError:
                        handler.unhandledControlSequence(b"\x1b[" + buf + ch)
                    else:
                        getattr(handler, n)(m)
            return simple

        A = _makeSimple(b"A", "Up")
        B = _makeSimple(b"B", "Down")
        C = _makeSimple(b"C", "Forward")
        D = _makeSimple(b"D", "Backward")
        del _makeSimple

        def h(self, proto, handler, buf):
            """
            Set the modes listed, separated by semicolons, in C{buf}.
            """
            # XXX - Handle '?' to introduce ANSI-Compatible private modes.
            try:
                modes = [int(mode) for mode in buf.split(b";")]
            except ValueError:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"h")
            else:
                handler.setModes(modes)

        def l(self, proto, handler, buf):
            """
            Reset the modes listed, separated by semicolons, in C{buf}.
            """
            # XXX - Handle '?' to introduce ANSI-Compatible private modes.
            try:
                modes = [int(mode) for mode in buf.split(b";")]
            except ValueError:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"l")
            else:
                handler.resetModes(modes)

        def r(self, proto, handler, buf):
            """
            Set the scroll region from up to two optional line numbers.
            """
            parts = buf.split(b";")
            if len(parts) == 1:
                handler.setScrollRegion(None, None)
            elif len(parts) == 2:
                try:
                    if parts[0]:
                        pt = int(parts[0])
                    else:
                        pt = None
                    if parts[1]:
                        pb = int(parts[1])
                    else:
                        pb = None
                except ValueError:
                    handler.unhandledControlSequence(b"\x1b[" + buf + b"r")
                else:
                    handler.setScrollRegion(pt, pb)
            else:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"r")

        def K(self, proto, handler, buf):
            """
            Erase part or all of the cursor line.
            """
            if not buf:
                handler.eraseToLineEnd()
            elif buf == b"1":
                handler.eraseToLineBeginning()
            elif buf == b"2":
                handler.eraseLine()
            else:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"K")

        def H(self, proto, handler, buf):
            """
            Home the cursor, or move it to the given position.

            Without parameters the cursor goes home.  Otherwise C{buf}
            holds the one-based line and column, in that order, either of
            which may be omitted and then counts as 1.  This is the format
            L{ServerProtocol.cursorPosition} writes.
            """
            if not buf:
                handler.cursorHome()
                return
            parts = buf.split(b";")
            if len(parts) > 2:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"H")
                return
            if len(parts) == 1:
                parts.append(b"")
            try:
                line, column = [int(part) if part else 1 for part in parts]
            except ValueError:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"H")
            else:
                # A parameter of zero means the same as one.
                handler.cursorPosition(max(column, 1) - 1, max(line, 1) - 1)

        def J(self, proto, handler, buf):
            """
            Erase part or all of the display.
            """
            if not buf:
                handler.eraseToDisplayEnd()
            elif buf == b"1":
                handler.eraseToDisplayBeginning()
            elif buf == b"2":
                handler.eraseDisplay()
            else:
                handler.unhandledControlSequence(b"\x1b[" + buf + b"J")

        def P(self, proto, handler, buf):
            """
            Delete the given number of characters (default 1).
            """
            if not buf:
                handler.deleteCharacter(1)
            else:
                try:
                    n = int(buf)
                except ValueError:
                    handler.unhandledControlSequence(b"\x1b[" + buf + b"P")
                else:
                    handler.deleteCharacter(n)

        def L(self, proto, handler, buf):
            """
            Insert the given number of lines (default 1).
            """
            if not buf:
                handler.insertLine(1)
            else:
                try:
                    n = int(buf)
                except ValueError:
                    handler.unhandledControlSequence(b"\x1b[" + buf + b"L")
                else:
                    handler.insertLine(n)

        def M(self, proto, handler, buf):
            """
            Delete the given number of lines (default 1).
            """
            if not buf:
                handler.deleteLine(1)
            else:
                try:
                    n = int(buf)
                except ValueError:
                    handler.unhandledControlSequence(b"\x1b[" + buf + b"M")
                else:
                    handler.deleteLine(n)

        def n(self, proto, handler, buf):
            """
            Answer a cursor position request by writing a report to
            C{proto.transport}.

            C{handler.reportCursorPosition()} may return C{(x, y)} directly
            or a Deferred firing with it.  The report carries the one-based
            line first and the column second, which is the order the C{R}
            handler of L{ServerProtocol} reads.
            """
            if buf != b"6":
                handler.unhandledControlSequence(b"\x1b[" + buf + b"n")
                return

            def sendReport(position):
                x, y = position
                proto.transport.write(b"\x1b["
                                      + intToBytes(y + 1)
                                      + b";"
                                      + intToBytes(x + 1)
                                      + b"R")
                return position

            result = handler.reportCursorPosition()
            if isinstance(result, defer.Deferred):
                result.addCallback(sendReport)
            else:
                sendReport(result)

        def m(self, proto, handler, buf):
            """
            Select the listed character attributes, or NORMAL when none
            are given.
            """
            if not buf:
                handler.selectGraphicRendition(NORMAL)
            else:
                attrs = []
                for a in buf.split(b";"):
                    try:
                        a = int(a)
                    except ValueError:
                        # Deliberate: non-numeric parameters are passed on
                        # unchanged.
                        pass
                    attrs.append(a)
                handler.selectGraphicRendition(*attrs)

    controlSequenceParser = ControlSequenceParser()

    def _handleHeightWidth(self, b):
        """
        Apply the line height/width selection identified by C{b}, or
        report the sequence as unhandled.
        """
        if b == b"3":
            self.terminal.doubleHeightLine(True)
        elif b == b"4":
            self.terminal.doubleHeightLine(False)
        elif b == b"5":
            self.terminal.singleWidthLine()
        elif b == b"6":
            self.terminal.doubleWidthLine()
        else:
            self.terminal.unhandledControlSequence(b"\x1b#" + b)
# Names exported by "from ... import *".
__all__ = [
    # Interfaces
    "ITerminalProtocol",
    "ITerminalTransport",

    # Symbolic constants
    "modes",
    "privateModes",
    "FUNCTION_KEYS",

    "CS_US",
    "CS_UK",
    "CS_DRAWING",
    "CS_ALTERNATE",
    "CS_ALTERNATE_SPECIAL",

    "G0",
    "G1",
    "G2",
    "G3",

    "UNDERLINE",
    "REVERSE_VIDEO",
    "BLINK",
    "BOLD",
    "NORMAL",

    # Protocol classes
    "ServerProtocol",
    "ClientProtocol",
]
|