123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818 |
- # -*- test-case-name: twisted.conch.test.test_keys -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Handling of RSA, DSA, ECDSA, and Ed25519 keys.
- """
- from __future__ import annotations
- import binascii
- import struct
- import unicodedata
- import warnings
- from base64 import b64encode, decodebytes, encodebytes
- from hashlib import md5, sha256
- from typing import Any
- import bcrypt
- from cryptography import utils
- from cryptography.exceptions import InvalidSignature
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import hashes, serialization
- from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, padding, rsa
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from cryptography.hazmat.primitives.serialization import (
- load_pem_private_key,
- load_ssh_public_key,
- )
- from typing_extensions import Literal
- from twisted.conch.ssh import common, sexpy
- from twisted.conch.ssh.common import int_to_bytes
- from twisted.python import randbytes
- from twisted.python.compat import iterbytes, nativeString
- from twisted.python.constants import NamedConstant, Names
- from twisted.python.deprecate import _mutuallyExclusiveArguments
- try:
- from cryptography.hazmat.primitives.asymmetric.utils import (
- decode_dss_signature,
- encode_dss_signature,
- )
- except ImportError:
- from cryptography.hazmat.primitives.asymmetric.utils import ( # type: ignore[no-redef,attr-defined]
- decode_rfc6979_signature as decode_dss_signature,
- encode_rfc6979_signature as encode_dss_signature,
- )
- # Curve lookup table
- _curveTable = {
- b"ecdsa-sha2-nistp256": ec.SECP256R1(),
- b"ecdsa-sha2-nistp384": ec.SECP384R1(),
- b"ecdsa-sha2-nistp521": ec.SECP521R1(),
- }
- _secToNist = {
- b"secp256r1": b"nistp256",
- b"secp384r1": b"nistp384",
- b"secp521r1": b"nistp521",
- }
- Ed25519PublicKey = ed25519.Ed25519PublicKey
- Ed25519PrivateKey = ed25519.Ed25519PrivateKey
- class BadKeyError(Exception):
- """
- Raised when a key isn't what we expected from it.
- XXX: we really need to check for bad keys
- """
- class BadSignatureAlgorithmError(Exception):
- """
- Raised when a public key signature algorithm name isn't defined for this
- public key format.
- """
- class EncryptedKeyError(Exception):
- """
- Raised when an encrypted key is presented to fromString/fromFile without
- a password.
- """
- class BadFingerPrintFormat(Exception):
- """
- Raises when unsupported fingerprint formats are presented to fingerprint.
- """
- class FingerprintFormats(Names):
- """
- Constants representing the supported formats of key fingerprints.
- @cvar MD5_HEX: Named constant representing fingerprint format generated
- using md5[RFC1321] algorithm in hexadecimal encoding.
- @type MD5_HEX: L{twisted.python.constants.NamedConstant}
- @cvar SHA256_BASE64: Named constant representing fingerprint format
- generated using sha256[RFC4634] algorithm in base64 encoding
- @type SHA256_BASE64: L{twisted.python.constants.NamedConstant}
- """
- MD5_HEX = NamedConstant()
- SHA256_BASE64 = NamedConstant()
- class PassphraseNormalizationError(Exception):
- """
- Raised when a passphrase contains Unicode characters that cannot be
- normalized using the available Unicode character database.
- """
- def _normalizePassphrase(passphrase):
- """
- Normalize a passphrase, which may be Unicode.
- If the passphrase is Unicode, this follows the requirements of U{NIST
- 800-63B, section
- 5.1.1.2<https://pages.nist.gov/800-63-3/sp800-63b.html#memsecretver>}
- for Unicode characters in memorized secrets: it applies the
- Normalization Process for Stabilized Strings using NFKC normalization.
- The passphrase is then encoded using UTF-8.
- @type passphrase: L{bytes} or L{unicode} or L{None}
- @param passphrase: The passphrase to normalize.
- @return: The normalized passphrase, if any.
- @rtype: L{bytes} or L{None}
- @raises PassphraseNormalizationError: if the passphrase is Unicode and
- cannot be normalized using the available Unicode character database.
- """
- if isinstance(passphrase, str):
- # The Normalization Process for Stabilized Strings requires aborting
- # with an error if the string contains any unassigned code point.
- if any(unicodedata.category(c) == "Cn" for c in passphrase):
- # Perhaps not very helpful, but we don't want to leak any other
- # information about the passphrase.
- raise PassphraseNormalizationError()
- return unicodedata.normalize("NFKC", passphrase).encode("UTF-8")
- else:
- return passphrase
- class Key:
- """
- An object representing a key. A key can be either a public or
- private key. A public key can verify a signature; a private key can
- create or verify a signature. To generate a string that can be stored
- on disk, use the toString method. If you have a private key, but want
- the string representation of the public key, use Key.public().toString().
- """
- @classmethod
- def fromFile(cls, filename, type=None, passphrase=None):
- """
- Load a key from a file.
- @param filename: The path to load key data from.
- @type type: L{str} or L{None}
- @param type: A string describing the format the key data is in, or
- L{None} to attempt detection of the type.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase the key is encrypted with, or L{None}
- if there is no encryption.
- @rtype: L{Key}
- @return: The loaded key.
- """
- with open(filename, "rb") as f:
- return cls.fromString(f.read(), type, passphrase)
- @classmethod
- def fromString(cls, data, type=None, passphrase=None):
- """
- Return a Key object corresponding to the string data.
- type is optionally the type of string, matching a _fromString_*
- method. Otherwise, the _guessStringType() classmethod will be used
- to guess a type. If the key is encrypted, passphrase is used as
- the decryption key.
- @type data: L{bytes}
- @param data: The key data.
- @type type: L{str} or L{None}
- @param type: A string describing the format the key data is in, or
- L{None} to attempt detection of the type.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase the key is encrypted with, or L{None}
- if there is no encryption.
- @rtype: L{Key}
- @return: The loaded key.
- """
- if isinstance(data, str):
- data = data.encode("utf-8")
- passphrase = _normalizePassphrase(passphrase)
- if type is None:
- type = cls._guessStringType(data)
- if type is None:
- raise BadKeyError(f"cannot guess the type of {data!r}")
- method = getattr(cls, f"_fromString_{type.upper()}", None)
- if method is None:
- raise BadKeyError(f"no _fromString method for {type}")
- if method.__code__.co_argcount == 2: # No passphrase
- if passphrase:
- raise BadKeyError("key not encrypted")
- return method(data)
- else:
- return method(data, passphrase)
- @classmethod
- def _fromString_BLOB(cls, blob):
- """
- Return a public key object corresponding to this public key blob.
- The format of a RSA public key blob is::
- string 'ssh-rsa'
- integer e
- integer n
- The format of a DSA public key blob is::
- string 'ssh-dss'
- integer p
- integer q
- integer g
- integer y
- The format of ECDSA-SHA2-* public key blob is::
- string 'ecdsa-sha2-[identifier]'
- integer x
- integer y
- identifier is the standard NIST curve name.
- The format of an Ed25519 public key blob is::
- string 'ssh-ed25519'
- string a
- @type blob: L{bytes}
- @param blob: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if the key type (the first string) is unknown.
- """
- keyType, rest = common.getNS(blob)
- if keyType == b"ssh-rsa":
- e, n, rest = common.getMP(rest, 2)
- return cls(rsa.RSAPublicNumbers(e, n).public_key(default_backend()))
- elif keyType == b"ssh-dss":
- p, q, g, y, rest = common.getMP(rest, 4)
- return cls(
- dsa.DSAPublicNumbers(
- y=y, parameter_numbers=dsa.DSAParameterNumbers(p=p, q=q, g=g)
- ).public_key(default_backend())
- )
- elif keyType in _curveTable:
- return cls(
- ec.EllipticCurvePublicKey.from_encoded_point(
- _curveTable[keyType], common.getNS(rest, 2)[1]
- )
- )
- elif keyType == b"ssh-ed25519":
- a, rest = common.getNS(rest)
- return cls._fromEd25519Components(a)
- else:
- raise BadKeyError(f"unknown blob type: {keyType}")
- @classmethod
- def _fromString_PRIVATE_BLOB(cls, blob):
- """
- Return a private key object corresponding to this private key blob.
- The blob formats are as follows:
- RSA keys::
- string 'ssh-rsa'
- integer n
- integer e
- integer d
- integer u
- integer p
- integer q
- DSA keys::
- string 'ssh-dss'
- integer p
- integer q
- integer g
- integer y
- integer x
- EC keys::
- string 'ecdsa-sha2-[identifier]'
- string identifier
- string q
- integer privateValue
- identifier is the standard NIST curve name.
- Ed25519 keys::
- string 'ssh-ed25519'
- string a
- string k || a
- @type blob: L{bytes}
- @param blob: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if
- * the key type (the first string) is unknown
- * the curve name of an ECDSA key does not match the key type
- """
- keyType, rest = common.getNS(blob)
- if keyType == b"ssh-rsa":
- n, e, d, u, p, q, rest = common.getMP(rest, 6)
- return cls._fromRSAComponents(n=n, e=e, d=d, p=p, q=q)
- elif keyType == b"ssh-dss":
- p, q, g, y, x, rest = common.getMP(rest, 5)
- return cls._fromDSAComponents(y=y, g=g, p=p, q=q, x=x)
- elif keyType in _curveTable:
- curve = _curveTable[keyType]
- curveName, q, rest = common.getNS(rest, 2)
- if curveName != _secToNist[curve.name.encode("ascii")]:
- raise BadKeyError(
- "ECDSA curve name %r does not match key "
- "type %r" % (curveName, keyType)
- )
- privateValue, rest = common.getMP(rest)
- return cls._fromECEncodedPoint(
- encodedPoint=q, curve=keyType, privateValue=privateValue
- )
- elif keyType == b"ssh-ed25519":
- # OpenSSH's format repeats the public key bytes for some reason.
- # We're only interested in the private key here anyway.
- a, combined, rest = common.getNS(rest, 2)
- k = combined[:32]
- return cls._fromEd25519Components(a, k=k)
- else:
- raise BadKeyError(f"unknown blob type: {keyType}")
- @classmethod
- def _fromString_PUBLIC_OPENSSH(cls, data):
- """
- Return a public key object corresponding to this OpenSSH public key
- string. The format of an OpenSSH public key string is::
- <key type> <base64-encoded public key blob>
- @type data: L{bytes}
- @param data: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if the blob type is unknown.
- """
- # ECDSA keys don't need base64 decoding which is required
- # for RSA or DSA key.
- if data.startswith(b"ecdsa-sha2"):
- return cls(load_ssh_public_key(data, default_backend()))
- blob = decodebytes(data.split()[1])
- return cls._fromString_BLOB(blob)
- @classmethod
- def _fromPrivateOpenSSH_v1(cls, data, passphrase):
- """
- Return a private key object corresponding to this OpenSSH private key
- string, in the "openssh-key-v1" format introduced in OpenSSH 6.5.
- The format of an openssh-key-v1 private key string is::
- -----BEGIN OPENSSH PRIVATE KEY-----
- <base64-encoded SSH protocol string>
- -----END OPENSSH PRIVATE KEY-----
- The SSH protocol string is as described in
- U{PROTOCOL.key<https://cvsweb.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL.key>}.
- @type data: L{bytes}
- @param data: The key data.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase the key is encrypted with, or L{None}
- if it is not encrypted.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if
- * a passphrase is provided for an unencrypted key
- * the SSH protocol encoding is incorrect
- @raises EncryptedKeyError: if
- * a passphrase is not provided for an encrypted key
- """
- lines = data.strip().splitlines()
- keyList = decodebytes(b"".join(lines[1:-1]))
- if not keyList.startswith(b"openssh-key-v1\0"):
- raise BadKeyError("unknown OpenSSH private key format")
- keyList = keyList[len(b"openssh-key-v1\0") :]
- cipher, kdf, kdfOptions, rest = common.getNS(keyList, 3)
- n = struct.unpack("!L", rest[:4])[0]
- if n != 1:
- raise BadKeyError(
- "only OpenSSH private key files containing "
- "a single key are supported"
- )
- # Ignore public key
- _, encPrivKeyList, _ = common.getNS(rest[4:], 2)
- if cipher != b"none":
- if not passphrase:
- raise EncryptedKeyError(
- "Passphrase must be provided " "for an encrypted key"
- )
- # Determine cipher
- if cipher in (b"aes128-ctr", b"aes192-ctr", b"aes256-ctr"):
- algorithmClass = algorithms.AES
- blockSize = 16
- keySize = int(cipher[3:6]) // 8
- ivSize = blockSize
- else:
- raise BadKeyError(f"unknown encryption type {cipher!r}")
- if kdf == b"bcrypt":
- salt, rest = common.getNS(kdfOptions)
- rounds = struct.unpack("!L", rest[:4])[0]
- decKey = bcrypt.kdf(
- passphrase,
- salt,
- keySize + ivSize,
- rounds,
- # We can only use the number of rounds that OpenSSH used.
- ignore_few_rounds=True,
- )
- else:
- raise BadKeyError(f"unknown KDF type {kdf!r}")
- if (len(encPrivKeyList) % blockSize) != 0:
- raise BadKeyError("bad padding")
- decryptor = Cipher(
- algorithmClass(decKey[:keySize]),
- modes.CTR(decKey[keySize : keySize + ivSize]),
- backend=default_backend(),
- ).decryptor()
- privKeyList = decryptor.update(encPrivKeyList) + decryptor.finalize()
- else:
- if kdf != b"none":
- raise BadKeyError(
- "private key specifies KDF %r but no " "cipher" % (kdf,)
- )
- privKeyList = encPrivKeyList
- check1 = struct.unpack("!L", privKeyList[:4])[0]
- check2 = struct.unpack("!L", privKeyList[4:8])[0]
- if check1 != check2:
- raise BadKeyError("check values do not match: %d != %d" % (check1, check2))
- return cls._fromString_PRIVATE_BLOB(privKeyList[8:])
- @classmethod
- def _fromPrivateOpenSSH_PEM(cls, data, passphrase):
- """
- Return a private key object corresponding to this OpenSSH private key
- string, in the old PEM-based format.
- The format of a PEM-based OpenSSH private key string is::
- -----BEGIN <key type> PRIVATE KEY-----
- [Proc-Type: 4,ENCRYPTED
- DEK-Info: DES-EDE3-CBC,<initialization value>]
- <base64-encoded ASN.1 structure>
- ------END <key type> PRIVATE KEY------
- The ASN.1 structure of a RSA key is::
- (0, n, e, d, p, q)
- The ASN.1 structure of a DSA key is::
- (0, p, q, g, y, x)
- The ASN.1 structure of a ECDSA key is::
- (ECParameters, OID, NULL)
- @type data: L{bytes}
- @param data: The key data.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase the key is encrypted with, or L{None}
- if it is not encrypted.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if
- * a passphrase is provided for an unencrypted key
- * the ASN.1 encoding is incorrect
- @raises EncryptedKeyError: if
- * a passphrase is not provided for an encrypted key
- """
- lines = data.strip().splitlines()
- kind = lines[0][11:-17]
- # cryptography considers an empty byte string a passphrase, but
- # twisted considers that to be "no password". So we need to convert
- # to None on empty.
- if not passphrase:
- passphrase = None
- if kind in (b"EC", b"RSA", b"DSA"):
- try:
- key = load_pem_private_key(data, passphrase, default_backend())
- except TypeError:
- raise EncryptedKeyError(
- "Passphrase must be provided for an encrypted key"
- )
- except ValueError:
- raise BadKeyError("Failed to decode key (Bad Passphrase?)")
- return cls(key)
- else:
- raise BadKeyError(f"unknown key type {kind}")
- @classmethod
- def _fromString_PRIVATE_OPENSSH(cls, data, passphrase):
- """
- Return a private key object corresponding to this OpenSSH private key
- string. If the key is encrypted, passphrase MUST be provided.
- Providing a passphrase for an unencrypted key is an error.
- @type data: L{bytes}
- @param data: The key data.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase the key is encrypted with, or L{None}
- if it is not encrypted.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if
- * a passphrase is provided for an unencrypted key
- * the encoding is incorrect
- @raises EncryptedKeyError: if
- * a passphrase is not provided for an encrypted key
- """
- if data.strip().splitlines()[0][11:-17] == b"OPENSSH":
- # New-format (openssh-key-v1) key
- return cls._fromPrivateOpenSSH_v1(data, passphrase)
- else:
- # Old-format (PEM) key
- return cls._fromPrivateOpenSSH_PEM(data, passphrase)
- @classmethod
- def _fromString_PUBLIC_LSH(cls, data):
- """
- Return a public key corresponding to this LSH public key string.
- The LSH public key string format is::
- <s-expression: ('public-key', (<key type>, (<name, <value>)+))>
- The names for a RSA (key type 'rsa-pkcs1-sha1') key are: n, e.
- The names for a DSA (key type 'dsa') key are: y, g, p, q.
- @type data: L{bytes}
- @param data: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if the key type is unknown
- """
- sexp = sexpy.parse(decodebytes(data[1:-1]))
- assert sexp[0] == b"public-key"
- kd = {}
- for name, data in sexp[1][1:]:
- kd[name] = common.getMP(common.NS(data))[0]
- if sexp[1][0] == b"dsa":
- return cls._fromDSAComponents(
- y=kd[b"y"], g=kd[b"g"], p=kd[b"p"], q=kd[b"q"]
- )
- elif sexp[1][0] == b"rsa-pkcs1-sha1":
- return cls._fromRSAComponents(n=kd[b"n"], e=kd[b"e"])
- else:
- raise BadKeyError(f"unknown lsh key type {sexp[1][0]}")
- @classmethod
- def _fromString_PRIVATE_LSH(cls, data):
- """
- Return a private key corresponding to this LSH private key string.
- The LSH private key string format is::
- <s-expression: ('private-key', (<key type>, (<name>, <value>)+))>
- The names for a RSA (key type 'rsa-pkcs1-sha1') key are: n, e, d, p, q.
- The names for a DSA (key type 'dsa') key are: y, g, p, q, x.
- @type data: L{bytes}
- @param data: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if the key type is unknown
- """
- sexp = sexpy.parse(data)
- assert sexp[0] == b"private-key"
- kd = {}
- for name, data in sexp[1][1:]:
- kd[name] = common.getMP(common.NS(data))[0]
- if sexp[1][0] == b"dsa":
- assert len(kd) == 5, len(kd)
- return cls._fromDSAComponents(
- y=kd[b"y"], g=kd[b"g"], p=kd[b"p"], q=kd[b"q"], x=kd[b"x"]
- )
- elif sexp[1][0] == b"rsa-pkcs1":
- assert len(kd) == 8, len(kd)
- if kd[b"p"] > kd[b"q"]: # Make p smaller than q
- kd[b"p"], kd[b"q"] = kd[b"q"], kd[b"p"]
- return cls._fromRSAComponents(
- n=kd[b"n"], e=kd[b"e"], d=kd[b"d"], p=kd[b"p"], q=kd[b"q"]
- )
- else:
- raise BadKeyError(f"unknown lsh key type {sexp[1][0]}")
- @classmethod
- def _fromString_AGENTV3(cls, data):
- """
- Return a private key object corresponsing to the Secure Shell Key
- Agent v3 format.
- The SSH Key Agent v3 format for a RSA key is::
- string 'ssh-rsa'
- integer e
- integer d
- integer n
- integer u
- integer p
- integer q
- The SSH Key Agent v3 format for a DSA key is::
- string 'ssh-dss'
- integer p
- integer q
- integer g
- integer y
- integer x
- @type data: L{bytes}
- @param data: The key data.
- @return: A new key.
- @rtype: L{twisted.conch.ssh.keys.Key}
- @raises BadKeyError: if the key type (the first string) is unknown
- """
- keyType, data = common.getNS(data)
- if keyType == b"ssh-dss":
- p, data = common.getMP(data)
- q, data = common.getMP(data)
- g, data = common.getMP(data)
- y, data = common.getMP(data)
- x, data = common.getMP(data)
- return cls._fromDSAComponents(y=y, g=g, p=p, q=q, x=x)
- elif keyType == b"ssh-rsa":
- e, data = common.getMP(data)
- d, data = common.getMP(data)
- n, data = common.getMP(data)
- u, data = common.getMP(data)
- p, data = common.getMP(data)
- q, data = common.getMP(data)
- return cls._fromRSAComponents(n=n, e=e, d=d, p=p, q=q, u=u)
- else:
- raise BadKeyError(f"unknown key type {keyType}")
- @classmethod
- def _guessStringType(cls, data):
- """
- Guess the type of key in data. The types map to _fromString_*
- methods.
- @type data: L{bytes}
- @param data: The key data.
- """
- if data.startswith(b"ssh-") or data.startswith(b"ecdsa-sha2-"):
- return "public_openssh"
- elif data.startswith(b"-----BEGIN"):
- return "private_openssh"
- elif data.startswith(b"{"):
- return "public_lsh"
- elif data.startswith(b"("):
- return "private_lsh"
- elif (
- data.startswith(b"\x00\x00\x00\x07ssh-")
- or data.startswith(b"\x00\x00\x00\x13ecdsa-")
- or data.startswith(b"\x00\x00\x00\x0bssh-ed25519")
- ):
- ignored, rest = common.getNS(data)
- count = 0
- while rest:
- count += 1
- ignored, rest = common.getMP(rest)
- if count > 4:
- return "agentv3"
- else:
- return "blob"
- @classmethod
- def _fromRSAComponents(cls, n, e, d=None, p=None, q=None, u=None):
- """
- Build a key from RSA numerical components.
- @type n: L{int}
- @param n: The 'n' RSA variable.
- @type e: L{int}
- @param e: The 'e' RSA variable.
- @type d: L{int} or L{None}
- @param d: The 'd' RSA variable (optional for a public key).
- @type p: L{int} or L{None}
- @param p: The 'p' RSA variable (optional for a public key).
- @type q: L{int} or L{None}
- @param q: The 'q' RSA variable (optional for a public key).
- @type u: L{int} or L{None}
- @param u: The 'u' RSA variable. Ignored, as its value is determined by
- p and q.
- @rtype: L{Key}
- @return: An RSA key constructed from the values as given.
- """
- publicNumbers = rsa.RSAPublicNumbers(e=e, n=n)
- if d is None:
- # We have public components.
- keyObject = publicNumbers.public_key(default_backend())
- else:
- privateNumbers = rsa.RSAPrivateNumbers(
- p=p,
- q=q,
- d=d,
- dmp1=rsa.rsa_crt_dmp1(d, p),
- dmq1=rsa.rsa_crt_dmq1(d, q),
- iqmp=rsa.rsa_crt_iqmp(p, q),
- public_numbers=publicNumbers,
- )
- keyObject = privateNumbers.private_key(default_backend())
- return cls(keyObject)
- @classmethod
- def _fromDSAComponents(cls, y, p, q, g, x=None):
- """
- Build a key from DSA numerical components.
- @type y: L{int}
- @param y: The 'y' DSA variable.
- @type p: L{int}
- @param p: The 'p' DSA variable.
- @type q: L{int}
- @param q: The 'q' DSA variable.
- @type g: L{int}
- @param g: The 'g' DSA variable.
- @type x: L{int} or L{None}
- @param x: The 'x' DSA variable (optional for a public key)
- @rtype: L{Key}
- @return: A DSA key constructed from the values as given.
- """
- publicNumbers = dsa.DSAPublicNumbers(
- y=y, parameter_numbers=dsa.DSAParameterNumbers(p=p, q=q, g=g)
- )
- if x is None:
- # We have public components.
- keyObject = publicNumbers.public_key(default_backend())
- else:
- privateNumbers = dsa.DSAPrivateNumbers(x=x, public_numbers=publicNumbers)
- keyObject = privateNumbers.private_key(default_backend())
- return cls(keyObject)
- @classmethod
- def _fromECComponents(cls, x, y, curve, privateValue=None):
- """
- Build a key from EC components.
- @param x: The affine x component of the public point used for verifying.
- @type x: L{int}
- @param y: The affine y component of the public point used for verifying.
- @type y: L{int}
- @param curve: NIST name of elliptic curve.
- @type curve: L{bytes}
- @param privateValue: The private value.
- @type privateValue: L{int}
- """
- publicNumbers = ec.EllipticCurvePublicNumbers(
- x=x, y=y, curve=_curveTable[curve]
- )
- if privateValue is None:
- # We have public components.
- keyObject = publicNumbers.public_key(default_backend())
- else:
- privateNumbers = ec.EllipticCurvePrivateNumbers(
- private_value=privateValue, public_numbers=publicNumbers
- )
- keyObject = privateNumbers.private_key(default_backend())
- return cls(keyObject)
- @classmethod
- def _fromECEncodedPoint(cls, encodedPoint, curve, privateValue=None):
- """
- Build a key from an EC encoded point.
- @param encodedPoint: The public point encoded as in SEC 1 v2.0
- section 2.3.3.
- @type encodedPoint: L{bytes}
- @param curve: NIST name of elliptic curve.
- @type curve: L{bytes}
- @param privateValue: The private value.
- @type privateValue: L{int}
- """
- if privateValue is None:
- # We have public components.
- keyObject = ec.EllipticCurvePublicKey.from_encoded_point(
- _curveTable[curve], encodedPoint
- )
- else:
- keyObject = ec.derive_private_key(
- privateValue, _curveTable[curve], default_backend()
- )
- return cls(keyObject)
- @classmethod
- def _fromEd25519Components(cls, a, k=None):
- """Build a key from Ed25519 components.
- @param a: The Ed25519 public key, as defined in RFC 8032 section
- 5.1.5.
- @type a: L{bytes}
- @param k: The Ed25519 private key, as defined in RFC 8032 section
- 5.1.5.
- @type k: L{bytes}
- """
- if Ed25519PublicKey is None or Ed25519PrivateKey is None:
- raise BadKeyError("Ed25519 keys not supported on this system")
- if k is None:
- keyObject = Ed25519PublicKey.from_public_bytes(a)
- else:
- keyObject = Ed25519PrivateKey.from_private_bytes(k)
- return cls(keyObject)
- def __init__(self, keyObject):
- """
- Initialize with a private or public
- C{cryptography.hazmat.primitives.asymmetric} key.
- @param keyObject: Low level key.
- @type keyObject: C{cryptography.hazmat.primitives.asymmetric} key.
- """
- self._keyObject = keyObject
- def __eq__(self, other: object) -> bool:
- """
- Return True if other represents an object with the same key.
- """
- if isinstance(other, Key):
- return self.type() == other.type() and self.data() == other.data()
- else:
- return NotImplemented
- def __repr__(self) -> str:
- """
- Return a pretty representation of this object.
- """
- if self.type() == "EC":
- data = self.data()
- name = data["curve"].decode("utf-8")
- if self.isPublic():
- out = f"<Elliptic Curve Public Key ({name[-3:]} bits)"
- else:
- out = f"<Elliptic Curve Private Key ({name[-3:]} bits)"
- for k, v in sorted(data.items()):
- if k == "curve":
- out += f"\ncurve:\n\t{name}"
- else:
- out += f"\n{k}:\n\t{v}"
- return out + ">\n"
- else:
- lines = [
- "<%s %s (%s bits)"
- % (
- nativeString(self.type()),
- self.isPublic() and "Public Key" or "Private Key",
- self.size(),
- )
- ]
- for k, v in sorted(self.data().items()):
- lines.append(f"attr {k}:")
- by = v if self.type() == "Ed25519" else common.MP(v)[4:]
- while by:
- m = by[:15]
- by = by[15:]
- o = ""
- for c in iterbytes(m):
- o = o + f"{ord(c):02x}:"
- if len(m) < 15:
- o = o[:-1]
- lines.append("\t" + o)
- lines[-1] = lines[-1] + ">"
- return "\n".join(lines)
- def isPublic(self):
- """
- Check if this instance is a public key.
- @return: C{True} if this is a public key.
- """
- return isinstance(
- self._keyObject,
- (
- rsa.RSAPublicKey,
- dsa.DSAPublicKey,
- ec.EllipticCurvePublicKey,
- ed25519.Ed25519PublicKey,
- ),
- )
- def public(self):
- """
- Returns a version of this key containing only the public key data.
- If this is a public key, this may or may not be the same object
- as self.
- @rtype: L{Key}
- @return: A public key.
- """
- if self.isPublic():
- return self
- else:
- return Key(self._keyObject.public_key())
- def fingerprint(self, format=FingerprintFormats.MD5_HEX):
- """
- The fingerprint of a public key consists of the output of the
- message-digest algorithm in the specified format.
- Supported formats include L{FingerprintFormats.MD5_HEX} and
- L{FingerprintFormats.SHA256_BASE64}
- The input to the algorithm is the public key data as specified by [RFC4253].
- The output of sha256[RFC4634] algorithm is presented to the
- user in the form of base64 encoded sha256 hashes.
- Example: C{US5jTUa0kgX5ZxdqaGF0yGRu8EgKXHNmoT8jHKo1StM=}
- The output of the MD5[RFC1321](default) algorithm is presented to the user as
- a sequence of 16 octets printed as hexadecimal with lowercase letters
- and separated by colons.
- Example: C{c1:b1:30:29:d7:b8:de:6c:97:77:10:d7:46:41:63:87}
- @param format: Format for fingerprint generation. Consists
- hash function and representation format.
- Default is L{FingerprintFormats.MD5_HEX}
- @since: 8.2
- @return: the user presentation of this L{Key}'s fingerprint, as a
- string.
- @rtype: L{str}
- """
- if format is FingerprintFormats.SHA256_BASE64:
- return nativeString(b64encode(sha256(self.blob()).digest()))
- elif format is FingerprintFormats.MD5_HEX:
- return nativeString(
- b":".join(
- [binascii.hexlify(x) for x in iterbytes(md5(self.blob()).digest())]
- )
- )
- else:
- raise BadFingerPrintFormat(f"Unsupported fingerprint format: {format}")
- def type(self) -> Literal["RSA", "DSA", "EC", "Ed25519"]:
- """
- Return the type of the object we wrap. Currently this can only be
- 'RSA', 'DSA', 'EC', or 'Ed25519'.
- @rtype: L{str}
- @raises RuntimeError: If the object type is unknown.
- """
- if isinstance(self._keyObject, (rsa.RSAPublicKey, rsa.RSAPrivateKey)):
- return "RSA"
- elif isinstance(self._keyObject, (dsa.DSAPublicKey, dsa.DSAPrivateKey)):
- return "DSA"
- elif isinstance(
- self._keyObject, (ec.EllipticCurvePublicKey, ec.EllipticCurvePrivateKey)
- ):
- return "EC"
- elif isinstance(
- self._keyObject, (ed25519.Ed25519PublicKey, ed25519.Ed25519PrivateKey)
- ):
- return "Ed25519"
- else:
- raise RuntimeError(f"unknown type of object: {self._keyObject!r}")
- def sshType(self):
- """
- Get the type of the object we wrap as defined in the SSH protocol,
- defined in RFC 4253, Section 6.6 and RFC 8332, section 4 (this is a
- public key format name, not a public key algorithm name). Currently
- this can only be b'ssh-rsa', b'ssh-dss', b'ecdsa-sha2-[identifier]'
- or b'ssh-ed25519'.
- identifier is the standard NIST curve name
- @return: The key type format.
- @rtype: L{bytes}
- """
- if self.type() == "EC":
- return (
- b"ecdsa-sha2-" + _secToNist[self._keyObject.curve.name.encode("ascii")]
- )
- else:
- return {
- "RSA": b"ssh-rsa",
- "DSA": b"ssh-dss",
- "Ed25519": b"ssh-ed25519",
- }[self.type()]
- def supportedSignatureAlgorithms(self):
- """
- Get the public key signature algorithms supported by this key.
- @return: A list of supported public key signature algorithm names.
- @rtype: L{list} of L{bytes}
- """
- if self.type() == "RSA":
- return [b"rsa-sha2-512", b"rsa-sha2-256", b"ssh-rsa"]
- else:
- return [self.sshType()]
- def _getHashAlgorithm(self, signatureType):
- """
- Return a hash algorithm for this key type given an SSH signature
- algorithm name, or L{None} if no such hash algorithm is defined for
- this key type.
- """
- if self.type() == "EC":
- # Hash algorithm depends on key size
- if signatureType == self.sshType():
- keySize = self.size()
- if keySize <= 256:
- return hashes.SHA256()
- elif keySize <= 384:
- return hashes.SHA384()
- else:
- return hashes.SHA512()
- else:
- return None
- else:
- return {
- ("RSA", b"ssh-rsa"): hashes.SHA1(),
- ("RSA", b"rsa-sha2-256"): hashes.SHA256(),
- ("RSA", b"rsa-sha2-512"): hashes.SHA512(),
- ("DSA", b"ssh-dss"): hashes.SHA1(),
- ("Ed25519", b"ssh-ed25519"): hashes.SHA512(),
- }.get((self.type(), signatureType))
- def size(self):
- """
- Return the size of the object we wrap.
- @return: The size of the key.
- @rtype: L{int}
- """
- if self._keyObject is None:
- return 0
- elif self.type() == "EC":
- return self._keyObject.curve.key_size
- elif self.type() == "Ed25519":
- return 256
- return self._keyObject.key_size
- def data(self) -> dict[str, Any]:
- """
- Return the values of the public key as a dictionary.
- @rtype: L{dict}
- """
- if isinstance(self._keyObject, rsa.RSAPublicKey):
- rsa_pub_numbers = self._keyObject.public_numbers()
- return {
- "n": rsa_pub_numbers.n,
- "e": rsa_pub_numbers.e,
- }
- elif isinstance(self._keyObject, rsa.RSAPrivateKey):
- rsa_priv_numbers = self._keyObject.private_numbers()
- return {
- "n": rsa_priv_numbers.public_numbers.n,
- "e": rsa_priv_numbers.public_numbers.e,
- "d": rsa_priv_numbers.d,
- "p": rsa_priv_numbers.p,
- "q": rsa_priv_numbers.q,
- # Use a trick: iqmp is q^-1 % p, u is p^-1 % q
- "u": rsa.rsa_crt_iqmp(rsa_priv_numbers.q, rsa_priv_numbers.p),
- }
- elif isinstance(self._keyObject, dsa.DSAPublicKey):
- dsa_pub_numbers = self._keyObject.public_numbers()
- return {
- "y": dsa_pub_numbers.y,
- "g": dsa_pub_numbers.parameter_numbers.g,
- "p": dsa_pub_numbers.parameter_numbers.p,
- "q": dsa_pub_numbers.parameter_numbers.q,
- }
- elif isinstance(self._keyObject, dsa.DSAPrivateKey):
- dsa_priv_numbers = self._keyObject.private_numbers()
- return {
- "x": dsa_priv_numbers.x,
- "y": dsa_priv_numbers.public_numbers.y,
- "g": dsa_priv_numbers.public_numbers.parameter_numbers.g,
- "p": dsa_priv_numbers.public_numbers.parameter_numbers.p,
- "q": dsa_priv_numbers.public_numbers.parameter_numbers.q,
- }
- elif isinstance(self._keyObject, ec.EllipticCurvePublicKey):
- ec_pub_numbers = self._keyObject.public_numbers()
- return {
- "x": ec_pub_numbers.x,
- "y": ec_pub_numbers.y,
- "curve": self.sshType(),
- }
- elif isinstance(self._keyObject, ec.EllipticCurvePrivateKey):
- ec_priv_numbers = self._keyObject.private_numbers()
- return {
- "x": ec_priv_numbers.public_numbers.x,
- "y": ec_priv_numbers.public_numbers.y,
- "privateValue": ec_priv_numbers.private_value,
- "curve": self.sshType(),
- }
- elif isinstance(self._keyObject, ed25519.Ed25519PublicKey):
- return {
- "a": self._keyObject.public_bytes(
- serialization.Encoding.Raw, serialization.PublicFormat.Raw
- ),
- }
- elif isinstance(self._keyObject, ed25519.Ed25519PrivateKey):
- return {
- "a": self._keyObject.public_key().public_bytes(
- serialization.Encoding.Raw, serialization.PublicFormat.Raw
- ),
- "k": self._keyObject.private_bytes(
- serialization.Encoding.Raw,
- serialization.PrivateFormat.Raw,
- serialization.NoEncryption(),
- ),
- }
- else:
- raise RuntimeError(f"Unexpected key type: {self._keyObject}")
- def blob(self):
- """
- Return the public key blob for this key. The blob is the
- over-the-wire format for public keys.
- SECSH-TRANS RFC 4253 Section 6.6.
- RSA keys::
- string 'ssh-rsa'
- integer e
- integer n
- DSA keys::
- string 'ssh-dss'
- integer p
- integer q
- integer g
- integer y
- EC keys::
- string 'ecdsa-sha2-[identifier]'
- integer x
- integer y
- identifier is the standard NIST curve name
- Ed25519 keys::
- string 'ssh-ed25519'
- string a
- @rtype: L{bytes}
- """
- type = self.type()
- data = self.data()
- if type == "RSA":
- return common.NS(b"ssh-rsa") + common.MP(data["e"]) + common.MP(data["n"])
- elif type == "DSA":
- return (
- common.NS(b"ssh-dss")
- + common.MP(data["p"])
- + common.MP(data["q"])
- + common.MP(data["g"])
- + common.MP(data["y"])
- )
- elif type == "EC":
- byteLength = (self._keyObject.curve.key_size + 7) // 8
- return (
- common.NS(data["curve"])
- + common.NS(data["curve"][-8:])
- + common.NS(
- b"\x04"
- + utils.int_to_bytes(data["x"], byteLength)
- + utils.int_to_bytes(data["y"], byteLength)
- )
- )
- elif type == "Ed25519":
- return common.NS(b"ssh-ed25519") + common.NS(data["a"])
- else:
- raise BadKeyError(f"unknown key type: {type}")
- def privateBlob(self):
- """
- Return the private key blob for this key. The blob is the
- over-the-wire format for private keys:
- Specification in OpenSSH PROTOCOL.agent
- RSA keys::
- string 'ssh-rsa'
- integer n
- integer e
- integer d
- integer u
- integer p
- integer q
- DSA keys::
- string 'ssh-dss'
- integer p
- integer q
- integer g
- integer y
- integer x
- EC keys::
- string 'ecdsa-sha2-[identifier]'
- integer x
- integer y
- integer privateValue
- identifier is the NIST standard curve name.
- Ed25519 keys::
- string 'ssh-ed25519'
- string a
- string k || a
- """
- type = self.type()
- data = self.data()
- if type == "RSA":
- iqmp = rsa.rsa_crt_iqmp(data["p"], data["q"])
- return (
- common.NS(b"ssh-rsa")
- + common.MP(data["n"])
- + common.MP(data["e"])
- + common.MP(data["d"])
- + common.MP(iqmp)
- + common.MP(data["p"])
- + common.MP(data["q"])
- )
- elif type == "DSA":
- return (
- common.NS(b"ssh-dss")
- + common.MP(data["p"])
- + common.MP(data["q"])
- + common.MP(data["g"])
- + common.MP(data["y"])
- + common.MP(data["x"])
- )
- elif type == "EC":
- encPub = self._keyObject.public_key().public_bytes(
- serialization.Encoding.X962,
- serialization.PublicFormat.UncompressedPoint,
- )
- return (
- common.NS(data["curve"])
- + common.NS(data["curve"][-8:])
- + common.NS(encPub)
- + common.MP(data["privateValue"])
- )
- elif type == "Ed25519":
- return (
- common.NS(b"ssh-ed25519")
- + common.NS(data["a"])
- + common.NS(data["k"] + data["a"])
- )
- else:
- raise BadKeyError(f"unknown key type: {type}")
- @_mutuallyExclusiveArguments(
- [
- ["extra", "comment"],
- ["extra", "passphrase"],
- ]
- )
- def toString(self, type, extra=None, subtype=None, comment=None, passphrase=None):
- """
- Create a string representation of this key. If the key is a private
- key and you want the representation of its public key, use
- C{key.public().toString()}. type maps to a _toString_* method.
- @param type: The type of string to emit. Currently supported values
- are C{'OPENSSH'}, C{'LSH'}, and C{'AGENTV3'}.
- @type type: L{str}
- @param extra: Any extra data supported by the selected format which
- is not part of the key itself. For public OpenSSH keys, this is
- a comment. For private OpenSSH keys, this is a passphrase to
- encrypt with. (Deprecated since Twisted 20.3.0; use C{comment}
- or C{passphrase} as appropriate instead.)
- @type extra: L{bytes} or L{unicode} or L{None}
- @param subtype: A subtype of the requested C{type} to emit. Only
- supported for private OpenSSH keys, for which the currently
- supported subtypes are C{'PEM'} and C{'v1'}. If not given, an
- appropriate default is used.
- @type subtype: L{str} or L{None}
- @param comment: A comment to include with the key. Only supported
- for OpenSSH keys.
- Present since Twisted 20.3.0.
- @type comment: L{bytes} or L{unicode} or L{None}
- @param passphrase: A passphrase to encrypt the key with. Only
- supported for private OpenSSH keys.
- Present since Twisted 20.3.0.
- @type passphrase: L{bytes} or L{unicode} or L{None}
- @rtype: L{bytes}
- """
- if extra is not None:
- # Compatibility with old parameter format.
- warnings.warn(
- "The 'extra' argument to "
- "twisted.conch.ssh.keys.Key.toString was deprecated in "
- "Twisted 20.3.0; use 'comment' or 'passphrase' instead.",
- DeprecationWarning,
- stacklevel=3,
- )
- if self.isPublic():
- comment = extra
- else:
- passphrase = extra
- if isinstance(comment, str):
- comment = comment.encode("utf-8")
- passphrase = _normalizePassphrase(passphrase)
- method = getattr(self, f"_toString_{type.upper()}", None)
- if method is None:
- raise BadKeyError(f"unknown key type: {type}")
- return method(subtype=subtype, comment=comment, passphrase=passphrase)
- def _toPublicOpenSSH(self, comment=None):
- """
- Return a public OpenSSH key string.
- See _fromString_PUBLIC_OPENSSH for the string format.
- @type comment: L{bytes} or L{None}
- @param comment: A comment to include with the key, or L{None} to
- omit the comment.
- """
- if self.type() == "EC":
- if not comment:
- comment = b""
- return (
- self._keyObject.public_bytes(
- serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH
- )
- + b" "
- + comment
- ).strip()
- b64Data = encodebytes(self.blob()).replace(b"\n", b"")
- if not comment:
- comment = b""
- return (self.sshType() + b" " + b64Data + b" " + comment).strip()
- def _toPrivateOpenSSH_v1(self, comment=None, passphrase=None):
- """
- Return a private OpenSSH key string, in the "openssh-key-v1" format
- introduced in OpenSSH 6.5.
- See _fromPrivateOpenSSH_v1 for the string format.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase to encrypt the key with, or L{None}
- if it is not encrypted.
- """
- if passphrase:
- # For now we just hardcode the cipher to the one used by
- # OpenSSH. We could make this configurable later if it's
- # needed.
- cipher = algorithms.AES
- cipherName = b"aes256-ctr"
- kdfName = b"bcrypt"
- blockSize = cipher.block_size // 8
- keySize = 32
- ivSize = blockSize
- salt = randbytes.secureRandom(ivSize)
- rounds = 100
- kdfOptions = common.NS(salt) + struct.pack("!L", rounds)
- else:
- cipherName = b"none"
- kdfName = b"none"
- blockSize = 8
- kdfOptions = b""
- check = randbytes.secureRandom(4)
- privKeyList = check + check + self.privateBlob() + common.NS(comment or b"")
- padByte = 0
- while len(privKeyList) % blockSize:
- padByte += 1
- privKeyList += bytes((padByte & 0xFF,))
- if passphrase:
- encKey = bcrypt.kdf(passphrase, salt, keySize + ivSize, 100)
- encryptor = Cipher(
- cipher(encKey[:keySize]),
- modes.CTR(encKey[keySize : keySize + ivSize]),
- backend=default_backend(),
- ).encryptor()
- encPrivKeyList = encryptor.update(privKeyList) + encryptor.finalize()
- else:
- encPrivKeyList = privKeyList
- blob = (
- b"openssh-key-v1\0"
- + common.NS(cipherName)
- + common.NS(kdfName)
- + common.NS(kdfOptions)
- + struct.pack("!L", 1)
- + common.NS(self.blob())
- + common.NS(encPrivKeyList)
- )
- b64Data = encodebytes(blob).replace(b"\n", b"")
- lines = (
- [b"-----BEGIN OPENSSH PRIVATE KEY-----"]
- + [b64Data[i : i + 64] for i in range(0, len(b64Data), 64)]
- + [b"-----END OPENSSH PRIVATE KEY-----"]
- )
- return b"\n".join(lines) + b"\n"
- def _toPrivateOpenSSH_PEM(self, passphrase=None):
- """
- Return a private OpenSSH key string, in the old PEM-based format.
- See _fromPrivateOpenSSH_PEM for the string format.
- @type passphrase: L{bytes} or L{None}
- @param passphrase: The passphrase to encrypt the key with, or L{None}
- if it is not encrypted.
- """
- if not passphrase:
- # unencrypted private key
- encryptor = serialization.NoEncryption()
- else:
- encryptor = serialization.BestAvailableEncryption(passphrase)
- if self.type() != "Ed25519":
- return self._keyObject.private_bytes(
- serialization.Encoding.PEM,
- serialization.PrivateFormat.TraditionalOpenSSL,
- encryptor,
- )
- else:
- # TODO: why not just support serialization here
- assert self.type() == "Ed25519"
- raise ValueError(
- "cannot serialize Ed25519 key to OpenSSH PEM format; use v1 " "instead"
- )
- def _toString_OPENSSH(self, subtype=None, comment=None, passphrase=None):
- """
- Return a public or private OpenSSH string. See
- L{_fromString_PUBLIC_OPENSSH} and L{_fromPrivateOpenSSH_PEM} for the
- string formats.
- @param subtype: A subtype to emit. Only supported for private keys,
- for which the currently supported subtypes are C{'PEM'} and C{'v1'}.
- If not given, an appropriate default is used.
- @type subtype: L{str} or L{None}
- @param comment: Comment for a public key.
- @type comment: L{bytes}
- @param passphrase: Passphrase for a private key.
- @type passphrase: L{bytes}
- @rtype: L{bytes}
- """
- if self.isPublic():
- return self._toPublicOpenSSH(comment=comment)
- # No pre-v1 format is defined for Ed25519 keys.
- elif subtype == "v1" or (subtype is None and self.type() == "Ed25519"):
- return self._toPrivateOpenSSH_v1(comment=comment, passphrase=passphrase)
- elif subtype is None or subtype == "PEM":
- return self._toPrivateOpenSSH_PEM(passphrase=passphrase)
- else:
- raise ValueError(f"unknown subtype {subtype}")
- def _toString_LSH(self, **kwargs):
- """
- Return a public or private LSH key. See _fromString_PUBLIC_LSH and
- _fromString_PRIVATE_LSH for the key formats.
- @rtype: L{bytes}
- """
- data = self.data()
- type = self.type()
- if self.isPublic():
- if type == "RSA":
- keyData = sexpy.pack(
- [
- [
- b"public-key",
- [
- b"rsa-pkcs1-sha1",
- [b"n", common.MP(data["n"])[4:]],
- [b"e", common.MP(data["e"])[4:]],
- ],
- ]
- ]
- )
- elif type == "DSA":
- keyData = sexpy.pack(
- [
- [
- b"public-key",
- [
- b"dsa",
- [b"p", common.MP(data["p"])[4:]],
- [b"q", common.MP(data["q"])[4:]],
- [b"g", common.MP(data["g"])[4:]],
- [b"y", common.MP(data["y"])[4:]],
- ],
- ]
- ]
- )
- else:
- raise BadKeyError(f"unknown key type {type}")
- return b"{" + encodebytes(keyData).replace(b"\n", b"") + b"}"
- else:
- if type == "RSA":
- p, q = data["p"], data["q"]
- iqmp = rsa.rsa_crt_iqmp(p, q)
- return sexpy.pack(
- [
- [
- b"private-key",
- [
- b"rsa-pkcs1",
- [b"n", common.MP(data["n"])[4:]],
- [b"e", common.MP(data["e"])[4:]],
- [b"d", common.MP(data["d"])[4:]],
- [b"p", common.MP(q)[4:]],
- [b"q", common.MP(p)[4:]],
- [b"a", common.MP(data["d"] % (q - 1))[4:]],
- [b"b", common.MP(data["d"] % (p - 1))[4:]],
- [b"c", common.MP(iqmp)[4:]],
- ],
- ]
- ]
- )
- elif type == "DSA":
- return sexpy.pack(
- [
- [
- b"private-key",
- [
- b"dsa",
- [b"p", common.MP(data["p"])[4:]],
- [b"q", common.MP(data["q"])[4:]],
- [b"g", common.MP(data["g"])[4:]],
- [b"y", common.MP(data["y"])[4:]],
- [b"x", common.MP(data["x"])[4:]],
- ],
- ]
- ]
- )
- else:
- raise BadKeyError(f"unknown key type {type}'")
- def _toString_AGENTV3(self, **kwargs):
- """
- Return a private Secure Shell Agent v3 key. See
- _fromString_AGENTV3 for the key format.
- @rtype: L{bytes}
- """
- data = self.data()
- if not self.isPublic():
- if self.type() == "RSA":
- values = (
- data["e"],
- data["d"],
- data["n"],
- data["u"],
- data["p"],
- data["q"],
- )
- elif self.type() == "DSA":
- values = (data["p"], data["q"], data["g"], data["y"], data["x"])
- return common.NS(self.sshType()) + b"".join(map(common.MP, values))
- def sign(self, data, signatureType=None):
- """
- Sign some data with this key.
- SECSH-TRANS RFC 4253 Section 6.6.
- @type data: L{bytes}
- @param data: The data to sign.
- @type signatureType: L{bytes}
- @param signatureType: The SSH public key algorithm name to sign this
- data with, or L{None} to use a reasonable default for the key.
- @rtype: L{bytes}
- @return: A signature for the given data.
- """
- keyType = self.type()
- if signatureType is None:
- # Use the SSH public key type name by default, since for all
- # current key types this can also be used as a public key
- # algorithm name. (This exists for compatibility; new code
- # should explicitly specify a public key algorithm name.)
- signatureType = self.sshType()
- hashAlgorithm = self._getHashAlgorithm(signatureType)
- if hashAlgorithm is None:
- raise BadSignatureAlgorithmError(
- f"public key signature algorithm {signatureType} is not "
- f"defined for {keyType} keys"
- )
- if keyType == "RSA":
- sig = self._keyObject.sign(data, padding.PKCS1v15(), hashAlgorithm)
- ret = common.NS(sig)
- elif keyType == "DSA":
- sig = self._keyObject.sign(data, hashAlgorithm)
- (r, s) = decode_dss_signature(sig)
- # SSH insists that the DSS signature blob be two 160-bit integers
- # concatenated together. The sig[0], [1] numbers from obj.sign
- # are just numbers, and could be any length from 0 to 160 bits.
- # Make sure they are padded out to 160 bits (20 bytes each)
- ret = common.NS(int_to_bytes(r, 20) + int_to_bytes(s, 20))
- elif keyType == "EC": # Pragma: no branch
- signature = self._keyObject.sign(data, ec.ECDSA(hashAlgorithm))
- (r, s) = decode_dss_signature(signature)
- rb = int_to_bytes(r)
- sb = int_to_bytes(s)
- # Int_to_bytes returns rb[0] as a str in python2
- # and an as int in python3
- if type(rb[0]) is str:
- rcomp = ord(rb[0])
- else:
- rcomp = rb[0]
- # If the MSB is set, prepend a null byte for correct formatting.
- if rcomp & 0x80:
- rb = b"\x00" + rb
- if type(sb[0]) is str:
- scomp = ord(sb[0])
- else:
- scomp = sb[0]
- if scomp & 0x80:
- sb = b"\x00" + sb
- ret = common.NS(common.NS(rb) + common.NS(sb))
- elif keyType == "Ed25519":
- ret = common.NS(self._keyObject.sign(data))
- return common.NS(signatureType) + ret
- def verify(self, signature, data):
- """
- Verify a signature using this key.
- @type signature: L{bytes}
- @param signature: The signature to verify.
- @type data: L{bytes}
- @param data: The signed data.
- @rtype: L{bool}
- @return: C{True} if the signature is valid.
- """
- if len(signature) == 40:
- # DSA key with no padding
- signatureType, signature = b"ssh-dss", common.NS(signature)
- else:
- signatureType, signature = common.getNS(signature)
- hashAlgorithm = self._getHashAlgorithm(signatureType)
- if hashAlgorithm is None:
- return False
- keyType = self.type()
- if keyType == "RSA":
- k = self._keyObject
- if not self.isPublic():
- k = k.public_key()
- args = (
- common.getNS(signature)[0],
- data,
- padding.PKCS1v15(),
- hashAlgorithm,
- )
- elif keyType == "DSA":
- concatenatedSignature = common.getNS(signature)[0]
- r = int.from_bytes(concatenatedSignature[:20], "big")
- s = int.from_bytes(concatenatedSignature[20:], "big")
- signature = encode_dss_signature(r, s)
- k = self._keyObject
- if not self.isPublic():
- k = k.public_key()
- args = (signature, data, hashAlgorithm)
- elif keyType == "EC": # Pragma: no branch
- concatenatedSignature = common.getNS(signature)[0]
- rstr, sstr, rest = common.getNS(concatenatedSignature, 2)
- r = int.from_bytes(rstr, "big")
- s = int.from_bytes(sstr, "big")
- signature = encode_dss_signature(r, s)
- k = self._keyObject
- if not self.isPublic():
- k = k.public_key()
- args = (signature, data, ec.ECDSA(hashAlgorithm))
- elif keyType == "Ed25519":
- k = self._keyObject
- if not self.isPublic():
- k = k.public_key()
- args = (common.getNS(signature)[0], data)
- try:
- k.verify(*args)
- except InvalidSignature:
- return False
- else:
- return True
- def _getPersistentRSAKey(location, keySize=4096):
- """
- This function returns a persistent L{Key}.
- The key is loaded from a PEM file in C{location}. If it does not exist, a
- key with the key size of C{keySize} is generated and saved.
- @param location: Where the key is stored.
- @type location: L{twisted.python.filepath.FilePath}
- @param keySize: The size of the key, if it needs to be generated.
- @type keySize: L{int}
- @returns: A persistent key.
- @rtype: L{Key}
- """
- location.parent().makedirs(ignoreExistingDirectory=True)
- # If it doesn't exist, we want to generate a new key and save it
- if not location.exists():
- privateKey = rsa.generate_private_key(
- public_exponent=65537, key_size=keySize, backend=default_backend()
- )
- pem = privateKey.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption(),
- )
- location.setContent(pem)
- # By this point (save any hilarious race conditions) we should have a
- # working PEM file. Load it!
- # (Future archaeological readers: I chose not to short circuit above,
- # because then there's two exit paths to this code!)
- with location.open("rb") as keyFile:
- privateKey = serialization.load_pem_private_key(
- keyFile.read(), password=None, backend=default_backend()
- )
- return Key(privateKey)
|