123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592 |
- # -*- test-case-name: twisted.conch.test.test_checkers -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Provide L{ICredentialsChecker} implementations to be used in Conch protocols.
- """
- from __future__ import absolute_import, division
- import sys
- import binascii
- import errno
- try:
- import pwd
- except ImportError:
- pwd = None
- else:
- import crypt
- try:
- import spwd
- except ImportError:
- spwd = None
- from zope.interface import providedBy, implementer, Interface
- from incremental import Version
- from twisted.conch import error
- from twisted.conch.ssh import keys
- from twisted.cred.checkers import ICredentialsChecker
- from twisted.cred.credentials import IUsernamePassword, ISSHPrivateKey
- from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
- from twisted.internet import defer
- from twisted.python.compat import _keys, _PY3, _b64decodebytes
- from twisted.python import failure, reflect, log
- from twisted.python.deprecate import deprecatedModuleAttribute
- from twisted.python.util import runAsEffectiveUser
- from twisted.python.filepath import FilePath
- def verifyCryptedPassword(crypted, pw):
- """
- Check that the password, when crypted, matches the stored crypted password.
- @param crypted: The stored crypted password.
- @type crypted: L{str}
- @param pw: The password the user has given.
- @type pw: L{str}
- @rtype: L{bool}
- """
- return crypt.crypt(pw, crypted) == crypted
- def _pwdGetByName(username):
- """
- Look up a user in the /etc/passwd database using the pwd module. If the
- pwd module is not available, return None.
- @param username: the username of the user to return the passwd database
- information for.
- @type username: L{str}
- """
- if pwd is None:
- return None
- return pwd.getpwnam(username)
- def _shadowGetByName(username):
- """
- Look up a user in the /etc/shadow database using the spwd module. If it is
- not available, return L{None}.
- @param username: the username of the user to return the shadow database
- information for.
- @type username: L{str}
- """
- if spwd is not None:
- f = spwd.getspnam
- else:
- return None
- return runAsEffectiveUser(0, 0, f, username)
- @implementer(ICredentialsChecker)
- class UNIXPasswordDatabase:
- """
- A checker which validates users out of the UNIX password databases, or
- databases of a compatible format.
- @ivar _getByNameFunctions: a C{list} of functions which are called in order
- to valid a user. The default value is such that the C{/etc/passwd}
- database will be tried first, followed by the C{/etc/shadow} database.
- """
- credentialInterfaces = IUsernamePassword,
- def __init__(self, getByNameFunctions=None):
- if getByNameFunctions is None:
- getByNameFunctions = [_pwdGetByName, _shadowGetByName]
- self._getByNameFunctions = getByNameFunctions
- def requestAvatarId(self, credentials):
- # We get bytes, but the Py3 pwd module uses str. So attempt to decode
- # it using the same method that CPython does for the file on disk.
- if _PY3:
- username = credentials.username.decode(sys.getfilesystemencoding())
- password = credentials.password.decode(sys.getfilesystemencoding())
- else:
- username = credentials.username
- password = credentials.password
- for func in self._getByNameFunctions:
- try:
- pwnam = func(username)
- except KeyError:
- return defer.fail(UnauthorizedLogin("invalid username"))
- else:
- if pwnam is not None:
- crypted = pwnam[1]
- if crypted == '':
- continue
- if verifyCryptedPassword(crypted, password):
- return defer.succeed(credentials.username)
- # fallback
- return defer.fail(UnauthorizedLogin("unable to verify password"))
- @implementer(ICredentialsChecker)
- class SSHPublicKeyDatabase:
- """
- Checker that authenticates SSH public keys, based on public keys listed in
- authorized_keys and authorized_keys2 files in user .ssh/ directories.
- """
- credentialInterfaces = (ISSHPrivateKey,)
- _userdb = pwd
- def requestAvatarId(self, credentials):
- d = defer.maybeDeferred(self.checkKey, credentials)
- d.addCallback(self._cbRequestAvatarId, credentials)
- d.addErrback(self._ebRequestAvatarId)
- return d
- def _cbRequestAvatarId(self, validKey, credentials):
- """
- Check whether the credentials themselves are valid, now that we know
- if the key matches the user.
- @param validKey: A boolean indicating whether or not the public key
- matches a key in the user's authorized_keys file.
- @param credentials: The credentials offered by the user.
- @type credentials: L{ISSHPrivateKey} provider
- @raise UnauthorizedLogin: (as a failure) if the key does not match the
- user in C{credentials}. Also raised if the user provides an invalid
- signature.
- @raise ValidPublicKey: (as a failure) if the key matches the user but
- the credentials do not include a signature. See
- L{error.ValidPublicKey} for more information.
- @return: The user's username, if authentication was successful.
- """
- if not validKey:
- return failure.Failure(UnauthorizedLogin("invalid key"))
- if not credentials.signature:
- return failure.Failure(error.ValidPublicKey())
- else:
- try:
- pubKey = keys.Key.fromString(credentials.blob)
- if pubKey.verify(credentials.signature, credentials.sigData):
- return credentials.username
- except: # any error should be treated as a failed login
- log.err()
- return failure.Failure(UnauthorizedLogin('error while verifying key'))
- return failure.Failure(UnauthorizedLogin("unable to verify key"))
- def getAuthorizedKeysFiles(self, credentials):
- """
- Return a list of L{FilePath} instances for I{authorized_keys} files
- which might contain information about authorized keys for the given
- credentials.
- On OpenSSH servers, the default location of the file containing the
- list of authorized public keys is
- U{$HOME/.ssh/authorized_keys<http://www.openbsd.org/cgi-bin/man.cgi?query=sshd_config>}.
- I{$HOME/.ssh/authorized_keys2} is also returned, though it has been
- U{deprecated by OpenSSH since
- 2001<http://marc.info/?m=100508718416162>}.
- @return: A list of L{FilePath} instances to files with the authorized keys.
- """
- pwent = self._userdb.getpwnam(credentials.username)
- root = FilePath(pwent.pw_dir).child('.ssh')
- files = ['authorized_keys', 'authorized_keys2']
- return [root.child(f) for f in files]
- def checkKey(self, credentials):
- """
- Retrieve files containing authorized keys and check against user
- credentials.
- """
- ouid, ogid = self._userdb.getpwnam(credentials.username)[2:4]
- for filepath in self.getAuthorizedKeysFiles(credentials):
- if not filepath.exists():
- continue
- try:
- lines = filepath.open()
- except IOError as e:
- if e.errno == errno.EACCES:
- lines = runAsEffectiveUser(ouid, ogid, filepath.open)
- else:
- raise
- with lines:
- for l in lines:
- l2 = l.split()
- if len(l2) < 2:
- continue
- try:
- if _b64decodebytes(l2[1]) == credentials.blob:
- return True
- except binascii.Error:
- continue
- return False
- def _ebRequestAvatarId(self, f):
- if not f.check(UnauthorizedLogin):
- log.msg(f)
- return failure.Failure(UnauthorizedLogin("unable to get avatar id"))
- return f
- @implementer(ICredentialsChecker)
- class SSHProtocolChecker:
- """
- SSHProtocolChecker is a checker that requires multiple authentications
- to succeed. To add a checker, call my registerChecker method with
- the checker and the interface.
- After each successful authenticate, I call my areDone method with the
- avatar id. To get a list of the successful credentials for an avatar id,
- use C{SSHProcotolChecker.successfulCredentials[avatarId]}. If L{areDone}
- returns True, the authentication has succeeded.
- """
- def __init__(self):
- self.checkers = {}
- self.successfulCredentials = {}
- def get_credentialInterfaces(self):
- return _keys(self.checkers)
- credentialInterfaces = property(get_credentialInterfaces)
- def registerChecker(self, checker, *credentialInterfaces):
- if not credentialInterfaces:
- credentialInterfaces = checker.credentialInterfaces
- for credentialInterface in credentialInterfaces:
- self.checkers[credentialInterface] = checker
- def requestAvatarId(self, credentials):
- """
- Part of the L{ICredentialsChecker} interface. Called by a portal with
- some credentials to check if they'll authenticate a user. We check the
- interfaces that the credentials provide against our list of acceptable
- checkers. If one of them matches, we ask that checker to verify the
- credentials. If they're valid, we call our L{_cbGoodAuthentication}
- method to continue.
- @param credentials: the credentials the L{Portal} wants us to verify
- """
- ifac = providedBy(credentials)
- for i in ifac:
- c = self.checkers.get(i)
- if c is not None:
- d = defer.maybeDeferred(c.requestAvatarId, credentials)
- return d.addCallback(self._cbGoodAuthentication,
- credentials)
- return defer.fail(UnhandledCredentials("No checker for %s" % \
- ', '.join(map(reflect.qual, ifac))))
- def _cbGoodAuthentication(self, avatarId, credentials):
- """
- Called if a checker has verified the credentials. We call our
- L{areDone} method to see if the whole of the successful authentications
- are enough. If they are, we return the avatar ID returned by the first
- checker.
- """
- if avatarId not in self.successfulCredentials:
- self.successfulCredentials[avatarId] = []
- self.successfulCredentials[avatarId].append(credentials)
- if self.areDone(avatarId):
- del self.successfulCredentials[avatarId]
- return avatarId
- else:
- raise error.NotEnoughAuthentication()
- def areDone(self, avatarId):
- """
- Override to determine if the authentication is finished for a given
- avatarId.
- @param avatarId: the avatar returned by the first checker. For
- this checker to function correctly, all the checkers must
- return the same avatar ID.
- """
- return True
- deprecatedModuleAttribute(
- Version("Twisted", 15, 0, 0),
- ("Please use twisted.conch.checkers.SSHPublicKeyChecker, "
- "initialized with an instance of "
- "twisted.conch.checkers.UNIXAuthorizedKeysFiles instead."),
- __name__, "SSHPublicKeyDatabase")
- class IAuthorizedKeysDB(Interface):
- """
- An object that provides valid authorized ssh keys mapped to usernames.
- @since: 15.0
- """
- def getAuthorizedKeys(avatarId):
- """
- Gets an iterable of authorized keys that are valid for the given
- C{avatarId}.
- @param avatarId: the ID of the avatar
- @type avatarId: valid return value of
- L{twisted.cred.checkers.ICredentialsChecker.requestAvatarId}
- @return: an iterable of L{twisted.conch.ssh.keys.Key}
- """
- def readAuthorizedKeyFile(fileobj, parseKey=keys.Key.fromString):
- """
- Reads keys from an authorized keys file. Any non-comment line that cannot
- be parsed as a key will be ignored, although that particular line will
- be logged.
- @param fileobj: something from which to read lines which can be parsed
- as keys
- @type fileobj: L{file}-like object
- @param parseKey: a callable that takes a string and returns a
- L{twisted.conch.ssh.keys.Key}, mainly to be used for testing. The
- default is L{twisted.conch.ssh.keys.Key.fromString}.
- @type parseKey: L{callable}
- @return: an iterable of L{twisted.conch.ssh.keys.Key}
- @rtype: iterable
- @since: 15.0
- """
- for line in fileobj:
- line = line.strip()
- if line and not line.startswith(b'#'): # for comments
- try:
- yield parseKey(line)
- except keys.BadKeyError as e:
- log.msg('Unable to parse line "{0}" as a key: {1!s}'
- .format(line, e))
- def _keysFromFilepaths(filepaths, parseKey):
- """
- Helper function that turns an iterable of filepaths into a generator of
- keys. If any file cannot be read, a message is logged but it is
- otherwise ignored.
- @param filepaths: iterable of L{twisted.python.filepath.FilePath}.
- @type filepaths: iterable
- @param parseKey: a callable that takes a string and returns a
- L{twisted.conch.ssh.keys.Key}
- @type parseKey: L{callable}
- @return: generator of L{twisted.conch.ssh.keys.Key}
- @rtype: generator
- @since: 15.0
- """
- for fp in filepaths:
- if fp.exists():
- try:
- with fp.open() as f:
- for key in readAuthorizedKeyFile(f, parseKey):
- yield key
- except (IOError, OSError) as e:
- log.msg("Unable to read {0}: {1!s}".format(fp.path, e))
- @implementer(IAuthorizedKeysDB)
- class InMemorySSHKeyDB(object):
- """
- Object that provides SSH public keys based on a dictionary of usernames
- mapped to L{twisted.conch.ssh.keys.Key}s.
- @since: 15.0
- """
- def __init__(self, mapping):
- """
- Initializes a new L{InMemorySSHKeyDB}.
- @param mapping: mapping of usernames to iterables of
- L{twisted.conch.ssh.keys.Key}s
- @type mapping: L{dict}
- """
- self._mapping = mapping
- def getAuthorizedKeys(self, username):
- return self._mapping.get(username, [])
- @implementer(IAuthorizedKeysDB)
- class UNIXAuthorizedKeysFiles(object):
- """
- Object that provides SSH public keys based on public keys listed in
- authorized_keys and authorized_keys2 files in UNIX user .ssh/ directories.
- If any of the files cannot be read, a message is logged but that file is
- otherwise ignored.
- @since: 15.0
- """
- def __init__(self, userdb=None, parseKey=keys.Key.fromString):
- """
- Initializes a new L{UNIXAuthorizedKeysFiles}.
- @param userdb: access to the Unix user account and password database
- (default is the Python module L{pwd})
- @type userdb: L{pwd}-like object
- @param parseKey: a callable that takes a string and returns a
- L{twisted.conch.ssh.keys.Key}, mainly to be used for testing. The
- default is L{twisted.conch.ssh.keys.Key.fromString}.
- @type parseKey: L{callable}
- """
- self._userdb = userdb
- self._parseKey = parseKey
- if userdb is None:
- self._userdb = pwd
- def getAuthorizedKeys(self, username):
- try:
- passwd = self._userdb.getpwnam(username)
- except KeyError:
- return ()
- root = FilePath(passwd.pw_dir).child('.ssh')
- files = ['authorized_keys', 'authorized_keys2']
- return _keysFromFilepaths((root.child(f) for f in files),
- self._parseKey)
- @implementer(ICredentialsChecker)
- class SSHPublicKeyChecker(object):
- """
- Checker that authenticates SSH public keys, based on public keys listed in
- authorized_keys and authorized_keys2 files in user .ssh/ directories.
- Initializing this checker with a L{UNIXAuthorizedKeysFiles} should be
- used instead of L{twisted.conch.checkers.SSHPublicKeyDatabase}.
- @since: 15.0
- """
- credentialInterfaces = (ISSHPrivateKey,)
- def __init__(self, keydb):
- """
- Initializes a L{SSHPublicKeyChecker}.
- @param keydb: a provider of L{IAuthorizedKeysDB}
- @type keydb: L{IAuthorizedKeysDB} provider
- """
- self._keydb = keydb
- def requestAvatarId(self, credentials):
- d = defer.maybeDeferred(self._sanityCheckKey, credentials)
- d.addCallback(self._checkKey, credentials)
- d.addCallback(self._verifyKey, credentials)
- return d
- def _sanityCheckKey(self, credentials):
- """
- Checks whether the provided credentials are a valid SSH key with a
- signature (does not actually verify the signature).
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
- @raise ValidPublicKey: the credentials do not include a signature. See
- L{error.ValidPublicKey} for more information.
- @raise BadKeyError: The key included with the credentials is not
- recognized as a key.
- @return: the key in the credentials
- @rtype: L{twisted.conch.ssh.keys.Key}
- """
- if not credentials.signature:
- raise error.ValidPublicKey()
- return keys.Key.fromString(credentials.blob)
- def _checkKey(self, pubKey, credentials):
- """
- Checks the public key against all authorized keys (if any) for the
- user.
- @param pubKey: the key in the credentials (just to prevent it from
- having to be calculated again)
- @type pubKey:
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
- @raise UnauthorizedLogin: If the key is not authorized, or if there
- was any error obtaining a list of authorized keys for the user.
- @return: C{pubKey} if the key is authorized
- @rtype: L{twisted.conch.ssh.keys.Key}
- """
- if any(key == pubKey for key in
- self._keydb.getAuthorizedKeys(credentials.username)):
- return pubKey
- raise UnauthorizedLogin("Key not authorized")
- def _verifyKey(self, pubKey, credentials):
- """
- Checks whether the credentials themselves are valid, now that we know
- if the key matches the user.
- @param pubKey: the key in the credentials (just to prevent it from
- having to be calculated again)
- @type pubKey: L{twisted.conch.ssh.keys.Key}
- @param credentials: the credentials offered by the user
- @type credentials: L{ISSHPrivateKey} provider
- @raise UnauthorizedLogin: If the key signature is invalid or there
- was any error verifying the signature.
- @return: The user's username, if authentication was successful
- @rtype: L{bytes}
- """
- try:
- if pubKey.verify(credentials.signature, credentials.sigData):
- return credentials.username
- except: # Any error should be treated as a failed login
- log.err()
- raise UnauthorizedLogin('Error while verifying key')
- raise UnauthorizedLogin("Key signature invalid.")
|