123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058 |
- # -*- test-case-name: twisted.test.test_sslverify -*-
- # Copyright (c) 2005 Divmod, Inc.
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- from __future__ import division, absolute_import
- import warnings
- from constantly import Names, NamedConstant
- from hashlib import md5
- from OpenSSL import SSL, crypto
- from OpenSSL._util import lib as pyOpenSSLlib
- from twisted.internet.abstract import isIPAddress, isIPv6Address
- from twisted.python import log
- from twisted.python.randbytes import secureRandom
- from twisted.python._oldstyle import _oldStyle
- from ._idna import _idnaBytes
- from zope.interface import Interface, implementer
- from constantly import Flags, FlagConstant
- from incremental import Version
- from twisted.internet.defer import Deferred
- from twisted.internet.error import VerifyError, CertificateError
- from twisted.internet.interfaces import (
- IAcceptableCiphers, ICipher, IOpenSSLClientConnectionCreator,
- IOpenSSLContextFactory
- )
- from twisted.python import util
- from twisted.python.deprecate import _mutuallyExclusiveArguments
- from twisted.python.compat import nativeString, unicode
- from twisted.python.failure import Failure
- from twisted.python.util import FancyEqMixin
- from twisted.python.deprecate import deprecated
- class TLSVersion(Names):
- """
- TLS versions that we can negotiate with the client/server.
- """
- SSLv3 = NamedConstant()
- TLSv1_0 = NamedConstant()
- TLSv1_1 = NamedConstant()
- TLSv1_2 = NamedConstant()
- TLSv1_3 = NamedConstant()
- _tlsDisableFlags = {
- TLSVersion.SSLv3: SSL.OP_NO_SSLv3,
- TLSVersion.TLSv1_0: SSL.OP_NO_TLSv1,
- TLSVersion.TLSv1_1: SSL.OP_NO_TLSv1_1,
- TLSVersion.TLSv1_2: SSL.OP_NO_TLSv1_2,
- # If we don't have TLS v1.3 yet, we can't disable it -- this is just so
- # when it makes it into OpenSSL, connections knowingly bracketed to v1.2
- # don't end up going to v1.3
- TLSVersion.TLSv1_3: getattr(SSL, "OP_NO_TLSv1_3", 0x00),
- }
- def _getExcludedTLSProtocols(oldest, newest):
- """
- Given a pair of L{TLSVersion} constants, figure out what versions we want
- to disable (as OpenSSL is an exclusion based API).
- @param oldest: The oldest L{TLSVersion} we want to allow.
- @type oldest: L{TLSVersion} constant
- @param newest: The newest L{TLSVersion} we want to allow, or L{None} for no
- upper limit.
- @type newest: L{TLSVersion} constant or L{None}
- @return: The versions we want to disable.
- @rtype: L{list} of L{TLSVersion} constants.
- """
- versions = list(TLSVersion.iterconstants())
- excludedVersions = [x for x in versions[:versions.index(oldest)]]
- if newest:
- excludedVersions.extend([x for x in versions[versions.index(newest):]])
- return excludedVersions
- class SimpleVerificationError(Exception):
- """
- Not a very useful verification error.
- """
- def simpleVerifyHostname(connection, hostname):
- """
- Check only the common name in the certificate presented by the peer and
- only for an exact match.
- This is to provide I{something} in the way of hostname verification to
- users who haven't installed C{service_identity}. This check is overly
- strict, relies on a deprecated TLS feature (you're supposed to ignore the
- commonName if the subjectAlternativeName extensions are present, I
- believe), and lots of valid certificates will fail.
- @param connection: the OpenSSL connection to verify.
- @type connection: L{OpenSSL.SSL.Connection}
- @param hostname: The hostname expected by the user.
- @type hostname: L{unicode}
- @raise twisted.internet.ssl.VerificationError: if the common name and
- hostname don't match.
- """
- commonName = connection.get_peer_certificate().get_subject().commonName
- if commonName != hostname:
- raise SimpleVerificationError(repr(commonName) + "!=" +
- repr(hostname))
- def simpleVerifyIPAddress(connection, hostname):
- """
- Always fails validation of IP addresses
- @param connection: the OpenSSL connection to verify.
- @type connection: L{OpenSSL.SSL.Connection}
- @param hostname: The hostname expected by the user.
- @type hostname: L{unicode}
- @raise twisted.internet.ssl.VerificationError: Always raised
- """
- raise SimpleVerificationError("Cannot verify certificate IP addresses")
- def _usablePyOpenSSL(version):
- """
- Check pyOpenSSL version string whether we can use it for host verification.
- @param version: A pyOpenSSL version string.
- @type version: L{str}
- @rtype: L{bool}
- """
- major, minor = (int(part) for part in version.split(".")[:2])
- return (major, minor) >= (0, 12)
- def _selectVerifyImplementation():
- """
- Determine if C{service_identity} is installed. If so, use it. If not, use
- simplistic and incorrect checking as implemented in
- L{simpleVerifyHostname}.
- @return: 2-tuple of (C{verify_hostname}, C{VerificationError})
- @rtype: L{tuple}
- """
- whatsWrong = (
- "Without the service_identity module, Twisted can perform only "
- "rudimentary TLS client hostname verification. Many valid "
- "certificate/hostname mappings may be rejected."
- )
- try:
- from service_identity import VerificationError
- from service_identity.pyopenssl import (
- verify_hostname, verify_ip_address
- )
- return verify_hostname, verify_ip_address, VerificationError
- except ImportError as e:
- warnings.warn_explicit(
- "You do not have a working installation of the "
- "service_identity module: '" + str(e) + "'. "
- "Please install it from "
- "<https://pypi.python.org/pypi/service_identity> and make "
- "sure all of its dependencies are satisfied. "
- + whatsWrong,
- # Unfortunately the lineno is required.
- category=UserWarning, filename="", lineno=0)
- return simpleVerifyHostname, simpleVerifyIPAddress, SimpleVerificationError
- verifyHostname, verifyIPAddress, VerificationError = \
- _selectVerifyImplementation()
- class ProtocolNegotiationSupport(Flags):
- """
- L{ProtocolNegotiationSupport} defines flags which are used to indicate the
- level of NPN/ALPN support provided by the TLS backend.
- @cvar NOSUPPORT: There is no support for NPN or ALPN. This is exclusive
- with both L{NPN} and L{ALPN}.
- @cvar NPN: The implementation supports Next Protocol Negotiation.
- @cvar ALPN: The implementation supports Application Layer Protocol
- Negotiation.
- """
- NPN = FlagConstant(0x0001)
- ALPN = FlagConstant(0x0002)
- # FIXME: https://twistedmatrix.com/trac/ticket/8074
- # Currently flags with literal zero values behave incorrectly. However,
- # creating a flag by NOTing a flag with itself appears to work totally fine, so
- # do that instead.
- ProtocolNegotiationSupport.NOSUPPORT = (
- ProtocolNegotiationSupport.NPN ^ ProtocolNegotiationSupport.NPN
- )
- def protocolNegotiationMechanisms():
- """
- Checks whether your versions of PyOpenSSL and OpenSSL are recent enough to
- support protocol negotiation, and if they are, what kind of protocol
- negotiation is supported.
- @return: A combination of flags from L{ProtocolNegotiationSupport} that
- indicate which mechanisms for protocol negotiation are supported.
- @rtype: L{constantly.FlagConstant}
- """
- support = ProtocolNegotiationSupport.NOSUPPORT
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- try:
- ctx.set_npn_advertise_callback(lambda c: None)
- except (AttributeError, NotImplementedError):
- pass
- else:
- support |= ProtocolNegotiationSupport.NPN
- try:
- ctx.set_alpn_select_callback(lambda c: None)
- except (AttributeError, NotImplementedError):
- pass
- else:
- support |= ProtocolNegotiationSupport.ALPN
- return support
- _x509names = {
- 'CN': 'commonName',
- 'commonName': 'commonName',
- 'O': 'organizationName',
- 'organizationName': 'organizationName',
- 'OU': 'organizationalUnitName',
- 'organizationalUnitName': 'organizationalUnitName',
- 'L': 'localityName',
- 'localityName': 'localityName',
- 'ST': 'stateOrProvinceName',
- 'stateOrProvinceName': 'stateOrProvinceName',
- 'C': 'countryName',
- 'countryName': 'countryName',
- 'emailAddress': 'emailAddress'}
- class DistinguishedName(dict):
- """
- Identify and describe an entity.
- Distinguished names are used to provide a minimal amount of identifying
- information about a certificate issuer or subject. They are commonly
- created with one or more of the following fields::
- commonName (CN)
- organizationName (O)
- organizationalUnitName (OU)
- localityName (L)
- stateOrProvinceName (ST)
- countryName (C)
- emailAddress
- A L{DistinguishedName} should be constructed using keyword arguments whose
- keys can be any of the field names above (as a native string), and the
- values are either Unicode text which is encodable to ASCII, or L{bytes}
- limited to the ASCII subset. Any fields passed to the constructor will be
- set as attributes, accessible using both their extended name and their
- shortened acronym. The attribute values will be the ASCII-encoded
- bytes. For example::
- >>> dn = DistinguishedName(commonName=b'www.example.com',
- ... C='US')
- >>> dn.C
- b'US'
- >>> dn.countryName
- b'US'
- >>> hasattr(dn, "organizationName")
- False
- L{DistinguishedName} instances can also be used as dictionaries; the keys
- are extended name of the fields::
- >>> dn.keys()
- ['countryName', 'commonName']
- >>> dn['countryName']
- b'US'
- """
- __slots__ = ()
- def __init__(self, **kw):
- for k, v in kw.items():
- setattr(self, k, v)
- def _copyFrom(self, x509name):
- for name in _x509names:
- value = getattr(x509name, name, None)
- if value is not None:
- setattr(self, name, value)
- def _copyInto(self, x509name):
- for k, v in self.items():
- setattr(x509name, k, nativeString(v))
- def __repr__(self):
- return '<DN %s>' % (dict.__repr__(self)[1:-1])
- def __getattr__(self, attr):
- try:
- return self[_x509names[attr]]
- except KeyError:
- raise AttributeError(attr)
- def __setattr__(self, attr, value):
- if attr not in _x509names:
- raise AttributeError("%s is not a valid OpenSSL X509 name field" % (attr,))
- realAttr = _x509names[attr]
- if not isinstance(value, bytes):
- value = value.encode("ascii")
- self[realAttr] = value
- def inspect(self):
- """
- Return a multi-line, human-readable representation of this DN.
- @rtype: L{str}
- """
- l = []
- lablen = 0
- def uniqueValues(mapping):
- return set(mapping.values())
- for k in sorted(uniqueValues(_x509names)):
- label = util.nameToLabel(k)
- lablen = max(len(label), lablen)
- v = getattr(self, k, None)
- if v is not None:
- l.append((label, nativeString(v)))
- lablen += 2
- for n, (label, attr) in enumerate(l):
- l[n] = (label.rjust(lablen)+': '+ attr)
- return '\n'.join(l)
- DN = DistinguishedName
- @_oldStyle
- class CertBase:
- """
- Base class for public (certificate only) and private (certificate + key
- pair) certificates.
- @ivar original: The underlying OpenSSL certificate object.
- @type original: L{OpenSSL.crypto.X509}
- """
- def __init__(self, original):
- self.original = original
- def _copyName(self, suffix):
- dn = DistinguishedName()
- dn._copyFrom(getattr(self.original, 'get_'+suffix)())
- return dn
- def getSubject(self):
- """
- Retrieve the subject of this certificate.
- @return: A copy of the subject of this certificate.
- @rtype: L{DistinguishedName}
- """
- return self._copyName('subject')
- def __conform__(self, interface):
- """
- Convert this L{CertBase} into a provider of the given interface.
- @param interface: The interface to conform to.
- @type interface: L{zope.interface.interfaces.IInterface}
- @return: an L{IOpenSSLTrustRoot} provider or L{NotImplemented}
- @rtype: L{IOpenSSLTrustRoot} or L{NotImplemented}
- """
- if interface is IOpenSSLTrustRoot:
- return OpenSSLCertificateAuthorities([self.original])
- return NotImplemented
- def _handleattrhelper(Class, transport, methodName):
- """
- (private) Helper for L{Certificate.peerFromTransport} and
- L{Certificate.hostFromTransport} which checks for incompatible handle types
- and null certificates and raises the appropriate exception or returns the
- appropriate certificate object.
- """
- method = getattr(transport.getHandle(),
- "get_%s_certificate" % (methodName,), None)
- if method is None:
- raise CertificateError(
- "non-TLS transport %r did not have %s certificate" % (transport, methodName))
- cert = method()
- if cert is None:
- raise CertificateError(
- "TLS transport %r did not have %s certificate" % (transport, methodName))
- return Class(cert)
- class Certificate(CertBase):
- """
- An x509 certificate.
- """
- def __repr__(self):
- return '<%s Subject=%s Issuer=%s>' % (self.__class__.__name__,
- self.getSubject().commonName,
- self.getIssuer().commonName)
- def __eq__(self, other):
- if isinstance(other, Certificate):
- return self.dump() == other.dump()
- return False
- def __ne__(self, other):
- return not self.__eq__(other)
- @classmethod
- def load(Class, requestData, format=crypto.FILETYPE_ASN1, args=()):
- """
- Load a certificate from an ASN.1- or PEM-format string.
- @rtype: C{Class}
- """
- return Class(crypto.load_certificate(format, requestData), *args)
- # We can't use super() because it is old style still, so we have to hack
- # around things wanting to call the parent function
- _load = load
- def dumpPEM(self):
- """
- Dump this certificate to a PEM-format data string.
- @rtype: L{str}
- """
- return self.dump(crypto.FILETYPE_PEM)
- @classmethod
- def loadPEM(Class, data):
- """
- Load a certificate from a PEM-format data string.
- @rtype: C{Class}
- """
- return Class.load(data, crypto.FILETYPE_PEM)
- @classmethod
- def peerFromTransport(Class, transport):
- """
- Get the certificate for the remote end of the given transport.
- @param transport: an L{ISystemHandle} provider
- @rtype: C{Class}
- @raise: L{CertificateError}, if the given transport does not have a peer
- certificate.
- """
- return _handleattrhelper(Class, transport, 'peer')
- @classmethod
- def hostFromTransport(Class, transport):
- """
- Get the certificate for the local end of the given transport.
- @param transport: an L{ISystemHandle} provider; the transport we will
- @rtype: C{Class}
- @raise: L{CertificateError}, if the given transport does not have a host
- certificate.
- """
- return _handleattrhelper(Class, transport, 'host')
- def getPublicKey(self):
- """
- Get the public key for this certificate.
- @rtype: L{PublicKey}
- """
- return PublicKey(self.original.get_pubkey())
- def dump(self, format=crypto.FILETYPE_ASN1):
- return crypto.dump_certificate(format, self.original)
- def serialNumber(self):
- """
- Retrieve the serial number of this certificate.
- @rtype: L{int}
- """
- return self.original.get_serial_number()
- def digest(self, method='md5'):
- """
- Return a digest hash of this certificate using the specified hash
- algorithm.
- @param method: One of C{'md5'} or C{'sha'}.
- @return: The digest of the object, formatted as b":"-delimited hex
- pairs
- @rtype: L{bytes}
- """
- return self.original.digest(method)
- def _inspect(self):
- return '\n'.join(['Certificate For Subject:',
- self.getSubject().inspect(),
- '\nIssuer:',
- self.getIssuer().inspect(),
- '\nSerial Number: %d' % self.serialNumber(),
- 'Digest: %s' % nativeString(self.digest())])
- def inspect(self):
- """
- Return a multi-line, human-readable representation of this
- Certificate, including information about the subject, issuer, and
- public key.
- """
- return '\n'.join((self._inspect(), self.getPublicKey().inspect()))
- def getIssuer(self):
- """
- Retrieve the issuer of this certificate.
- @rtype: L{DistinguishedName}
- @return: A copy of the issuer of this certificate.
- """
- return self._copyName('issuer')
- def options(self, *authorities):
- raise NotImplementedError('Possible, but doubtful we need this yet')
- class CertificateRequest(CertBase):
- """
- An x509 certificate request.
- Certificate requests are given to certificate authorities to be signed and
- returned resulting in an actual certificate.
- """
- @classmethod
- def load(Class, requestData, requestFormat=crypto.FILETYPE_ASN1):
- req = crypto.load_certificate_request(requestFormat, requestData)
- dn = DistinguishedName()
- dn._copyFrom(req.get_subject())
- if not req.verify(req.get_pubkey()):
- raise VerifyError("Can't verify that request for %r is self-signed." % (dn,))
- return Class(req)
- def dump(self, format=crypto.FILETYPE_ASN1):
- return crypto.dump_certificate_request(format, self.original)
- class PrivateCertificate(Certificate):
- """
- An x509 certificate and private key.
- """
- def __repr__(self):
- return Certificate.__repr__(self) + ' with ' + repr(self.privateKey)
- def _setPrivateKey(self, privateKey):
- if not privateKey.matches(self.getPublicKey()):
- raise VerifyError(
- "Certificate public and private keys do not match.")
- self.privateKey = privateKey
- return self
- def newCertificate(self, newCertData, format=crypto.FILETYPE_ASN1):
- """
- Create a new L{PrivateCertificate} from the given certificate data and
- this instance's private key.
- """
- return self.load(newCertData, self.privateKey, format)
- @classmethod
- def load(Class, data, privateKey, format=crypto.FILETYPE_ASN1):
- return Class._load(data, format)._setPrivateKey(privateKey)
- def inspect(self):
- return '\n'.join([Certificate._inspect(self),
- self.privateKey.inspect()])
- def dumpPEM(self):
- """
- Dump both public and private parts of a private certificate to
- PEM-format data.
- """
- return self.dump(crypto.FILETYPE_PEM) + self.privateKey.dump(crypto.FILETYPE_PEM)
- @classmethod
- def loadPEM(Class, data):
- """
- Load both private and public parts of a private certificate from a
- chunk of PEM-format data.
- """
- return Class.load(data, KeyPair.load(data, crypto.FILETYPE_PEM),
- crypto.FILETYPE_PEM)
- @classmethod
- def fromCertificateAndKeyPair(Class, certificateInstance, privateKey):
- privcert = Class(certificateInstance.original)
- return privcert._setPrivateKey(privateKey)
- def options(self, *authorities):
- """
- Create a context factory using this L{PrivateCertificate}'s certificate
- and private key.
- @param authorities: A list of L{Certificate} object
- @return: A context factory.
- @rtype: L{CertificateOptions <twisted.internet.ssl.CertificateOptions>}
- """
- options = dict(privateKey=self.privateKey.original,
- certificate=self.original)
- if authorities:
- options.update(dict(trustRoot=OpenSSLCertificateAuthorities(
- [auth.original for auth in authorities]
- )))
- return OpenSSLCertificateOptions(**options)
- def certificateRequest(self, format=crypto.FILETYPE_ASN1,
- digestAlgorithm='sha256'):
- return self.privateKey.certificateRequest(
- self.getSubject(),
- format,
- digestAlgorithm)
- def signCertificateRequest(self,
- requestData,
- verifyDNCallback,
- serialNumber,
- requestFormat=crypto.FILETYPE_ASN1,
- certificateFormat=crypto.FILETYPE_ASN1):
- issuer = self.getSubject()
- return self.privateKey.signCertificateRequest(
- issuer,
- requestData,
- verifyDNCallback,
- serialNumber,
- requestFormat,
- certificateFormat)
- def signRequestObject(self, certificateRequest, serialNumber,
- secondsToExpiry=60 * 60 * 24 * 365, # One year
- digestAlgorithm='sha256'):
- return self.privateKey.signRequestObject(self.getSubject(),
- certificateRequest,
- serialNumber,
- secondsToExpiry,
- digestAlgorithm)
- @_oldStyle
- class PublicKey:
- """
- A L{PublicKey} is a representation of the public part of a key pair.
- You can't do a whole lot with it aside from comparing it to other
- L{PublicKey} objects.
- @note: If constructing a L{PublicKey} manually, be sure to pass only a
- L{OpenSSL.crypto.PKey} that does not contain a private key!
- @ivar original: The original private key.
- """
- def __init__(self, osslpkey):
- """
- @param osslpkey: The underlying pyOpenSSL key object.
- @type osslpkey: L{OpenSSL.crypto.PKey}
- """
- self.original = osslpkey
- def matches(self, otherKey):
- """
- Does this L{PublicKey} contain the same value as another L{PublicKey}?
- @param otherKey: The key to compare C{self} to.
- @type otherKey: L{PublicKey}
- @return: L{True} if these keys match, L{False} if not.
- @rtype: L{bool}
- """
- return self.keyHash() == otherKey.keyHash()
- def __repr__(self):
- return '<%s %s>' % (self.__class__.__name__, self.keyHash())
- def keyHash(self):
- """
- Compute a hash of the underlying PKey object.
- The purpose of this method is to allow you to determine if two
- certificates share the same public key; it is not really useful for
- anything else.
- In versions of Twisted prior to 15.0, C{keyHash} used a technique
- involving certificate requests for computing the hash that was not
- stable in the face of changes to the underlying OpenSSL library.
- @return: Return a 32-character hexadecimal string uniquely identifying
- this public key, I{for this version of Twisted}.
- @rtype: native L{str}
- """
- raw = crypto.dump_publickey(crypto.FILETYPE_ASN1, self.original)
- h = md5()
- h.update(raw)
- return h.hexdigest()
- def inspect(self):
- return 'Public Key with Hash: %s' % (self.keyHash(),)
- class KeyPair(PublicKey):
- @classmethod
- def load(Class, data, format=crypto.FILETYPE_ASN1):
- return Class(crypto.load_privatekey(format, data))
- def dump(self, format=crypto.FILETYPE_ASN1):
- return crypto.dump_privatekey(format, self.original)
- def __getstate__(self):
- return self.dump()
- def __setstate__(self, state):
- self.__init__(crypto.load_privatekey(crypto.FILETYPE_ASN1, state))
- def inspect(self):
- t = self.original.type()
- if t == crypto.TYPE_RSA:
- ts = 'RSA'
- elif t == crypto.TYPE_DSA:
- ts = 'DSA'
- else:
- ts = '(Unknown Type!)'
- L = (self.original.bits(), ts, self.keyHash())
- return '%s-bit %s Key Pair with Hash: %s' % L
- @classmethod
- def generate(Class, kind=crypto.TYPE_RSA, size=2048):
- pkey = crypto.PKey()
- pkey.generate_key(kind, size)
- return Class(pkey)
- def newCertificate(self, newCertData, format=crypto.FILETYPE_ASN1):
- return PrivateCertificate.load(newCertData, self, format)
- def requestObject(self, distinguishedName, digestAlgorithm='sha256'):
- req = crypto.X509Req()
- req.set_pubkey(self.original)
- distinguishedName._copyInto(req.get_subject())
- req.sign(self.original, digestAlgorithm)
- return CertificateRequest(req)
- def certificateRequest(self, distinguishedName,
- format=crypto.FILETYPE_ASN1,
- digestAlgorithm='sha256'):
- """
- Create a certificate request signed with this key.
- @return: a string, formatted according to the 'format' argument.
- """
- return self.requestObject(distinguishedName, digestAlgorithm).dump(format)
- def signCertificateRequest(self,
- issuerDistinguishedName,
- requestData,
- verifyDNCallback,
- serialNumber,
- requestFormat=crypto.FILETYPE_ASN1,
- certificateFormat=crypto.FILETYPE_ASN1,
- secondsToExpiry=60 * 60 * 24 * 365, # One year
- digestAlgorithm='sha256'):
- """
- Given a blob of certificate request data and a certificate authority's
- DistinguishedName, return a blob of signed certificate data.
- If verifyDNCallback returns a Deferred, I will return a Deferred which
- fires the data when that Deferred has completed.
- """
- hlreq = CertificateRequest.load(requestData, requestFormat)
- dn = hlreq.getSubject()
- vval = verifyDNCallback(dn)
- def verified(value):
- if not value:
- raise VerifyError("DN callback %r rejected request DN %r" % (verifyDNCallback, dn))
- return self.signRequestObject(issuerDistinguishedName, hlreq,
- serialNumber, secondsToExpiry, digestAlgorithm).dump(certificateFormat)
- if isinstance(vval, Deferred):
- return vval.addCallback(verified)
- else:
- return verified(vval)
- def signRequestObject(self,
- issuerDistinguishedName,
- requestObject,
- serialNumber,
- secondsToExpiry=60 * 60 * 24 * 365, # One year
- digestAlgorithm='sha256'):
- """
- Sign a CertificateRequest instance, returning a Certificate instance.
- """
- req = requestObject.original
- cert = crypto.X509()
- issuerDistinguishedName._copyInto(cert.get_issuer())
- cert.set_subject(req.get_subject())
- cert.set_pubkey(req.get_pubkey())
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(secondsToExpiry)
- cert.set_serial_number(serialNumber)
- cert.sign(self.original, digestAlgorithm)
- return Certificate(cert)
- def selfSignedCert(self, serialNumber, **kw):
- dn = DN(**kw)
- return PrivateCertificate.fromCertificateAndKeyPair(
- self.signRequestObject(dn, self.requestObject(dn), serialNumber),
- self)
- KeyPair.__getstate__ = deprecated(Version("Twisted", 15, 0, 0),
- "a real persistence system")(KeyPair.__getstate__)
- KeyPair.__setstate__ = deprecated(Version("Twisted", 15, 0, 0),
- "a real persistence system")(KeyPair.__setstate__)
- class IOpenSSLTrustRoot(Interface):
- """
- Trust settings for an OpenSSL context.
- Note that this interface's methods are private, so things outside of
- Twisted shouldn't implement it.
- """
- def _addCACertsToContext(context):
- """
- Add certificate-authority certificates to an SSL context whose
- connections should trust those authorities.
- @param context: An SSL context for a connection which should be
- verified by some certificate authority.
- @type context: L{OpenSSL.SSL.Context}
- @return: L{None}
- """
- @implementer(IOpenSSLTrustRoot)
- class OpenSSLCertificateAuthorities(object):
- """
- Trust an explicitly specified set of certificates, represented by a list of
- L{OpenSSL.crypto.X509} objects.
- """
- def __init__(self, caCerts):
- """
- @param caCerts: The certificate authorities to trust when using this
- object as a C{trustRoot} for L{OpenSSLCertificateOptions}.
- @type caCerts: L{list} of L{OpenSSL.crypto.X509}
- """
- self._caCerts = caCerts
- def _addCACertsToContext(self, context):
- store = context.get_cert_store()
- for cert in self._caCerts:
- store.add_cert(cert)
- def trustRootFromCertificates(certificates):
- """
- Builds an object that trusts multiple root L{Certificate}s.
- When passed to L{optionsForClientTLS}, connections using those options will
- reject any server certificate not signed by at least one of the
- certificates in the `certificates` list.
- @since: 16.0
- @param certificates: All certificates which will be trusted.
- @type certificates: C{iterable} of L{CertBase}
- @rtype: L{IOpenSSLTrustRoot}
- @return: an object suitable for use as the trustRoot= keyword argument to
- L{optionsForClientTLS}
- """
- certs = []
- for cert in certificates:
- # PrivateCertificate or Certificate are both okay
- if isinstance(cert, CertBase):
- cert = cert.original
- else:
- raise TypeError(
- "certificates items must be twisted.internet.ssl.CertBase"
- " instances"
- )
- certs.append(cert)
- return OpenSSLCertificateAuthorities(certs)
- @implementer(IOpenSSLTrustRoot)
- class OpenSSLDefaultPaths(object):
- """
- Trust the set of default verify paths that OpenSSL was built with, as
- specified by U{SSL_CTX_set_default_verify_paths
- <https://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>}.
- """
- def _addCACertsToContext(self, context):
- context.set_default_verify_paths()
- def platformTrust():
- """
- Attempt to discover a set of trusted certificate authority certificates
- (or, in other words: trust roots, or root certificates) whose trust is
- managed and updated by tools outside of Twisted.
- If you are writing any client-side TLS code with Twisted, you should use
- this as the C{trustRoot} argument to L{CertificateOptions
- <twisted.internet.ssl.CertificateOptions>}.
- The result of this function should be like the up-to-date list of
- certificates in a web browser. When developing code that uses
- C{platformTrust}, you can think of it that way. However, the choice of
- which certificate authorities to trust is never Twisted's responsibility.
- Unless you're writing a very unusual application or library, it's not your
- code's responsibility either. The user may use platform-specific tools for
- defining which server certificates should be trusted by programs using TLS.
- The purpose of using this API is to respect that decision as much as
- possible.
- This should be a set of trust settings most appropriate for I{client} TLS
- connections; i.e. those which need to verify a server's authenticity. You
- should probably use this by default for any client TLS connection that you
- create. For servers, however, client certificates are typically not
- verified; or, if they are, their verification will depend on a custom,
- application-specific certificate authority.
- @since: 14.0
- @note: Currently, L{platformTrust} depends entirely upon your OpenSSL build
- supporting a set of "L{default verify paths <OpenSSLDefaultPaths>}"
- which correspond to certificate authority trust roots. Unfortunately,
- whether this is true of your system is both outside of Twisted's
- control and difficult (if not impossible) for Twisted to detect
- automatically.
- Nevertheless, this ought to work as desired by default on:
- - Ubuntu Linux machines with the U{ca-certificates
- <https://launchpad.net/ubuntu/+source/ca-certificates>} package
- installed,
- - macOS when using the system-installed version of OpenSSL (i.e.
- I{not} one installed via MacPorts or Homebrew),
- - any build of OpenSSL which has had certificate authority
- certificates installed into its default verify paths (by default,
- C{/usr/local/ssl/certs} if you've built your own OpenSSL), or
- - any process where the C{SSL_CERT_FILE} environment variable is
- set to the path of a file containing your desired CA certificates
- bundle.
- Hopefully soon, this API will be updated to use more sophisticated
- trust-root discovery mechanisms. Until then, you can follow tickets in
- the Twisted tracker for progress on this implementation on U{Microsoft
- Windows <https://twistedmatrix.com/trac/ticket/6371>}, U{macOS
- <https://twistedmatrix.com/trac/ticket/6372>}, and U{a fallback for
- other platforms which do not have native trust management tools
- <https://twistedmatrix.com/trac/ticket/6934>}.
- @return: an appropriate trust settings object for your platform.
- @rtype: L{IOpenSSLTrustRoot}
- @raise NotImplementedError: if this platform is not yet supported by
- Twisted. At present, only OpenSSL is supported.
- """
- return OpenSSLDefaultPaths()
- def _tolerateErrors(wrapped):
- """
- Wrap up an C{info_callback} for pyOpenSSL so that if something goes wrong
- the error is immediately logged and the connection is dropped if possible.
- This wrapper exists because some versions of pyOpenSSL don't handle errors
- from callbacks at I{all}, and those which do write tracebacks directly to
- stderr rather than to a supplied logging system. This reports unexpected
- errors to the Twisted logging system.
- Also, this terminates the connection immediately if possible because if
- you've got bugs in your verification logic it's much safer to just give up.
- @param wrapped: A valid C{info_callback} for pyOpenSSL.
- @type wrapped: L{callable}
- @return: A valid C{info_callback} for pyOpenSSL that handles any errors in
- C{wrapped}.
- @rtype: L{callable}
- """
- def infoCallback(connection, where, ret):
- try:
- return wrapped(connection, where, ret)
- except:
- f = Failure()
- log.err(f, "Error during info_callback")
- connection.get_app_data().failVerification(f)
- return infoCallback
- @implementer(IOpenSSLClientConnectionCreator)
- class ClientTLSOptions(object):
- """
- Client creator for TLS.
- Private implementation type (not exposed to applications) for public
- L{optionsForClientTLS} API.
- @ivar _ctx: The context to use for new connections.
- @type _ctx: L{OpenSSL.SSL.Context}
- @ivar _hostname: The hostname to verify, as specified by the application,
- as some human-readable text.
- @type _hostname: L{unicode}
- @ivar _hostnameBytes: The hostname to verify, decoded into IDNA-encoded
- bytes. This is passed to APIs which think that hostnames are bytes,
- such as OpenSSL's SNI implementation.
- @type _hostnameBytes: L{bytes}
- @ivar _hostnameASCII: The hostname, as transcoded into IDNA ASCII-range
- unicode code points. This is pre-transcoded because the
- C{service_identity} package is rather strict about requiring the
- C{idna} package from PyPI for internationalized domain names, rather
- than working with Python's built-in (but sometimes broken) IDNA
- encoding. ASCII values, however, will always work.
- @type _hostnameASCII: L{unicode}
- @ivar _hostnameIsDnsName: Whether or not the C{_hostname} is a DNSName.
- Will be L{False} if C{_hostname} is an IP address or L{True} if
- C{_hostname} is a DNSName
- @type _hostnameIsDnsName: L{bool}
- """
- def __init__(self, hostname, ctx):
- """
- Initialize L{ClientTLSOptions}.
- @param hostname: The hostname to verify as input by a human.
- @type hostname: L{unicode}
- @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
- @type ctx: L{OpenSSL.SSL.Context}.
- """
- self._ctx = ctx
- self._hostname = hostname
- if isIPAddress(hostname) or isIPv6Address(hostname):
- self._hostnameBytes = hostname.encode('ascii')
- self._hostnameIsDnsName = False
- else:
- self._hostnameBytes = _idnaBytes(hostname)
- self._hostnameIsDnsName = True
- self._hostnameASCII = self._hostnameBytes.decode("ascii")
- ctx.set_info_callback(
- _tolerateErrors(self._identityVerifyingInfoCallback)
- )
- def clientConnectionForTLS(self, tlsProtocol):
- """
- Create a TLS connection for a client.
- @note: This will call C{set_app_data} on its connection. If you're
- delegating to this implementation of this method, don't ever call
- C{set_app_data} or C{set_info_callback} on the returned connection,
- or you'll break the implementation of various features of this
- class.
- @param tlsProtocol: the TLS protocol initiating the connection.
- @type tlsProtocol: L{twisted.protocols.tls.TLSMemoryBIOProtocol}
- @return: the configured client connection.
- @rtype: L{OpenSSL.SSL.Connection}
- """
- context = self._ctx
- connection = SSL.Connection(context, None)
- connection.set_app_data(tlsProtocol)
- return connection
- def _identityVerifyingInfoCallback(self, connection, where, ret):
- """
- U{info_callback
- <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
- } for pyOpenSSL that verifies the hostname in the presented certificate
- matches the one passed to this L{ClientTLSOptions}.
- @param connection: the connection which is handshaking.
- @type connection: L{OpenSSL.SSL.Connection}
- @param where: flags indicating progress through a TLS handshake.
- @type where: L{int}
- @param ret: ignored
- @type ret: ignored
- """
- # Literal IPv4 and IPv6 addresses are not permitted
- # as host names according to the RFCs
- if where & SSL.SSL_CB_HANDSHAKE_START and self._hostnameIsDnsName:
- connection.set_tlsext_host_name(self._hostnameBytes)
- elif where & SSL.SSL_CB_HANDSHAKE_DONE:
- try:
- if self._hostnameIsDnsName:
- verifyHostname(connection, self._hostnameASCII)
- else:
- verifyIPAddress(connection, self._hostnameASCII)
- except VerificationError:
- f = Failure()
- transport = connection.get_app_data()
- transport.failVerification(f)
- def optionsForClientTLS(hostname, trustRoot=None, clientCertificate=None,
- acceptableProtocols=None, **kw):
- """
- Create a L{client connection creator <IOpenSSLClientConnectionCreator>} for
- use with APIs such as L{SSL4ClientEndpoint
- <twisted.internet.endpoints.SSL4ClientEndpoint>}, L{connectSSL
- <twisted.internet.interfaces.IReactorSSL.connectSSL>}, and L{startTLS
- <twisted.internet.interfaces.ITLSTransport.startTLS>}.
- @since: 14.0
- @param hostname: The expected name of the remote host. This serves two
- purposes: first, and most importantly, it verifies that the certificate
- received from the server correctly identifies the specified hostname.
- The second purpose is to use the U{Server Name Indication extension
- <https://en.wikipedia.org/wiki/Server_Name_Indication>} to indicate to
- the server which certificate should be used.
- @type hostname: L{unicode}
- @param trustRoot: Specification of trust requirements of peers. This may be
- a L{Certificate} or the result of L{platformTrust}. By default it is
- L{platformTrust} and you probably shouldn't adjust it unless you really
- know what you're doing. Be aware that clients using this interface
- I{must} verify the server; you cannot explicitly pass L{None} since
- that just means to use L{platformTrust}.
- @type trustRoot: L{IOpenSSLTrustRoot}
- @param clientCertificate: The certificate and private key that the client
- will use to authenticate to the server. If unspecified, the client will
- not authenticate.
- @type clientCertificate: L{PrivateCertificate}
- @param acceptableProtocols: The protocols this peer is willing to speak
- after the TLS negotiation has completed, advertised over both ALPN and
- NPN. If this argument is specified, and no overlap can be found with
- the other peer, the connection will fail to be established. If the
- remote peer does not offer NPN or ALPN, the connection will be
- established, but no protocol wil be negotiated. Protocols earlier in
- the list are preferred over those later in the list.
- @type acceptableProtocols: L{list} of L{bytes}
- @param extraCertificateOptions: keyword-only argument; this is a dictionary
- of additional keyword arguments to be presented to
- L{CertificateOptions}. Please avoid using this unless you absolutely
- need to; any time you need to pass an option here that is a bug in this
- interface.
- @type extraCertificateOptions: L{dict}
- @param kw: (Backwards compatibility hack to allow keyword-only arguments on
- Python 2. Please ignore; arbitrary keyword arguments will be errors.)
- @type kw: L{dict}
- @return: A client connection creator.
- @rtype: L{IOpenSSLClientConnectionCreator}
- """
- extraCertificateOptions = kw.pop('extraCertificateOptions', None) or {}
- if trustRoot is None:
- trustRoot = platformTrust()
- if kw:
- raise TypeError(
- "optionsForClientTLS() got an unexpected keyword argument"
- " '{arg}'".format(
- arg=kw.popitem()[0]
- )
- )
- if not isinstance(hostname, unicode):
- raise TypeError(
- "optionsForClientTLS requires text for host names, not "
- + hostname.__class__.__name__
- )
- if clientCertificate:
- extraCertificateOptions.update(
- privateKey=clientCertificate.privateKey.original,
- certificate=clientCertificate.original
- )
- certificateOptions = OpenSSLCertificateOptions(
- trustRoot=trustRoot,
- acceptableProtocols=acceptableProtocols,
- **extraCertificateOptions
- )
- return ClientTLSOptions(hostname, certificateOptions.getContext())
- @implementer(IOpenSSLContextFactory)
- class OpenSSLCertificateOptions(object):
- """
- A L{CertificateOptions <twisted.internet.ssl.CertificateOptions>} specifies
- the security properties for a client or server TLS connection used with
- OpenSSL.
- @ivar _options: Any option flags to set on the L{OpenSSL.SSL.Context}
- object that will be created.
- @type _options: L{int}
- @ivar _cipherString: An OpenSSL-specific cipher string.
- @type _cipherString: L{unicode}
- @ivar _defaultMinimumTLSVersion: The default TLS version that will be
- negotiated. This should be a "safe default", with wide client and
- server support, vs an optimally secure one that excludes a large number
- of users. As of late 2016, TLSv1.0 is that safe default.
- @type _defaultMinimumTLSVersion: L{TLSVersion} constant
- """
- # Factory for creating contexts. Configurable for testability.
- _contextFactory = SSL.Context
- _context = None
- _OP_NO_TLSv1_3 = _tlsDisableFlags[TLSVersion.TLSv1_3]
- _defaultMinimumTLSVersion = TLSVersion.TLSv1_0
- @_mutuallyExclusiveArguments([
- ['trustRoot', 'requireCertificate'],
- ['trustRoot', 'verify'],
- ['trustRoot', 'caCerts'],
- ['method', 'insecurelyLowerMinimumTo'],
- ['method', 'raiseMinimumTo'],
- ['raiseMinimumTo', 'insecurelyLowerMinimumTo'],
- ['method', 'lowerMaximumSecurityTo'],
- ])
- def __init__(self,
- privateKey=None,
- certificate=None,
- method=None,
- verify=False,
- caCerts=None,
- verifyDepth=9,
- requireCertificate=True,
- verifyOnce=True,
- enableSingleUseKeys=True,
- enableSessions=True,
- fixBrokenPeers=False,
- enableSessionTickets=False,
- extraCertChain=None,
- acceptableCiphers=None,
- dhParameters=None,
- trustRoot=None,
- acceptableProtocols=None,
- raiseMinimumTo=None,
- insecurelyLowerMinimumTo=None,
- lowerMaximumSecurityTo=None,
- ):
- """
- Create an OpenSSL context SSL connection context factory.
- @param privateKey: A PKey object holding the private key.
- @param certificate: An X509 object holding the certificate.
- @param method: Deprecated, use a combination of
- C{insecurelyLowerMinimumTo}, C{raiseMinimumTo}, or
- C{lowerMaximumSecurityTo} instead. The SSL protocol to use, one of
- C{SSLv23_METHOD}, C{SSLv2_METHOD}, C{SSLv3_METHOD}, C{TLSv1_METHOD}
- (or any other method constants provided by pyOpenSSL). By default,
- a setting will be used which allows TLSv1.0, TLSv1.1, and TLSv1.2.
- Can not be used with C{insecurelyLowerMinimumTo},
- C{raiseMinimumTo}, or C{lowerMaximumSecurityTo}
- @param verify: Please use a C{trustRoot} keyword argument instead,
- since it provides the same functionality in a less error-prone way.
- By default this is L{False}.
- If L{True}, verify certificates received from the peer and fail the
- handshake if verification fails. Otherwise, allow anonymous
- sessions and sessions with certificates which fail validation.
- @param caCerts: Please use a C{trustRoot} keyword argument instead,
- since it provides the same functionality in a less error-prone way.
- List of certificate authority certificate objects to use to verify
- the peer's certificate. Only used if verify is L{True} and will be
- ignored otherwise. Since verify is L{False} by default, this is
- L{None} by default.
- @type caCerts: L{list} of L{OpenSSL.crypto.X509}
- @param verifyDepth: Depth in certificate chain down to which to verify.
- If unspecified, use the underlying default (9).
- @param requireCertificate: Please use a C{trustRoot} keyword argument
- instead, since it provides the same functionality in a less
- error-prone way.
- If L{True}, do not allow anonymous sessions; defaults to L{True}.
- @param verifyOnce: If True, do not re-verify the certificate on session
- resumption.
- @param enableSingleUseKeys: If L{True}, generate a new key whenever
- ephemeral DH and ECDH parameters are used to prevent small subgroup
- attacks and to ensure perfect forward secrecy.
- @param enableSessions: If True, set a session ID on each context. This
- allows a shortened handshake to be used when a known client
- reconnects.
- @param fixBrokenPeers: If True, enable various non-spec protocol fixes
- for broken SSL implementations. This should be entirely safe,
- according to the OpenSSL documentation, but YMMV. This option is
- now off by default, because it causes problems with connections
- between peers using OpenSSL 0.9.8a.
- @param enableSessionTickets: If L{True}, enable session ticket
- extension for session resumption per RFC 5077. Note there is no
- support for controlling session tickets. This option is off by
- default, as some server implementations don't correctly process
- incoming empty session ticket extensions in the hello.
- @param extraCertChain: List of certificates that I{complete} your
- verification chain if the certificate authority that signed your
- C{certificate} isn't widely supported. Do I{not} add
- C{certificate} to it.
- @type extraCertChain: C{list} of L{OpenSSL.crypto.X509}
- @param acceptableCiphers: Ciphers that are acceptable for connections.
- Uses a secure default if left L{None}.
- @type acceptableCiphers: L{IAcceptableCiphers}
- @param dhParameters: Key generation parameters that are required for
- Diffie-Hellman key exchange. If this argument is left L{None},
- C{EDH} ciphers are I{disabled} regardless of C{acceptableCiphers}.
- @type dhParameters: L{DiffieHellmanParameters
- <twisted.internet.ssl.DiffieHellmanParameters>}
- @param trustRoot: Specification of trust requirements of peers. If
- this argument is specified, the peer is verified. It requires a
- certificate, and that certificate must be signed by one of the
- certificate authorities specified by this object.
- Note that since this option specifies the same information as
- C{caCerts}, C{verify}, and C{requireCertificate}, specifying any of
- those options in combination with this one will raise a
- L{TypeError}.
- @type trustRoot: L{IOpenSSLTrustRoot}
- @param acceptableProtocols: The protocols this peer is willing to speak
- after the TLS negotiation has completed, advertised over both ALPN
- and NPN. If this argument is specified, and no overlap can be
- found with the other peer, the connection will fail to be
- established. If the remote peer does not offer NPN or ALPN, the
- connection will be established, but no protocol wil be negotiated.
- Protocols earlier in the list are preferred over those later in the
- list.
- @type acceptableProtocols: L{list} of L{bytes}
- @param raiseMinimumTo: The minimum TLS version that you want to use, or
- Twisted's default if it is higher. Use this if you want to make
- your client/server more secure than Twisted's default, but will
- accept Twisted's default instead if it moves higher than this
- value. You probably want to use this over
- C{insecurelyLowerMinimumTo}.
- @type raiseMinimumTo: L{TLSVersion} constant
- @param insecurelyLowerMinimumTo: The minimum TLS version to use,
- possibly lower than Twisted's default. If not specified, it is a
- generally considered safe default (TLSv1.0). If you want to raise
- your minimum TLS version to above that of this default, use
- C{raiseMinimumTo}. DO NOT use this argument unless you are
- absolutely sure this is what you want.
- @type insecurelyLowerMinimumTo: L{TLSVersion} constant
- @param lowerMaximumSecurityTo: The maximum TLS version to use. If not
- specified, it is the most recent your OpenSSL supports. You only
- want to set this if the peer that you are communicating with has
- problems with more recent TLS versions, it lowers your security
- when communicating with newer peers. DO NOT use this argument
- unless you are absolutely sure this is what you want.
- @type lowerMaximumSecurityTo: L{TLSVersion} constant
- @raise ValueError: when C{privateKey} or C{certificate} are set without
- setting the respective other.
- @raise ValueError: when C{verify} is L{True} but C{caCerts} doesn't
- specify any CA certificates.
- @raise ValueError: when C{extraCertChain} is passed without specifying
- C{privateKey} or C{certificate}.
- @raise ValueError: when C{acceptableCiphers} doesn't yield any usable
- ciphers for the current platform.
- @raise TypeError: if C{trustRoot} is passed in combination with
- C{caCert}, C{verify}, or C{requireCertificate}. Please prefer
- C{trustRoot} in new code, as its semantics are less tricky.
- @raise TypeError: if C{method} is passed in combination with
- C{tlsProtocols}. Please prefer the more explicit C{tlsProtocols}
- in new code.
- @raises NotImplementedError: If acceptableProtocols were provided but
- no negotiation mechanism is available.
- """
- if (privateKey is None) != (certificate is None):
- raise ValueError(
- "Specify neither or both of privateKey and certificate")
- self.privateKey = privateKey
- self.certificate = certificate
- # Set basic security options: disallow insecure SSLv2, disallow TLS
- # compression to avoid CRIME attack, make the server choose the
- # ciphers.
- self._options = (
- SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION |
- SSL.OP_CIPHER_SERVER_PREFERENCE
- )
- # Set the mode to Release Buffers, which demallocs send/recv buffers on
- # idle TLS connections to save memory
- self._mode = SSL.MODE_RELEASE_BUFFERS
- if method is None:
- self.method = SSL.SSLv23_METHOD
- if raiseMinimumTo:
- if (lowerMaximumSecurityTo and
- raiseMinimumTo > lowerMaximumSecurityTo):
- raise ValueError(
- ("raiseMinimumTo needs to be lower than "
- "lowerMaximumSecurityTo"))
- if raiseMinimumTo > self._defaultMinimumTLSVersion:
- insecurelyLowerMinimumTo = raiseMinimumTo
- if insecurelyLowerMinimumTo is None:
- insecurelyLowerMinimumTo = self._defaultMinimumTLSVersion
- # If you set the max lower than the default, but don't set the
- # minimum, pull it down to that
- if (lowerMaximumSecurityTo and
- insecurelyLowerMinimumTo > lowerMaximumSecurityTo):
- insecurelyLowerMinimumTo = lowerMaximumSecurityTo
- if (lowerMaximumSecurityTo and
- insecurelyLowerMinimumTo > lowerMaximumSecurityTo):
- raise ValueError(
- ("insecurelyLowerMinimumTo needs to be lower than "
- "lowerMaximumSecurityTo"))
- excludedVersions = _getExcludedTLSProtocols(
- insecurelyLowerMinimumTo, lowerMaximumSecurityTo)
- for version in excludedVersions:
- self._options |= _tlsDisableFlags[version]
- else:
- warnings.warn(
- ("Passing method to twisted.internet.ssl.CertificateOptions "
- "was deprecated in Twisted 17.1.0. Please use a combination "
- "of insecurelyLowerMinimumTo, raiseMinimumTo, and "
- "lowerMaximumSecurityTo instead, as Twisted will correctly "
- "configure the method."),
- DeprecationWarning, stacklevel=3)
- # Otherwise respect the application decision.
- self.method = method
- if verify and not caCerts:
- raise ValueError("Specify client CA certificate information if and"
- " only if enabling certificate verification")
- self.verify = verify
- if extraCertChain is not None and None in (privateKey, certificate):
- raise ValueError("A private key and a certificate are required "
- "when adding a supplemental certificate chain.")
- if extraCertChain is not None:
- self.extraCertChain = extraCertChain
- else:
- self.extraCertChain = []
- self.caCerts = caCerts
- self.verifyDepth = verifyDepth
- self.requireCertificate = requireCertificate
- self.verifyOnce = verifyOnce
- self.enableSingleUseKeys = enableSingleUseKeys
- if enableSingleUseKeys:
- self._options |= SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE
- self.enableSessions = enableSessions
- self.fixBrokenPeers = fixBrokenPeers
- if fixBrokenPeers:
- self._options |= SSL.OP_ALL
- self.enableSessionTickets = enableSessionTickets
- if not enableSessionTickets:
- self._options |= SSL.OP_NO_TICKET
- self.dhParameters = dhParameters
- self._ecChooser = _ChooseDiffieHellmanEllipticCurve(
- SSL.OPENSSL_VERSION_NUMBER,
- openSSLlib=pyOpenSSLlib,
- openSSLcrypto=crypto,
- )
- if acceptableCiphers is None:
- acceptableCiphers = defaultCiphers
- # This needs to run when method and _options are finalized.
- self._cipherString = u':'.join(
- c.fullName
- for c in acceptableCiphers.selectCiphers(
- _expandCipherString(u'ALL', self.method, self._options)
- )
- )
- if self._cipherString == u'':
- raise ValueError(
- 'Supplied IAcceptableCiphers yielded no usable ciphers '
- 'on this platform.'
- )
- if trustRoot is None:
- if self.verify:
- trustRoot = OpenSSLCertificateAuthorities(caCerts)
- else:
- self.verify = True
- self.requireCertificate = True
- trustRoot = IOpenSSLTrustRoot(trustRoot)
- self.trustRoot = trustRoot
- if acceptableProtocols is not None and not protocolNegotiationMechanisms():
- raise NotImplementedError(
- "No support for protocol negotiation on this platform."
- )
- self._acceptableProtocols = acceptableProtocols
- def __getstate__(self):
- d = self.__dict__.copy()
- try:
- del d['_context']
- except KeyError:
- pass
- return d
- def __setstate__(self, state):
- self.__dict__ = state
- def getContext(self):
- """
- Return an L{OpenSSL.SSL.Context} object.
- """
- if self._context is None:
- self._context = self._makeContext()
- return self._context
- def _makeContext(self):
- ctx = self._contextFactory(self.method)
- ctx.set_options(self._options)
- ctx.set_mode(self._mode)
- if self.certificate is not None and self.privateKey is not None:
- ctx.use_certificate(self.certificate)
- ctx.use_privatekey(self.privateKey)
- for extraCert in self.extraCertChain:
- ctx.add_extra_chain_cert(extraCert)
- # Sanity check
- ctx.check_privatekey()
- verifyFlags = SSL.VERIFY_NONE
- if self.verify:
- verifyFlags = SSL.VERIFY_PEER
- if self.requireCertificate:
- verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
- if self.verifyOnce:
- verifyFlags |= SSL.VERIFY_CLIENT_ONCE
- self.trustRoot._addCACertsToContext(ctx)
- # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
- # the underlying OpenSSL API call allows NULL to be passed). It
- # doesn't, so we'll supply a function which does the same thing.
- def _verifyCallback(conn, cert, errno, depth, preverify_ok):
- return preverify_ok
- ctx.set_verify(verifyFlags, _verifyCallback)
- if self.verifyDepth is not None:
- ctx.set_verify_depth(self.verifyDepth)
- if self.enableSessions:
- # 32 bytes is the maximum length supported
- # Unfortunately pyOpenSSL doesn't provide SSL_MAX_SESSION_ID_LENGTH
- sessionName = secureRandom(32)
- ctx.set_session_id(sessionName)
- if self.dhParameters:
- ctx.load_tmp_dh(self.dhParameters._dhFile.path)
- ctx.set_cipher_list(self._cipherString.encode('ascii'))
- self._ecChooser.configureECDHCurve(ctx)
- if self._acceptableProtocols:
- # Try to set NPN and ALPN. _acceptableProtocols cannot be set by
- # the constructor unless at least one mechanism is supported.
- _setAcceptableProtocols(ctx, self._acceptableProtocols)
- return ctx
- OpenSSLCertificateOptions.__getstate__ = deprecated(
- Version("Twisted", 15, 0, 0),
- "a real persistence system")(OpenSSLCertificateOptions.__getstate__)
- OpenSSLCertificateOptions.__setstate__ = deprecated(
- Version("Twisted", 15, 0, 0),
- "a real persistence system")(OpenSSLCertificateOptions.__setstate__)
- @implementer(ICipher)
- class OpenSSLCipher(FancyEqMixin, object):
- """
- A representation of an OpenSSL cipher.
- """
- compareAttributes = ('fullName',)
- def __init__(self, fullName):
- """
- @param fullName: The full name of the cipher. For example
- C{u"ECDHE-RSA-AES256-GCM-SHA384"}.
- @type fullName: L{unicode}
- """
- self.fullName = fullName
- def __repr__(self):
- """
- A runnable representation of the cipher.
- """
- return 'OpenSSLCipher({0!r})'.format(self.fullName)
- def _expandCipherString(cipherString, method, options):
- """
- Expand C{cipherString} according to C{method} and C{options} to a list
- of explicit ciphers that are supported by the current platform.
- @param cipherString: An OpenSSL cipher string to expand.
- @type cipherString: L{unicode}
- @param method: An OpenSSL method like C{SSL.TLSv1_METHOD} used for
- determining the effective ciphers.
- @param options: OpenSSL options like C{SSL.OP_NO_SSLv3} ORed together.
- @type options: L{int}
- @return: The effective list of explicit ciphers that results from the
- arguments on the current platform.
- @rtype: L{list} of L{ICipher}
- """
- ctx = SSL.Context(method)
- ctx.set_options(options)
- try:
- ctx.set_cipher_list(cipherString.encode('ascii'))
- except SSL.Error as e:
- # OpenSSL 1.1.1 turns an invalid cipher list into TLS 1.3
- # ciphers, so pyOpenSSL >= 19.0.0 raises an artificial Error
- # that lacks a corresponding OpenSSL error if the cipher list
- # consists only of these after a call to set_cipher_list.
- if not e.args[0]:
- return []
- if e.args[0][0][2] == 'no cipher match':
- return []
- else:
- raise
- conn = SSL.Connection(ctx, None)
- ciphers = conn.get_cipher_list()
- if isinstance(ciphers[0], unicode):
- return [OpenSSLCipher(cipher) for cipher in ciphers]
- else:
- return [OpenSSLCipher(cipher.decode('ascii')) for cipher in ciphers]
- @implementer(IAcceptableCiphers)
- class OpenSSLAcceptableCiphers(object):
- """
- A representation of ciphers that are acceptable for TLS connections.
- """
- def __init__(self, ciphers):
- self._ciphers = ciphers
- def selectCiphers(self, availableCiphers):
- return [cipher
- for cipher in self._ciphers
- if cipher in availableCiphers]
- @classmethod
- def fromOpenSSLCipherString(cls, cipherString):
- """
- Create a new instance using an OpenSSL cipher string.
- @param cipherString: An OpenSSL cipher string that describes what
- cipher suites are acceptable.
- See the documentation of U{OpenSSL
- <http://www.openssl.org/docs/apps/ciphers.html#CIPHER_STRINGS>} or
- U{Apache
- <http://httpd.apache.org/docs/2.4/mod/mod_ssl.html#sslciphersuite>}
- for details.
- @type cipherString: L{unicode}
- @return: Instance representing C{cipherString}.
- @rtype: L{twisted.internet.ssl.AcceptableCiphers}
- """
- return cls(_expandCipherString(
- nativeString(cipherString),
- SSL.SSLv23_METHOD, SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
- )
- # A secure default.
- # Sources for more information on TLS ciphers:
- #
- # - https://wiki.mozilla.org/Security/Server_Side_TLS
- # - https://www.ssllabs.com/projects/best-practices/index.html
- # - https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
- #
- # The general intent is:
- # - Prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE),
- # - prefer ECDHE over DHE for better performance,
- # - prefer any AES-GCM and ChaCha20 over any AES-CBC for better performance and
- # security,
- # - prefer AES-GCM to ChaCha20 because AES hardware support is common,
- # - disable NULL authentication, MD5 MACs and DSS for security reasons.
- #
- defaultCiphers = OpenSSLAcceptableCiphers.fromOpenSSLCipherString(
- "TLS13-AES-256-GCM-SHA384:TLS13-CHACHA20-POLY1305-SHA256:"
- "TLS13-AES-128-GCM-SHA256:"
- "ECDH+AESGCM:ECDH+CHACHA20:DH+AESGCM:DH+CHACHA20:ECDH+AES256:DH+AES256:"
- "ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:"
- "!aNULL:!MD5:!DSS"
- )
- _defaultCurveName = u"prime256v1"
- class _ChooseDiffieHellmanEllipticCurve(object):
- """
- Chooses the best elliptic curve for Elliptic Curve Diffie-Hellman
- key exchange, and provides a C{configureECDHCurve} method to set
- the curve, when appropriate, on a new L{OpenSSL.SSL.Context}.
- The C{configureECDHCurve} method will be set to one of the
- following based on the provided OpenSSL version and configuration:
- - L{_configureOpenSSL110}
- - L{_configureOpenSSL102}
- - L{_configureOpenSSL101}
- - L{_configureOpenSSL101NoCurves}.
- @param openSSLVersion: The OpenSSL version number.
- @type openSSLVersion: L{int}
- @see: L{OpenSSL.SSL.OPENSSL_VERSION_NUMBER}
- @param openSSLlib: The OpenSSL C{cffi} library module.
- @param openSSLlib: The OpenSSL L{crypto} module.
- @see: L{crypto}
- """
- def __init__(self, openSSLVersion, openSSLlib, openSSLcrypto):
- self._openSSLlib = openSSLlib
- self._openSSLcrypto = openSSLcrypto
- if openSSLVersion >= 0x10100000:
- self.configureECDHCurve = self._configureOpenSSL110
- elif openSSLVersion >= 0x10002000:
- self.configureECDHCurve = self._configureOpenSSL102
- else:
- try:
- self._ecCurve = openSSLcrypto.get_elliptic_curve(
- _defaultCurveName)
- except ValueError:
- # The get_elliptic_curve method raises a ValueError
- # when the curve does not exist.
- self.configureECDHCurve = self._configureOpenSSL101NoCurves
- else:
- self.configureECDHCurve = self._configureOpenSSL101
- def _configureOpenSSL110(self, ctx):
- """
- OpenSSL 1.1.0 Contexts are preconfigured with an optimal set
- of ECDH curves. This method does nothing.
- @param ctx: L{OpenSSL.SSL.Context}
- """
- def _configureOpenSSL102(self, ctx):
- """
- Have the context automatically choose elliptic curves for
- ECDH. Run on OpenSSL 1.0.2 and OpenSSL 1.1.0+, but only has
- an effect on OpenSSL 1.0.2.
- @param ctx: The context which .
- @type ctx: L{OpenSSL.SSL.Context}
- """
- ctxPtr = ctx._context
- try:
- self._openSSLlib.SSL_CTX_set_ecdh_auto(ctxPtr, True)
- except:
- pass
- def _configureOpenSSL101(self, ctx):
- """
- Set the default elliptic curve for ECDH on the context. Only
- run on OpenSSL 1.0.1.
- @param ctx: The context on which to set the ECDH curve.
- @type ctx: L{OpenSSL.SSL.Context}
- """
- try:
- ctx.set_tmp_ecdh(self._ecCurve)
- except:
- pass
- def _configureOpenSSL101NoCurves(self, ctx):
- """
- No elliptic curves are available on OpenSSL 1.0.1. We can't
- set anything, so do nothing.
- @param ctx: The context on which to set the ECDH curve.
- @type ctx: L{OpenSSL.SSL.Context}
- """
- class OpenSSLDiffieHellmanParameters(object):
- """
- A representation of key generation parameters that are required for
- Diffie-Hellman key exchange.
- """
- def __init__(self, parameters):
- self._dhFile = parameters
- @classmethod
- def fromFile(cls, filePath):
- """
- Load parameters from a file.
- Such a file can be generated using the C{openssl} command line tool as
- following:
- C{openssl dhparam -out dh_param_2048.pem -2 2048}
- Please refer to U{OpenSSL's C{dhparam} documentation
- <http://www.openssl.org/docs/apps/dhparam.html>} for further details.
- @param filePath: A file containing parameters for Diffie-Hellman key
- exchange.
- @type filePath: L{FilePath <twisted.python.filepath.FilePath>}
- @return: An instance that loads its parameters from C{filePath}.
- @rtype: L{DiffieHellmanParameters
- <twisted.internet.ssl.DiffieHellmanParameters>}
- """
- return cls(filePath)
- def _setAcceptableProtocols(context, acceptableProtocols):
- """
- Called to set up the L{OpenSSL.SSL.Context} for doing NPN and/or ALPN
- negotiation.
- @param context: The context which is set up.
- @type context: L{OpenSSL.SSL.Context}
- @param acceptableProtocols: The protocols this peer is willing to speak
- after the TLS negotiation has completed, advertised over both ALPN and
- NPN. If this argument is specified, and no overlap can be found with
- the other peer, the connection will fail to be established. If the
- remote peer does not offer NPN or ALPN, the connection will be
- established, but no protocol wil be negotiated. Protocols earlier in
- the list are preferred over those later in the list.
- @type acceptableProtocols: L{list} of L{bytes}
- """
- def protoSelectCallback(conn, protocols):
- """
- NPN client-side and ALPN server-side callback used to select
- the next protocol. Prefers protocols found earlier in
- C{_acceptableProtocols}.
- @param conn: The context which is set up.
- @type conn: L{OpenSSL.SSL.Connection}
- @param conn: Protocols advertised by the other side.
- @type conn: L{list} of L{bytes}
- """
- overlap = set(protocols) & set(acceptableProtocols)
- for p in acceptableProtocols:
- if p in overlap:
- return p
- else:
- return b''
- # If we don't actually have protocols to negotiate, don't set anything up.
- # Depending on OpenSSL version, failing some of the selection callbacks can
- # cause the handshake to fail, which is presumably not what was intended
- # here.
- if not acceptableProtocols:
- return
- supported = protocolNegotiationMechanisms()
- if supported & ProtocolNegotiationSupport.NPN:
- def npnAdvertiseCallback(conn):
- return acceptableProtocols
- context.set_npn_advertise_callback(npnAdvertiseCallback)
- context.set_npn_select_callback(protoSelectCallback)
- if supported & ProtocolNegotiationSupport.ALPN:
- context.set_alpn_select_callback(protoSelectCallback)
- context.set_alpn_protos(acceptableProtocols)
|