123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194 |
- # -*- test-case-name: twisted.conch.test.test_telnet -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Telnet protocol implementation.
- @author: Jean-Paul Calderone
- """
- from __future__ import absolute_import, division
- import struct
- from zope.interface import implementer
- from twisted.internet import protocol, interfaces as iinternet, defer
- from twisted.python import log
- from twisted.python.compat import _bytesChr as chr, iterbytes
- MODE = chr(1)
- EDIT = 1
- TRAPSIG = 2
- MODE_ACK = 4
- SOFT_TAB = 8
- LIT_ECHO = 16
- # Characters gleaned from the various (and conflicting) RFCs. Not all of these are correct.
- NULL = chr(0) # No operation.
- BEL = chr(7) # Produces an audible or
- # visible signal (which does
- # NOT move the print head).
- BS = chr(8) # Moves the print head one
- # character position towards
- # the left margin.
- HT = chr(9) # Moves the printer to the
- # next horizontal tab stop.
- # It remains unspecified how
- # either party determines or
- # establishes where such tab
- # stops are located.
- LF = chr(10) # Moves the printer to the
- # next print line, keeping the
- # same horizontal position.
- VT = chr(11) # Moves the printer to the
- # next vertical tab stop. It
- # remains unspecified how
- # either party determines or
- # establishes where such tab
- # stops are located.
- FF = chr(12) # Moves the printer to the top
- # of the next page, keeping
- # the same horizontal position.
- CR = chr(13) # Moves the printer to the left
- # margin of the current line.
- ECHO = chr(1) # User-to-Server: Asks the server to send
- # Echos of the transmitted data.
- SGA = chr(3) # Suppress Go Ahead. Go Ahead is silly
- # and most modern servers should suppress
- # it.
- NAWS = chr(31) # Negotiate About Window Size. Indicate that
- # information about the size of the terminal
- # can be communicated.
- LINEMODE = chr(34) # Allow line buffering to be
- # negotiated about.
- SE = chr(240) # End of subnegotiation parameters.
- NOP = chr(241) # No operation.
- DM = chr(242) # "Data Mark": The data stream portion
- # of a Synch. This should always be
- # accompanied by a TCP Urgent
- # notification.
- BRK = chr(243) # NVT character Break.
- IP = chr(244) # The function Interrupt Process.
- AO = chr(245) # The function Abort Output
- AYT = chr(246) # The function Are You There.
- EC = chr(247) # The function Erase Character.
- EL = chr(248) # The function Erase Line
- GA = chr(249) # The Go Ahead signal.
- SB = chr(250) # Indicates that what follows is
- # subnegotiation of the indicated
- # option.
- WILL = chr(251) # Indicates the desire to begin
- # performing, or confirmation that
- # you are now performing, the
- # indicated option.
- WONT = chr(252) # Indicates the refusal to perform,
- # or continue performing, the
- # indicated option.
- DO = chr(253) # Indicates the request that the
- # other party perform, or
- # confirmation that you are expecting
- # the other party to perform, the
- # indicated option.
- DONT = chr(254) # Indicates the demand that the
- # other party stop performing,
- # or confirmation that you are no
- # longer expecting the other party
- # to perform, the indicated option.
- IAC = chr(255) # Data Byte 255. Introduces a
- # telnet command.
- LINEMODE_MODE = chr(1)
- LINEMODE_EDIT = chr(1)
- LINEMODE_TRAPSIG = chr(2)
- LINEMODE_MODE_ACK = chr(4)
- LINEMODE_SOFT_TAB = chr(8)
- LINEMODE_LIT_ECHO = chr(16)
- LINEMODE_FORWARDMASK = chr(2)
- LINEMODE_SLC = chr(3)
- LINEMODE_SLC_SYNCH = chr(1)
- LINEMODE_SLC_BRK = chr(2)
- LINEMODE_SLC_IP = chr(3)
- LINEMODE_SLC_AO = chr(4)
- LINEMODE_SLC_AYT = chr(5)
- LINEMODE_SLC_EOR = chr(6)
- LINEMODE_SLC_ABORT = chr(7)
- LINEMODE_SLC_EOF = chr(8)
- LINEMODE_SLC_SUSP = chr(9)
- LINEMODE_SLC_EC = chr(10)
- LINEMODE_SLC_EL = chr(11)
- LINEMODE_SLC_EW = chr(12)
- LINEMODE_SLC_RP = chr(13)
- LINEMODE_SLC_LNEXT = chr(14)
- LINEMODE_SLC_XON = chr(15)
- LINEMODE_SLC_XOFF = chr(16)
- LINEMODE_SLC_FORW1 = chr(17)
- LINEMODE_SLC_FORW2 = chr(18)
- LINEMODE_SLC_MCL = chr(19)
- LINEMODE_SLC_MCR = chr(20)
- LINEMODE_SLC_MCWL = chr(21)
- LINEMODE_SLC_MCWR = chr(22)
- LINEMODE_SLC_MCBOL = chr(23)
- LINEMODE_SLC_MCEOL = chr(24)
- LINEMODE_SLC_INSRT = chr(25)
- LINEMODE_SLC_OVER = chr(26)
- LINEMODE_SLC_ECR = chr(27)
- LINEMODE_SLC_EWR = chr(28)
- LINEMODE_SLC_EBOL = chr(29)
- LINEMODE_SLC_EEOL = chr(30)
- LINEMODE_SLC_DEFAULT = chr(3)
- LINEMODE_SLC_VALUE = chr(2)
- LINEMODE_SLC_CANTCHANGE = chr(1)
- LINEMODE_SLC_NOSUPPORT = chr(0)
- LINEMODE_SLC_LEVELBITS = chr(3)
- LINEMODE_SLC_ACK = chr(128)
- LINEMODE_SLC_FLUSHIN = chr(64)
- LINEMODE_SLC_FLUSHOUT = chr(32)
- LINEMODE_EOF = chr(236)
- LINEMODE_SUSP = chr(237)
- LINEMODE_ABORT = chr(238)
- class ITelnetProtocol(iinternet.IProtocol):
- def unhandledCommand(command, argument):
- """
- A command was received but not understood.
- @param command: the command received.
- @type command: L{str}, a single character.
- @param argument: the argument to the received command.
- @type argument: L{str}, a single character, or None if the command that
- was unhandled does not provide an argument.
- """
- def unhandledSubnegotiation(command, data):
- """
- A subnegotiation command was received but not understood.
- @param command: the command being subnegotiated. That is, the first
- byte after the SB command.
- @type command: L{str}, a single character.
- @param data: all other bytes of the subneogation. That is, all but the
- first bytes between SB and SE, with IAC un-escaping applied.
- @type data: L{bytes}, each a single character
- """
- def enableLocal(option):
- """
- Enable the given option locally.
- This should enable the given option on this side of the
- telnet connection and return True. If False is returned,
- the option will be treated as still disabled and the peer
- will be notified.
- @param option: the option to be enabled.
- @type option: L{bytes}, a single character.
- """
- def enableRemote(option):
- """
- Indicate whether the peer should be allowed to enable this option.
- Returns True if the peer should be allowed to enable this option,
- False otherwise.
- @param option: the option to be enabled.
- @type option: L{bytes}, a single character.
- """
- def disableLocal(option):
- """
- Disable the given option locally.
- Unlike enableLocal, this method cannot fail. The option must be
- disabled.
- @param option: the option to be disabled.
- @type option: L{bytes}, a single character.
- """
- def disableRemote(option):
- """
- Indicate that the peer has disabled this option.
- @param option: the option to be disabled.
- @type option: L{bytes}, a single character.
- """
- class ITelnetTransport(iinternet.ITransport):
- def do(option):
- """
- Indicate a desire for the peer to begin performing the given option.
- Returns a Deferred that fires with True when the peer begins performing
- the option, or fails with L{OptionRefused} when the peer refuses to
- perform it. If the peer is already performing the given option, the
- Deferred will fail with L{AlreadyEnabled}. If a negotiation regarding
- this option is already in progress, the Deferred will fail with
- L{AlreadyNegotiating}.
- Note: It is currently possible that this Deferred will never fire,
- if the peer never responds, or if the peer believes the option to
- already be enabled.
- """
- def dont(option):
- """
- Indicate a desire for the peer to cease performing the given option.
- Returns a Deferred that fires with True when the peer ceases performing
- the option. If the peer is not performing the given option, the
- Deferred will fail with L{AlreadyDisabled}. If negotiation regarding
- this option is already in progress, the Deferred will fail with
- L{AlreadyNegotiating}.
- Note: It is currently possible that this Deferred will never fire,
- if the peer never responds, or if the peer believes the option to
- already be disabled.
- """
- def will(option):
- """
- Indicate our willingness to begin performing this option locally.
- Returns a Deferred that fires with True when the peer agrees to allow us
- to begin performing this option, or fails with L{OptionRefused} if the
- peer refuses to allow us to begin performing it. If the option is
- already enabled locally, the Deferred will fail with L{AlreadyEnabled}.
- If negotiation regarding this option is already in progress, the
- Deferred will fail with L{AlreadyNegotiating}.
- Note: It is currently possible that this Deferred will never fire,
- if the peer never responds, or if the peer believes the option to
- already be enabled.
- """
- def wont(option):
- """
- Indicate that we will stop performing the given option.
- Returns a Deferred that fires with True when the peer acknowledges
- we have stopped performing this option. If the option is already
- disabled locally, the Deferred will fail with L{AlreadyDisabled}.
- If negotiation regarding this option is already in progress,
- the Deferred will fail with L{AlreadyNegotiating}.
- Note: It is currently possible that this Deferred will never fire,
- if the peer never responds, or if the peer believes the option to
- already be disabled.
- """
- def requestNegotiation(about, data):
- """
- Send a subnegotiation request.
- @param about: A byte indicating the feature being negotiated.
- @param data: Any number of L{bytes} containing specific information
- about the negotiation being requested. No values in this string
- need to be escaped, as this function will escape any value which
- requires it.
- """
- class TelnetError(Exception):
- pass
- class NegotiationError(TelnetError):
- def __str__(self):
- return self.__class__.__module__ + '.' + self.__class__.__name__ + ':' + repr(self.args[0])
- class OptionRefused(NegotiationError):
- pass
- class AlreadyEnabled(NegotiationError):
- pass
- class AlreadyDisabled(NegotiationError):
- pass
- class AlreadyNegotiating(NegotiationError):
- pass
- @implementer(ITelnetProtocol)
- class TelnetProtocol(protocol.Protocol):
- def unhandledCommand(self, command, argument):
- pass
- def unhandledSubnegotiation(self, command, data):
- pass
- def enableLocal(self, option):
- pass
- def enableRemote(self, option):
- pass
- def disableLocal(self, option):
- pass
- def disableRemote(self, option):
- pass
- class Telnet(protocol.Protocol):
- """
- @ivar commandMap: A mapping of bytes to callables. When a
- telnet command is received, the command byte (the first byte
- after IAC) is looked up in this dictionary. If a callable is
- found, it is invoked with the argument of the command, or None
- if the command takes no argument. Values should be added to
- this dictionary if commands wish to be handled. By default,
- only WILL, WONT, DO, and DONT are handled. These should not
- be overridden, as this class handles them correctly and
- provides an API for interacting with them.
- @ivar negotiationMap: A mapping of bytes to callables. When
- a subnegotiation command is received, the command byte (the
- first byte after SB) is looked up in this dictionary. If
- a callable is found, it is invoked with the argument of the
- subnegotiation. Values should be added to this dictionary if
- subnegotiations are to be handled. By default, no values are
- handled.
- @ivar options: A mapping of option bytes to their current
- state. This state is likely of little use to user code.
- Changes should not be made to it.
- @ivar state: A string indicating the current parse state. It
- can take on the values "data", "escaped", "command", "newline",
- "subnegotiation", and "subnegotiation-escaped". Changes
- should not be made to it.
- @ivar transport: This protocol's transport object.
- """
- # One of a lot of things
- state = 'data'
- def __init__(self):
- self.options = {}
- self.negotiationMap = {}
- self.commandMap = {
- WILL: self.telnet_WILL,
- WONT: self.telnet_WONT,
- DO: self.telnet_DO,
- DONT: self.telnet_DONT}
- def _write(self, data):
- self.transport.write(data)
- class _OptionState:
- """
- Represents the state of an option on both sides of a telnet
- connection.
- @ivar us: The state of the option on this side of the connection.
- @ivar him: The state of the option on the other side of the
- connection.
- """
- class _Perspective:
- """
- Represents the state of an option on side of the telnet
- connection. Some options can be enabled on a particular side of
- the connection (RFC 1073 for example: only the client can have
- NAWS enabled). Other options can be enabled on either or both
- sides (such as RFC 1372: each side can have its own flow control
- state).
- @ivar state: C{'yes'} or C{'no'} indicating whether or not this
- option is enabled on one side of the connection.
- @ivar negotiating: A boolean tracking whether negotiation about
- this option is in progress.
- @ivar onResult: When negotiation about this option has been
- initiated by this side of the connection, a L{Deferred}
- which will fire with the result of the negotiation. L{None}
- at other times.
- """
- state = 'no'
- negotiating = False
- onResult = None
- def __str__(self):
- return self.state + ('*' * self.negotiating)
- def __init__(self):
- self.us = self._Perspective()
- self.him = self._Perspective()
- def __repr__(self):
- return '<_OptionState us=%s him=%s>' % (self.us, self.him)
- def getOptionState(self, opt):
- return self.options.setdefault(opt, self._OptionState())
- def _do(self, option):
- self._write(IAC + DO + option)
- def _dont(self, option):
- self._write(IAC + DONT + option)
- def _will(self, option):
- self._write(IAC + WILL + option)
- def _wont(self, option):
- self._write(IAC + WONT + option)
- def will(self, option):
- """
- Indicate our willingness to enable an option.
- """
- s = self.getOptionState(option)
- if s.us.negotiating or s.him.negotiating:
- return defer.fail(AlreadyNegotiating(option))
- elif s.us.state == 'yes':
- return defer.fail(AlreadyEnabled(option))
- else:
- s.us.negotiating = True
- s.us.onResult = d = defer.Deferred()
- self._will(option)
- return d
- def wont(self, option):
- """
- Indicate we are not willing to enable an option.
- """
- s = self.getOptionState(option)
- if s.us.negotiating or s.him.negotiating:
- return defer.fail(AlreadyNegotiating(option))
- elif s.us.state == 'no':
- return defer.fail(AlreadyDisabled(option))
- else:
- s.us.negotiating = True
- s.us.onResult = d = defer.Deferred()
- self._wont(option)
- return d
- def do(self, option):
- s = self.getOptionState(option)
- if s.us.negotiating or s.him.negotiating:
- return defer.fail(AlreadyNegotiating(option))
- elif s.him.state == 'yes':
- return defer.fail(AlreadyEnabled(option))
- else:
- s.him.negotiating = True
- s.him.onResult = d = defer.Deferred()
- self._do(option)
- return d
- def dont(self, option):
- s = self.getOptionState(option)
- if s.us.negotiating or s.him.negotiating:
- return defer.fail(AlreadyNegotiating(option))
- elif s.him.state == 'no':
- return defer.fail(AlreadyDisabled(option))
- else:
- s.him.negotiating = True
- s.him.onResult = d = defer.Deferred()
- self._dont(option)
- return d
- def requestNegotiation(self, about, data):
- """
- Send a negotiation message for the option C{about} with C{data} as the
- payload.
- @param data: the payload
- @type data: L{bytes}
- @see: L{ITelnetTransport.requestNegotiation}
- """
- data = data.replace(IAC, IAC * 2)
- self._write(IAC + SB + about + data + IAC + SE)
- def dataReceived(self, data):
- appDataBuffer = []
- for b in iterbytes(data):
- if self.state == 'data':
- if b == IAC:
- self.state = 'escaped'
- elif b == b'\r':
- self.state = 'newline'
- else:
- appDataBuffer.append(b)
- elif self.state == 'escaped':
- if b == IAC:
- appDataBuffer.append(b)
- self.state = 'data'
- elif b == SB:
- self.state = 'subnegotiation'
- self.commands = []
- elif b in (NOP, DM, BRK, IP, AO, AYT, EC, EL, GA):
- self.state = 'data'
- if appDataBuffer:
- self.applicationDataReceived(b''.join(appDataBuffer))
- del appDataBuffer[:]
- self.commandReceived(b, None)
- elif b in (WILL, WONT, DO, DONT):
- self.state = 'command'
- self.command = b
- else:
- raise ValueError("Stumped", b)
- elif self.state == 'command':
- self.state = 'data'
- command = self.command
- del self.command
- if appDataBuffer:
- self.applicationDataReceived(b''.join(appDataBuffer))
- del appDataBuffer[:]
- self.commandReceived(command, b)
- elif self.state == 'newline':
- self.state = 'data'
- if b == b'\n':
- appDataBuffer.append(b'\n')
- elif b == b'\0':
- appDataBuffer.append(b'\r')
- elif b == IAC:
- # IAC isn't really allowed after \r, according to the
- # RFC, but handling it this way is less surprising than
- # delivering the IAC to the app as application data.
- # The purpose of the restriction is to allow terminals
- # to unambiguously interpret the behavior of the CR
- # after reading only one more byte. CR LF is supposed
- # to mean one thing (cursor to next line, first column),
- # CR NUL another (cursor to first column). Absent the
- # NUL, it still makes sense to interpret this as CR and
- # then apply all the usual interpretation to the IAC.
- appDataBuffer.append(b'\r')
- self.state = 'escaped'
- else:
- appDataBuffer.append(b'\r' + b)
- elif self.state == 'subnegotiation':
- if b == IAC:
- self.state = 'subnegotiation-escaped'
- else:
- self.commands.append(b)
- elif self.state == 'subnegotiation-escaped':
- if b == SE:
- self.state = 'data'
- commands = self.commands
- del self.commands
- if appDataBuffer:
- self.applicationDataReceived(b''.join(appDataBuffer))
- del appDataBuffer[:]
- self.negotiate(commands)
- else:
- self.state = 'subnegotiation'
- self.commands.append(b)
- else:
- raise ValueError("How'd you do this?")
- if appDataBuffer:
- self.applicationDataReceived(b''.join(appDataBuffer))
- def connectionLost(self, reason):
- for state in self.options.values():
- if state.us.onResult is not None:
- d = state.us.onResult
- state.us.onResult = None
- d.errback(reason)
- if state.him.onResult is not None:
- d = state.him.onResult
- state.him.onResult = None
- d.errback(reason)
- def applicationDataReceived(self, data):
- """
- Called with application-level data.
- """
- def unhandledCommand(self, command, argument):
- """
- Called for commands for which no handler is installed.
- """
- def commandReceived(self, command, argument):
- cmdFunc = self.commandMap.get(command)
- if cmdFunc is None:
- self.unhandledCommand(command, argument)
- else:
- cmdFunc(argument)
- def unhandledSubnegotiation(self, command, data):
- """
- Called for subnegotiations for which no handler is installed.
- """
- def negotiate(self, data):
- command, data = data[0], data[1:]
- cmdFunc = self.negotiationMap.get(command)
- if cmdFunc is None:
- self.unhandledSubnegotiation(command, data)
- else:
- cmdFunc(data)
- def telnet_WILL(self, option):
- s = self.getOptionState(option)
- self.willMap[s.him.state, s.him.negotiating](self, s, option)
- def will_no_false(self, state, option):
- # He is unilaterally offering to enable an option.
- if self.enableRemote(option):
- state.him.state = 'yes'
- self._do(option)
- else:
- self._dont(option)
- def will_no_true(self, state, option):
- # Peer agreed to enable an option in response to our request.
- state.him.state = 'yes'
- state.him.negotiating = False
- d = state.him.onResult
- state.him.onResult = None
- d.callback(True)
- assert self.enableRemote(option), "enableRemote must return True in this context (for option %r)" % (option,)
- def will_yes_false(self, state, option):
- # He is unilaterally offering to enable an already-enabled option.
- # Ignore this.
- pass
- def will_yes_true(self, state, option):
- # This is a bogus state. It is here for completeness. It will
- # never be entered.
- assert False, "will_yes_true can never be entered, but was called with %r, %r" % (state, option)
- willMap = {('no', False): will_no_false, ('no', True): will_no_true,
- ('yes', False): will_yes_false, ('yes', True): will_yes_true}
- def telnet_WONT(self, option):
- s = self.getOptionState(option)
- self.wontMap[s.him.state, s.him.negotiating](self, s, option)
- def wont_no_false(self, state, option):
- # He is unilaterally demanding that an already-disabled option be/remain disabled.
- # Ignore this (although we could record it and refuse subsequent enable attempts
- # from our side - he can always refuse them again though, so we won't)
- pass
- def wont_no_true(self, state, option):
- # Peer refused to enable an option in response to our request.
- state.him.negotiating = False
- d = state.him.onResult
- state.him.onResult = None
- d.errback(OptionRefused(option))
- def wont_yes_false(self, state, option):
- # Peer is unilaterally demanding that an option be disabled.
- state.him.state = 'no'
- self.disableRemote(option)
- self._dont(option)
- def wont_yes_true(self, state, option):
- # Peer agreed to disable an option at our request.
- state.him.state = 'no'
- state.him.negotiating = False
- d = state.him.onResult
- state.him.onResult = None
- d.callback(True)
- self.disableRemote(option)
- wontMap = {('no', False): wont_no_false, ('no', True): wont_no_true,
- ('yes', False): wont_yes_false, ('yes', True): wont_yes_true}
- def telnet_DO(self, option):
- s = self.getOptionState(option)
- self.doMap[s.us.state, s.us.negotiating](self, s, option)
- def do_no_false(self, state, option):
- # Peer is unilaterally requesting that we enable an option.
- if self.enableLocal(option):
- state.us.state = 'yes'
- self._will(option)
- else:
- self._wont(option)
- def do_no_true(self, state, option):
- # Peer agreed to allow us to enable an option at our request.
- state.us.state = 'yes'
- state.us.negotiating = False
- d = state.us.onResult
- state.us.onResult = None
- d.callback(True)
- self.enableLocal(option)
- def do_yes_false(self, state, option):
- # Peer is unilaterally requesting us to enable an already-enabled option.
- # Ignore this.
- pass
- def do_yes_true(self, state, option):
- # This is a bogus state. It is here for completeness. It will never be
- # entered.
- assert False, "do_yes_true can never be entered, but was called with %r, %r" % (state, option)
- doMap = {('no', False): do_no_false, ('no', True): do_no_true,
- ('yes', False): do_yes_false, ('yes', True): do_yes_true}
- def telnet_DONT(self, option):
- s = self.getOptionState(option)
- self.dontMap[s.us.state, s.us.negotiating](self, s, option)
- def dont_no_false(self, state, option):
- # Peer is unilaterally demanding us to disable an already-disabled option.
- # Ignore this.
- pass
- def dont_no_true(self, state, option):
- # Offered option was refused. Fail the Deferred returned by the
- # previous will() call.
- state.us.negotiating = False
- d = state.us.onResult
- state.us.onResult = None
- d.errback(OptionRefused(option))
- def dont_yes_false(self, state, option):
- # Peer is unilaterally demanding we disable an option.
- state.us.state = 'no'
- self.disableLocal(option)
- self._wont(option)
- def dont_yes_true(self, state, option):
- # Peer acknowledged our notice that we will disable an option.
- state.us.state = 'no'
- state.us.negotiating = False
- d = state.us.onResult
- state.us.onResult = None
- d.callback(True)
- self.disableLocal(option)
- dontMap = {('no', False): dont_no_false, ('no', True): dont_no_true,
- ('yes', False): dont_yes_false, ('yes', True): dont_yes_true}
- def enableLocal(self, option):
- """
- Reject all attempts to enable options.
- """
- return False
- def enableRemote(self, option):
- """
- Reject all attempts to enable options.
- """
- return False
- def disableLocal(self, option):
- """
- Signal a programming error by raising an exception.
- L{enableLocal} must return true for the given value of C{option} in
- order for this method to be called. If a subclass of L{Telnet}
- overrides enableLocal to allow certain options to be enabled, it must
- also override disableLocal to disable those options.
- @raise NotImplementedError: Always raised.
- """
- raise NotImplementedError(
- "Don't know how to disable local telnet option %r" % (option,))
- def disableRemote(self, option):
- """
- Signal a programming error by raising an exception.
- L{enableRemote} must return true for the given value of C{option} in
- order for this method to be called. If a subclass of L{Telnet}
- overrides enableRemote to allow certain options to be enabled, it must
- also override disableRemote tto disable those options.
- @raise NotImplementedError: Always raised.
- """
- raise NotImplementedError(
- "Don't know how to disable remote telnet option %r" % (option,))
- class ProtocolTransportMixin:
- def write(self, data):
- self.transport.write(data.replace(b'\n', b'\r\n'))
- def writeSequence(self, seq):
- self.transport.writeSequence(seq)
- def loseConnection(self):
- self.transport.loseConnection()
- def getHost(self):
- return self.transport.getHost()
- def getPeer(self):
- return self.transport.getPeer()
- class TelnetTransport(Telnet, ProtocolTransportMixin):
- """
- @ivar protocol: An instance of the protocol to which this
- transport is connected, or None before the connection is
- established and after it is lost.
- @ivar protocolFactory: A callable which returns protocol instances
- which provide L{ITelnetProtocol}. This will be invoked when a
- connection is established. It is passed *protocolArgs and
- **protocolKwArgs.
- @ivar protocolArgs: A tuple of additional arguments to
- pass to protocolFactory.
- @ivar protocolKwArgs: A dictionary of additional arguments
- to pass to protocolFactory.
- """
- disconnecting = False
- protocolFactory = None
- protocol = None
- def __init__(self, protocolFactory=None, *a, **kw):
- Telnet.__init__(self)
- if protocolFactory is not None:
- self.protocolFactory = protocolFactory
- self.protocolArgs = a
- self.protocolKwArgs = kw
- def connectionMade(self):
- if self.protocolFactory is not None:
- self.protocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
- assert ITelnetProtocol.providedBy(self.protocol)
- try:
- factory = self.factory
- except AttributeError:
- pass
- else:
- self.protocol.factory = factory
- self.protocol.makeConnection(self)
- def connectionLost(self, reason):
- Telnet.connectionLost(self, reason)
- if self.protocol is not None:
- try:
- self.protocol.connectionLost(reason)
- finally:
- del self.protocol
- def enableLocal(self, option):
- return self.protocol.enableLocal(option)
- def enableRemote(self, option):
- return self.protocol.enableRemote(option)
- def disableLocal(self, option):
- return self.protocol.disableLocal(option)
- def disableRemote(self, option):
- return self.protocol.disableRemote(option)
- def unhandledSubnegotiation(self, command, data):
- self.protocol.unhandledSubnegotiation(command, data)
- def unhandledCommand(self, command, argument):
- self.protocol.unhandledCommand(command, argument)
- def applicationDataReceived(self, data):
- self.protocol.dataReceived(data)
- def write(self, data):
- ProtocolTransportMixin.write(self, data.replace(b'\xff', b'\xff\xff'))
- class TelnetBootstrapProtocol(TelnetProtocol, ProtocolTransportMixin):
- protocol = None
- def __init__(self, protocolFactory, *args, **kw):
- self.protocolFactory = protocolFactory
- self.protocolArgs = args
- self.protocolKwArgs = kw
- def connectionMade(self):
- self.transport.negotiationMap[NAWS] = self.telnet_NAWS
- self.transport.negotiationMap[LINEMODE] = self.telnet_LINEMODE
- for opt in (LINEMODE, NAWS, SGA):
- self.transport.do(opt).addErrback(log.err)
- for opt in (ECHO,):
- self.transport.will(opt).addErrback(log.err)
- self.protocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
- try:
- factory = self.factory
- except AttributeError:
- pass
- else:
- self.protocol.factory = factory
- self.protocol.makeConnection(self)
- def connectionLost(self, reason):
- if self.protocol is not None:
- try:
- self.protocol.connectionLost(reason)
- finally:
- del self.protocol
- def dataReceived(self, data):
- self.protocol.dataReceived(data)
- def enableLocal(self, opt):
- if opt == ECHO:
- return True
- elif opt == SGA:
- return True
- else:
- return False
- def enableRemote(self, opt):
- if opt == LINEMODE:
- self.transport.requestNegotiation(LINEMODE, MODE + chr(TRAPSIG))
- return True
- elif opt == NAWS:
- return True
- elif opt == SGA:
- return True
- else:
- return False
- def telnet_NAWS(self, data):
- # NAWS is client -> server *only*. self.protocol will
- # therefore be an ITerminalTransport, the `.protocol'
- # attribute of which will be an ITerminalProtocol. Maybe.
- # You know what, XXX TODO clean this up.
- if len(data) == 4:
- width, height = struct.unpack('!HH', b''.join(data))
- self.protocol.terminalProtocol.terminalSize(width, height)
- else:
- log.msg("Wrong number of NAWS bytes")
- linemodeSubcommands = {
- LINEMODE_SLC: 'SLC'}
- def telnet_LINEMODE(self, data):
- linemodeSubcommand = data[0]
- if 0:
- # XXX TODO: This should be enabled to parse linemode subnegotiation.
- getattr(self, 'linemode_' + self.linemodeSubcommands[linemodeSubcommand])(data[1:])
- def linemode_SLC(self, data):
- chunks = zip(*[iter(data)]*3)
- for slcFunction, slcValue, slcWhat in chunks:
- # Later, we should parse stuff.
- 'SLC', ord(slcFunction), ord(slcValue), ord(slcWhat)
- from twisted.protocols import basic
- class StatefulTelnetProtocol(basic.LineReceiver, TelnetProtocol):
- delimiter = b'\n'
- state = 'Discard'
- def connectionLost(self, reason):
- basic.LineReceiver.connectionLost(self, reason)
- TelnetProtocol.connectionLost(self, reason)
- def lineReceived(self, line):
- oldState = self.state
- newState = getattr(self, "telnet_" + oldState)(line)
- if newState is not None:
- if self.state == oldState:
- self.state = newState
- else:
- log.msg("Warning: state changed and new state returned")
- def telnet_Discard(self, line):
- pass
- from twisted.cred import credentials
- class AuthenticatingTelnetProtocol(StatefulTelnetProtocol):
- """
- A protocol which prompts for credentials and attempts to authenticate them.
- Username and password prompts are given (the password is obscured). When the
- information is collected, it is passed to a portal and an avatar implementing
- L{ITelnetProtocol} is requested. If an avatar is returned, it connected to this
- protocol's transport, and this protocol's transport is connected to it.
- Otherwise, the user is re-prompted for credentials.
- """
- state = "User"
- protocol = None
- def __init__(self, portal):
- self.portal = portal
- def connectionMade(self):
- self.transport.write(b"Username: ")
- def connectionLost(self, reason):
- StatefulTelnetProtocol.connectionLost(self, reason)
- if self.protocol is not None:
- try:
- self.protocol.connectionLost(reason)
- self.logout()
- finally:
- del self.protocol, self.logout
- def telnet_User(self, line):
- self.username = line
- self.transport.will(ECHO)
- self.transport.write(b"Password: ")
- return 'Password'
- def telnet_Password(self, line):
- username, password = self.username, line
- del self.username
- def login(ignored):
- creds = credentials.UsernamePassword(username, password)
- d = self.portal.login(creds, None, ITelnetProtocol)
- d.addCallback(self._cbLogin)
- d.addErrback(self._ebLogin)
- self.transport.wont(ECHO).addCallback(login)
- return 'Discard'
- def _cbLogin(self, ial):
- interface, protocol, logout = ial
- assert interface is ITelnetProtocol
- self.protocol = protocol
- self.logout = logout
- self.state = 'Command'
- protocol.makeConnection(self.transport)
- self.transport.protocol = protocol
- def _ebLogin(self, failure):
- self.transport.write(b"\nAuthentication failed\n")
- self.transport.write(b"Username: ")
- self.state = "User"
- __all__ = [
- # Exceptions
- 'TelnetError', 'NegotiationError', 'OptionRefused',
- 'AlreadyNegotiating', 'AlreadyEnabled', 'AlreadyDisabled',
- # Interfaces
- 'ITelnetProtocol', 'ITelnetTransport',
- # Other stuff, protocols, etc.
- 'Telnet', 'TelnetProtocol', 'TelnetTransport',
- 'TelnetBootstrapProtocol',
- ]
|