diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization')
5 files changed, 2171 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/__init__.py b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/__init__.py new file mode 100644 index 00000000..07b2264b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/__init__.py @@ -0,0 +1,63 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import annotations + +from cryptography.hazmat.primitives._serialization import ( + BestAvailableEncryption, + Encoding, + KeySerializationEncryption, + NoEncryption, + ParameterFormat, + PrivateFormat, + PublicFormat, + _KeySerializationEncryption, +) +from cryptography.hazmat.primitives.serialization.base import ( + load_der_parameters, + load_der_private_key, + load_der_public_key, + load_pem_parameters, + load_pem_private_key, + load_pem_public_key, +) +from cryptography.hazmat.primitives.serialization.ssh import ( + SSHCertificate, + SSHCertificateBuilder, + SSHCertificateType, + SSHCertPrivateKeyTypes, + SSHCertPublicKeyTypes, + SSHPrivateKeyTypes, + SSHPublicKeyTypes, + load_ssh_private_key, + load_ssh_public_identity, + load_ssh_public_key, +) + +__all__ = [ + "BestAvailableEncryption", + "Encoding", + "KeySerializationEncryption", + "NoEncryption", + "ParameterFormat", + "PrivateFormat", + "PublicFormat", + "SSHCertPrivateKeyTypes", + "SSHCertPublicKeyTypes", + "SSHCertificate", + "SSHCertificateBuilder", + "SSHCertificateType", + "SSHPrivateKeyTypes", + "SSHPublicKeyTypes", + "_KeySerializationEncryption", + "load_der_parameters", + "load_der_private_key", + "load_der_public_key", + "load_pem_parameters", + "load_pem_private_key", + "load_pem_public_key", + "load_ssh_private_key", + "load_ssh_public_identity", + "load_ssh_public_key", +] diff --git a/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/base.py b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/base.py new file mode 100644 index 00000000..e7c998b7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/base.py @@ -0,0 +1,14 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from cryptography.hazmat.bindings._rust import openssl as rust_openssl + +load_pem_private_key = rust_openssl.keys.load_pem_private_key +load_der_private_key = rust_openssl.keys.load_der_private_key + +load_pem_public_key = rust_openssl.keys.load_pem_public_key +load_der_public_key = rust_openssl.keys.load_der_public_key + +load_pem_parameters = rust_openssl.dh.from_pem_parameters +load_der_parameters = rust_openssl.dh.from_der_parameters diff --git a/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs12.py b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs12.py new file mode 100644 index 00000000..549e1f99 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs12.py @@ -0,0 +1,156 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import annotations + +import typing + +from cryptography import x509 +from cryptography.hazmat.bindings._rust import pkcs12 as rust_pkcs12 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives._serialization import PBES as PBES +from cryptography.hazmat.primitives.asymmetric import ( + dsa, + ec, + ed448, + ed25519, + rsa, +) +from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes + +__all__ = [ + "PBES", + "PKCS12Certificate", + "PKCS12KeyAndCertificates", + "PKCS12PrivateKeyTypes", + "load_key_and_certificates", + "load_pkcs12", + "serialize_key_and_certificates", +] + +PKCS12PrivateKeyTypes = typing.Union[ + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, +] + + +PKCS12Certificate = rust_pkcs12.PKCS12Certificate + + +class PKCS12KeyAndCertificates: + def __init__( + self, + key: PrivateKeyTypes | None, + cert: PKCS12Certificate | None, + additional_certs: list[PKCS12Certificate], + ): + if key is not None and not isinstance( + key, + ( + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + ), + ): + raise TypeError( + "Key must be RSA, DSA, EllipticCurve, ED25519, or ED448" + " private key, or None." + ) + if cert is not None and not isinstance(cert, PKCS12Certificate): + raise TypeError("cert must be a PKCS12Certificate object or None") + if not all( + isinstance(add_cert, PKCS12Certificate) + for add_cert in additional_certs + ): + raise TypeError( + "all values in additional_certs must be PKCS12Certificate" + " objects" + ) + self._key = key + self._cert = cert + self._additional_certs = additional_certs + + @property + def key(self) -> PrivateKeyTypes | None: + return self._key + + @property + def cert(self) -> PKCS12Certificate | None: + return self._cert + + @property + def additional_certs(self) -> list[PKCS12Certificate]: + return self._additional_certs + + def __eq__(self, other: object) -> bool: + if not isinstance(other, PKCS12KeyAndCertificates): + return NotImplemented + + return ( + self.key == other.key + and self.cert == other.cert + and self.additional_certs == other.additional_certs + ) + + def __hash__(self) -> int: + return hash((self.key, self.cert, tuple(self.additional_certs))) + + def __repr__(self) -> str: + fmt = ( + "<PKCS12KeyAndCertificates(key={}, cert={}, additional_certs={})>" + ) + return fmt.format(self.key, self.cert, self.additional_certs) + + +load_key_and_certificates = rust_pkcs12.load_key_and_certificates +load_pkcs12 = rust_pkcs12.load_pkcs12 + + +_PKCS12CATypes = typing.Union[ + x509.Certificate, + PKCS12Certificate, +] + + +def serialize_key_and_certificates( + name: bytes | None, + key: PKCS12PrivateKeyTypes | None, + cert: x509.Certificate | None, + cas: typing.Iterable[_PKCS12CATypes] | None, + encryption_algorithm: serialization.KeySerializationEncryption, +) -> bytes: + if key is not None and not isinstance( + key, + ( + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + ), + ): + raise TypeError( + "Key must be RSA, DSA, EllipticCurve, ED25519, or ED448" + " private key, or None." + ) + + if not isinstance( + encryption_algorithm, serialization.KeySerializationEncryption + ): + raise TypeError( + "Key encryption algorithm must be a " + "KeySerializationEncryption instance" + ) + + if key is None and cert is None and not cas: + raise ValueError("You must supply at least one of key, cert, or cas") + + return rust_pkcs12.serialize_key_and_certificates( + name, key, cert, cas, encryption_algorithm + ) diff --git a/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs7.py b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs7.py new file mode 100644 index 00000000..882e345f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/pkcs7.py @@ -0,0 +1,369 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import annotations + +import email.base64mime +import email.generator +import email.message +import email.policy +import io +import typing + +from cryptography import utils, x509 +from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat.bindings._rust import pkcs7 as rust_pkcs7 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa +from cryptography.utils import _check_byteslike + +load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates + +load_der_pkcs7_certificates = rust_pkcs7.load_der_pkcs7_certificates + +serialize_certificates = rust_pkcs7.serialize_certificates + +PKCS7HashTypes = typing.Union[ + hashes.SHA224, + hashes.SHA256, + hashes.SHA384, + hashes.SHA512, +] + +PKCS7PrivateKeyTypes = typing.Union[ + rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey +] + + +class PKCS7Options(utils.Enum): + Text = "Add text/plain MIME type" + Binary = "Don't translate input data into canonical MIME format" + DetachedSignature = "Don't embed data in the PKCS7 structure" + NoCapabilities = "Don't embed SMIME capabilities" + NoAttributes = "Don't embed authenticatedAttributes" + NoCerts = "Don't embed signer certificate" + + +class PKCS7SignatureBuilder: + def __init__( + self, + data: bytes | None = None, + signers: list[ + tuple[ + x509.Certificate, + PKCS7PrivateKeyTypes, + PKCS7HashTypes, + padding.PSS | padding.PKCS1v15 | None, + ] + ] = [], + additional_certs: list[x509.Certificate] = [], + ): + self._data = data + self._signers = signers + self._additional_certs = additional_certs + + def set_data(self, data: bytes) -> PKCS7SignatureBuilder: + _check_byteslike("data", data) + if self._data is not None: + raise ValueError("data may only be set once") + + return PKCS7SignatureBuilder(data, self._signers) + + def add_signer( + self, + certificate: x509.Certificate, + private_key: PKCS7PrivateKeyTypes, + hash_algorithm: PKCS7HashTypes, + *, + rsa_padding: padding.PSS | padding.PKCS1v15 | None = None, + ) -> PKCS7SignatureBuilder: + if not isinstance( + hash_algorithm, + ( + hashes.SHA224, + hashes.SHA256, + hashes.SHA384, + hashes.SHA512, + ), + ): + raise TypeError( + "hash_algorithm must be one of hashes.SHA224, " + "SHA256, SHA384, or SHA512" + ) + if not isinstance(certificate, x509.Certificate): + raise TypeError("certificate must be a x509.Certificate") + + if not isinstance( + private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey) + ): + raise TypeError("Only RSA & EC keys are supported at this time.") + + if rsa_padding is not None: + if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)): + raise TypeError("Padding must be PSS or PKCS1v15") + if not isinstance(private_key, rsa.RSAPrivateKey): + raise TypeError("Padding is only supported for RSA keys") + + return PKCS7SignatureBuilder( + self._data, + [ + *self._signers, + (certificate, private_key, hash_algorithm, rsa_padding), + ], + ) + + def add_certificate( + self, certificate: x509.Certificate + ) -> PKCS7SignatureBuilder: + if not isinstance(certificate, x509.Certificate): + raise TypeError("certificate must be a x509.Certificate") + + return PKCS7SignatureBuilder( + self._data, self._signers, [*self._additional_certs, certificate] + ) + + def sign( + self, + encoding: serialization.Encoding, + options: typing.Iterable[PKCS7Options], + backend: typing.Any = None, + ) -> bytes: + if len(self._signers) == 0: + raise ValueError("Must have at least one signer") + if self._data is None: + raise ValueError("You must add data to sign") + options = list(options) + if not all(isinstance(x, PKCS7Options) for x in options): + raise ValueError("options must be from the PKCS7Options enum") + if encoding not in ( + serialization.Encoding.PEM, + serialization.Encoding.DER, + serialization.Encoding.SMIME, + ): + raise ValueError( + "Must be PEM, DER, or SMIME from the Encoding enum" + ) + + # Text is a meaningless option unless it is accompanied by + # DetachedSignature + if ( + PKCS7Options.Text in options + and PKCS7Options.DetachedSignature not in options + ): + raise ValueError( + "When passing the Text option you must also pass " + "DetachedSignature" + ) + + if PKCS7Options.Text in options and encoding in ( + serialization.Encoding.DER, + serialization.Encoding.PEM, + ): + raise ValueError( + "The Text option is only available for SMIME serialization" + ) + + # No attributes implies no capabilities so we'll error if you try to + # pass both. + if ( + PKCS7Options.NoAttributes in options + and PKCS7Options.NoCapabilities in options + ): + raise ValueError( + "NoAttributes is a superset of NoCapabilities. Do not pass " + "both values." + ) + + return rust_pkcs7.sign_and_serialize(self, encoding, options) + + +class PKCS7EnvelopeBuilder: + def __init__( + self, + *, + _data: bytes | None = None, + _recipients: list[x509.Certificate] | None = None, + ): + from cryptography.hazmat.backends.openssl.backend import ( + backend as ossl, + ) + + if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()): + raise UnsupportedAlgorithm( + "RSA with PKCS1 v1.5 padding is not supported by this version" + " of OpenSSL.", + _Reasons.UNSUPPORTED_PADDING, + ) + self._data = _data + self._recipients = _recipients if _recipients is not None else [] + + def set_data(self, data: bytes) -> PKCS7EnvelopeBuilder: + _check_byteslike("data", data) + if self._data is not None: + raise ValueError("data may only be set once") + + return PKCS7EnvelopeBuilder(_data=data, _recipients=self._recipients) + + def add_recipient( + self, + certificate: x509.Certificate, + ) -> PKCS7EnvelopeBuilder: + if not isinstance(certificate, x509.Certificate): + raise TypeError("certificate must be a x509.Certificate") + + if not isinstance(certificate.public_key(), rsa.RSAPublicKey): + raise TypeError("Only RSA keys are supported at this time.") + + return PKCS7EnvelopeBuilder( + _data=self._data, + _recipients=[ + *self._recipients, + certificate, + ], + ) + + def encrypt( + self, + encoding: serialization.Encoding, + options: typing.Iterable[PKCS7Options], + ) -> bytes: + if len(self._recipients) == 0: + raise ValueError("Must have at least one recipient") + if self._data is None: + raise ValueError("You must add data to encrypt") + options = list(options) + if not all(isinstance(x, PKCS7Options) for x in options): + raise ValueError("options must be from the PKCS7Options enum") + if encoding not in ( + serialization.Encoding.PEM, + serialization.Encoding.DER, + serialization.Encoding.SMIME, + ): + raise ValueError( + "Must be PEM, DER, or SMIME from the Encoding enum" + ) + + # Only allow options that make sense for encryption + if any( + opt not in [PKCS7Options.Text, PKCS7Options.Binary] + for opt in options + ): + raise ValueError( + "Only the following options are supported for encryption: " + "Text, Binary" + ) + elif PKCS7Options.Text in options and PKCS7Options.Binary in options: + # OpenSSL accepts both options at the same time, but ignores Text. + # We fail defensively to avoid unexpected outputs. + raise ValueError( + "Cannot use Binary and Text options at the same time" + ) + + return rust_pkcs7.encrypt_and_serialize(self, encoding, options) + + +pkcs7_decrypt_der = rust_pkcs7.decrypt_der +pkcs7_decrypt_pem = rust_pkcs7.decrypt_pem +pkcs7_decrypt_smime = rust_pkcs7.decrypt_smime + + +def _smime_signed_encode( + data: bytes, signature: bytes, micalg: str, text_mode: bool +) -> bytes: + # This function works pretty hard to replicate what OpenSSL does + # precisely. For good and for ill. + + m = email.message.Message() + m.add_header("MIME-Version", "1.0") + m.add_header( + "Content-Type", + "multipart/signed", + protocol="application/x-pkcs7-signature", + micalg=micalg, + ) + + m.preamble = "This is an S/MIME signed message\n" + + msg_part = OpenSSLMimePart() + msg_part.set_payload(data) + if text_mode: + msg_part.add_header("Content-Type", "text/plain") + m.attach(msg_part) + + sig_part = email.message.MIMEPart() + sig_part.add_header( + "Content-Type", "application/x-pkcs7-signature", name="smime.p7s" + ) + sig_part.add_header("Content-Transfer-Encoding", "base64") + sig_part.add_header( + "Content-Disposition", "attachment", filename="smime.p7s" + ) + sig_part.set_payload( + email.base64mime.body_encode(signature, maxlinelen=65) + ) + del sig_part["MIME-Version"] + m.attach(sig_part) + + fp = io.BytesIO() + g = email.generator.BytesGenerator( + fp, + maxheaderlen=0, + mangle_from_=False, + policy=m.policy.clone(linesep="\r\n"), + ) + g.flatten(m) + return fp.getvalue() + + +def _smime_enveloped_encode(data: bytes) -> bytes: + m = email.message.Message() + m.add_header("MIME-Version", "1.0") + m.add_header("Content-Disposition", "attachment", filename="smime.p7m") + m.add_header( + "Content-Type", + "application/pkcs7-mime", + smime_type="enveloped-data", + name="smime.p7m", + ) + m.add_header("Content-Transfer-Encoding", "base64") + + m.set_payload(email.base64mime.body_encode(data, maxlinelen=65)) + + return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0)) + + +def _smime_enveloped_decode(data: bytes) -> bytes: + m = email.message_from_bytes(data) + if m.get_content_type() not in { + "application/x-pkcs7-mime", + "application/pkcs7-mime", + }: + raise ValueError("Not an S/MIME enveloped message") + return bytes(m.get_payload(decode=True)) + + +def _smime_remove_text_headers(data: bytes) -> bytes: + m = email.message_from_bytes(data) + # Using get() instead of get_content_type() since it has None as default, + # where the latter has "text/plain". Both methods are case-insensitive. + content_type = m.get("content-type") + if content_type is None: + raise ValueError( + "Decrypted MIME data has no 'Content-Type' header. " + "Please remove the 'Text' option to parse it manually." + ) + if "text/plain" not in content_type: + raise ValueError( + f"Decrypted MIME data content type is '{content_type}', not " + "'text/plain'. Remove the 'Text' option to parse it manually." + ) + return bytes(m.get_payload(decode=True)) + + +class OpenSSLMimePart(email.message.MIMEPart): + # A MIMEPart subclass that replicates OpenSSL's behavior of not including + # a newline if there are no headers. + def _write_headers(self, generator) -> None: + if list(self.raw_items()): + generator._write_headers(self) diff --git a/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/ssh.py b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/ssh.py new file mode 100644 index 00000000..c01afb0c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/cryptography/hazmat/primitives/serialization/ssh.py @@ -0,0 +1,1569 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import annotations + +import binascii +import enum +import os +import re +import typing +import warnings +from base64 import encodebytes as _base64_encode +from dataclasses import dataclass + +from cryptography import utils +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ( + dsa, + ec, + ed25519, + padding, + rsa, +) +from cryptography.hazmat.primitives.asymmetric import utils as asym_utils +from cryptography.hazmat.primitives.ciphers import ( + AEADDecryptionContext, + Cipher, + algorithms, + modes, +) +from cryptography.hazmat.primitives.serialization import ( + Encoding, + KeySerializationEncryption, + NoEncryption, + PrivateFormat, + PublicFormat, + _KeySerializationEncryption, +) + +try: + from bcrypt import kdf as _bcrypt_kdf + + _bcrypt_supported = True +except ImportError: + _bcrypt_supported = False + + def _bcrypt_kdf( + password: bytes, + salt: bytes, + desired_key_bytes: int, + rounds: int, + ignore_few_rounds: bool = False, + ) -> bytes: + raise UnsupportedAlgorithm("Need bcrypt module") + + +_SSH_ED25519 = b"ssh-ed25519" +_SSH_RSA = b"ssh-rsa" +_SSH_DSA = b"ssh-dss" +_ECDSA_NISTP256 = b"ecdsa-sha2-nistp256" +_ECDSA_NISTP384 = b"ecdsa-sha2-nistp384" +_ECDSA_NISTP521 = b"ecdsa-sha2-nistp521" +_CERT_SUFFIX = b"-cert-v01@openssh.com" + +# U2F application string suffixed pubkey +_SK_SSH_ED25519 = b"sk-ssh-ed25519@openssh.com" +_SK_SSH_ECDSA_NISTP256 = b"sk-ecdsa-sha2-nistp256@openssh.com" + +# These are not key types, only algorithms, so they cannot appear +# as a public key type +_SSH_RSA_SHA256 = b"rsa-sha2-256" +_SSH_RSA_SHA512 = b"rsa-sha2-512" + +_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)") +_SK_MAGIC = b"openssh-key-v1\0" +_SK_START = b"-----BEGIN OPENSSH PRIVATE KEY-----" +_SK_END = b"-----END OPENSSH PRIVATE KEY-----" +_BCRYPT = b"bcrypt" +_NONE = b"none" +_DEFAULT_CIPHER = b"aes256-ctr" +_DEFAULT_ROUNDS = 16 + +# re is only way to work on bytes-like data +_PEM_RC = re.compile(_SK_START + b"(.*?)" + _SK_END, re.DOTALL) + +# padding for max blocksize +_PADDING = memoryview(bytearray(range(1, 1 + 16))) + + +@dataclass +class _SSHCipher: + alg: type[algorithms.AES] + key_len: int + mode: type[modes.CTR] | type[modes.CBC] | type[modes.GCM] + block_len: int + iv_len: int + tag_len: int | None + is_aead: bool + + +# ciphers that are actually used in key wrapping +_SSH_CIPHERS: dict[bytes, _SSHCipher] = { + b"aes256-ctr": _SSHCipher( + alg=algorithms.AES, + key_len=32, + mode=modes.CTR, + block_len=16, + iv_len=16, + tag_len=None, + is_aead=False, + ), + b"aes256-cbc": _SSHCipher( + alg=algorithms.AES, + key_len=32, + mode=modes.CBC, + block_len=16, + iv_len=16, + tag_len=None, + is_aead=False, + ), + b"aes256-gcm@openssh.com": _SSHCipher( + alg=algorithms.AES, + key_len=32, + mode=modes.GCM, + block_len=16, + iv_len=12, + tag_len=16, + is_aead=True, + ), +} + +# map local curve name to key type +_ECDSA_KEY_TYPE = { + "secp256r1": _ECDSA_NISTP256, + "secp384r1": _ECDSA_NISTP384, + "secp521r1": _ECDSA_NISTP521, +} + + +def _get_ssh_key_type(key: SSHPrivateKeyTypes | SSHPublicKeyTypes) -> bytes: + if isinstance(key, ec.EllipticCurvePrivateKey): + key_type = _ecdsa_key_type(key.public_key()) + elif isinstance(key, ec.EllipticCurvePublicKey): + key_type = _ecdsa_key_type(key) + elif isinstance(key, (rsa.RSAPrivateKey, rsa.RSAPublicKey)): + key_type = _SSH_RSA + elif isinstance(key, (dsa.DSAPrivateKey, dsa.DSAPublicKey)): + key_type = _SSH_DSA + elif isinstance( + key, (ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey) + ): + key_type = _SSH_ED25519 + else: + raise ValueError("Unsupported key type") + + return key_type + + +def _ecdsa_key_type(public_key: ec.EllipticCurvePublicKey) -> bytes: + """Return SSH key_type and curve_name for private key.""" + curve = public_key.curve + if curve.name not in _ECDSA_KEY_TYPE: + raise ValueError( + f"Unsupported curve for ssh private key: {curve.name!r}" + ) + return _ECDSA_KEY_TYPE[curve.name] + + +def _ssh_pem_encode( + data: bytes, + prefix: bytes = _SK_START + b"\n", + suffix: bytes = _SK_END + b"\n", +) -> bytes: + return b"".join([prefix, _base64_encode(data), suffix]) + + +def _check_block_size(data: bytes, block_len: int) -> None: + """Require data to be full blocks""" + if not data or len(data) % block_len != 0: + raise ValueError("Corrupt data: missing padding") + + +def _check_empty(data: bytes) -> None: + """All data should have been parsed.""" + if data: + raise ValueError("Corrupt data: unparsed data") + + +def _init_cipher( + ciphername: bytes, + password: bytes | None, + salt: bytes, + rounds: int, +) -> Cipher[modes.CBC | modes.CTR | modes.GCM]: + """Generate key + iv and return cipher.""" + if not password: + raise ValueError("Key is password-protected.") + + ciph = _SSH_CIPHERS[ciphername] + seed = _bcrypt_kdf( + password, salt, ciph.key_len + ciph.iv_len, rounds, True + ) + return Cipher( + ciph.alg(seed[: ciph.key_len]), + ciph.mode(seed[ciph.key_len :]), + ) + + +def _get_u32(data: memoryview) -> tuple[int, memoryview]: + """Uint32""" + if len(data) < 4: + raise ValueError("Invalid data") + return int.from_bytes(data[:4], byteorder="big"), data[4:] + + +def _get_u64(data: memoryview) -> tuple[int, memoryview]: + """Uint64""" + if len(data) < 8: + raise ValueError("Invalid data") + return int.from_bytes(data[:8], byteorder="big"), data[8:] + + +def _get_sshstr(data: memoryview) -> tuple[memoryview, memoryview]: + """Bytes with u32 length prefix""" + n, data = _get_u32(data) + if n > len(data): + raise ValueError("Invalid data") + return data[:n], data[n:] + + +def _get_mpint(data: memoryview) -> tuple[int, memoryview]: + """Big integer.""" + val, data = _get_sshstr(data) + if val and val[0] > 0x7F: + raise ValueError("Invalid data") + return int.from_bytes(val, "big"), data + + +def _to_mpint(val: int) -> bytes: + """Storage format for signed bigint.""" + if val < 0: + raise ValueError("negative mpint not allowed") + if not val: + return b"" + nbytes = (val.bit_length() + 8) // 8 + return utils.int_to_bytes(val, nbytes) + + +class _FragList: + """Build recursive structure without data copy.""" + + flist: list[bytes] + + def __init__(self, init: list[bytes] | None = None) -> None: + self.flist = [] + if init: + self.flist.extend(init) + + def put_raw(self, val: bytes) -> None: + """Add plain bytes""" + self.flist.append(val) + + def put_u32(self, val: int) -> None: + """Big-endian uint32""" + self.flist.append(val.to_bytes(length=4, byteorder="big")) + + def put_u64(self, val: int) -> None: + """Big-endian uint64""" + self.flist.append(val.to_bytes(length=8, byteorder="big")) + + def put_sshstr(self, val: bytes | _FragList) -> None: + """Bytes prefixed with u32 length""" + if isinstance(val, (bytes, memoryview, bytearray)): + self.put_u32(len(val)) + self.flist.append(val) + else: + self.put_u32(val.size()) + self.flist.extend(val.flist) + + def put_mpint(self, val: int) -> None: + """Big-endian bigint prefixed with u32 length""" + self.put_sshstr(_to_mpint(val)) + + def size(self) -> int: + """Current number of bytes""" + return sum(map(len, self.flist)) + + def render(self, dstbuf: memoryview, pos: int = 0) -> int: + """Write into bytearray""" + for frag in self.flist: + flen = len(frag) + start, pos = pos, pos + flen + dstbuf[start:pos] = frag + return pos + + def tobytes(self) -> bytes: + """Return as bytes""" + buf = memoryview(bytearray(self.size())) + self.render(buf) + return buf.tobytes() + + +class _SSHFormatRSA: + """Format for RSA keys. + + Public: + mpint e, n + Private: + mpint n, e, d, iqmp, p, q + """ + + def get_public( + self, data: memoryview + ) -> tuple[tuple[int, int], memoryview]: + """RSA public fields""" + e, data = _get_mpint(data) + n, data = _get_mpint(data) + return (e, n), data + + def load_public( + self, data: memoryview + ) -> tuple[rsa.RSAPublicKey, memoryview]: + """Make RSA public key from data.""" + (e, n), data = self.get_public(data) + public_numbers = rsa.RSAPublicNumbers(e, n) + public_key = public_numbers.public_key() + return public_key, data + + def load_private( + self, data: memoryview, pubfields + ) -> tuple[rsa.RSAPrivateKey, memoryview]: + """Make RSA private key from data.""" + n, data = _get_mpint(data) + e, data = _get_mpint(data) + d, data = _get_mpint(data) + iqmp, data = _get_mpint(data) + p, data = _get_mpint(data) + q, data = _get_mpint(data) + + if (e, n) != pubfields: + raise ValueError("Corrupt data: rsa field mismatch") + dmp1 = rsa.rsa_crt_dmp1(d, p) + dmq1 = rsa.rsa_crt_dmq1(d, q) + public_numbers = rsa.RSAPublicNumbers(e, n) + private_numbers = rsa.RSAPrivateNumbers( + p, q, d, dmp1, dmq1, iqmp, public_numbers + ) + private_key = private_numbers.private_key() + return private_key, data + + def encode_public( + self, public_key: rsa.RSAPublicKey, f_pub: _FragList + ) -> None: + """Write RSA public key""" + pubn = public_key.public_numbers() + f_pub.put_mpint(pubn.e) + f_pub.put_mpint(pubn.n) + + def encode_private( + self, private_key: rsa.RSAPrivateKey, f_priv: _FragList + ) -> None: + """Write RSA private key""" + private_numbers = private_key.private_numbers() + public_numbers = private_numbers.public_numbers + + f_priv.put_mpint(public_numbers.n) + f_priv.put_mpint(public_numbers.e) + + f_priv.put_mpint(private_numbers.d) + f_priv.put_mpint(private_numbers.iqmp) + f_priv.put_mpint(private_numbers.p) + f_priv.put_mpint(private_numbers.q) + + +class _SSHFormatDSA: + """Format for DSA keys. + + Public: + mpint p, q, g, y + Private: + mpint p, q, g, y, x + """ + + def get_public(self, data: memoryview) -> tuple[tuple, memoryview]: + """DSA public fields""" + p, data = _get_mpint(data) + q, data = _get_mpint(data) + g, data = _get_mpint(data) + y, data = _get_mpint(data) + return (p, q, g, y), data + + def load_public( + self, data: memoryview + ) -> tuple[dsa.DSAPublicKey, memoryview]: + """Make DSA public key from data.""" + (p, q, g, y), data = self.get_public(data) + parameter_numbers = dsa.DSAParameterNumbers(p, q, g) + public_numbers = dsa.DSAPublicNumbers(y, parameter_numbers) + self._validate(public_numbers) + public_key = public_numbers.public_key() + return public_key, data + + def load_private( + self, data: memoryview, pubfields + ) -> tuple[dsa.DSAPrivateKey, memoryview]: + """Make DSA private key from data.""" + (p, q, g, y), data = self.get_public(data) + x, data = _get_mpint(data) + + if (p, q, g, y) != pubfields: + raise ValueError("Corrupt data: dsa field mismatch") + parameter_numbers = dsa.DSAParameterNumbers(p, q, g) + public_numbers = dsa.DSAPublicNumbers(y, parameter_numbers) + self._validate(public_numbers) + private_numbers = dsa.DSAPrivateNumbers(x, public_numbers) + private_key = private_numbers.private_key() + return private_key, data + + def encode_public( + self, public_key: dsa.DSAPublicKey, f_pub: _FragList + ) -> None: + """Write DSA public key""" + public_numbers = public_key.public_numbers() + parameter_numbers = public_numbers.parameter_numbers + self._validate(public_numbers) + + f_pub.put_mpint(parameter_numbers.p) + f_pub.put_mpint(parameter_numbers.q) + f_pub.put_mpint(parameter_numbers.g) + f_pub.put_mpint(public_numbers.y) + + def encode_private( + self, private_key: dsa.DSAPrivateKey, f_priv: _FragList + ) -> None: + """Write DSA private key""" + self.encode_public(private_key.public_key(), f_priv) + f_priv.put_mpint(private_key.private_numbers().x) + + def _validate(self, public_numbers: dsa.DSAPublicNumbers) -> None: + parameter_numbers = public_numbers.parameter_numbers + if parameter_numbers.p.bit_length() != 1024: + raise ValueError("SSH supports only 1024 bit DSA keys") + + +class _SSHFormatECDSA: + """Format for ECDSA keys. + + Public: + str curve + bytes point + Private: + str curve + bytes point + mpint secret + """ + + def __init__(self, ssh_curve_name: bytes, curve: ec.EllipticCurve): + self.ssh_curve_name = ssh_curve_name + self.curve = curve + + def get_public( + self, data: memoryview + ) -> tuple[tuple[memoryview, memoryview], memoryview]: + """ECDSA public fields""" + curve, data = _get_sshstr(data) + point, data = _get_sshstr(data) + if curve != self.ssh_curve_name: + raise ValueError("Curve name mismatch") + if point[0] != 4: + raise NotImplementedError("Need uncompressed point") + return (curve, point), data + + def load_public( + self, data: memoryview + ) -> tuple[ec.EllipticCurvePublicKey, memoryview]: + """Make ECDSA public key from data.""" + (_, point), data = self.get_public(data) + public_key = ec.EllipticCurvePublicKey.from_encoded_point( + self.curve, point.tobytes() + ) + return public_key, data + + def load_private( + self, data: memoryview, pubfields + ) -> tuple[ec.EllipticCurvePrivateKey, memoryview]: + """Make ECDSA private key from data.""" + (curve_name, point), data = self.get_public(data) + secret, data = _get_mpint(data) + + if (curve_name, point) != pubfields: + raise ValueError("Corrupt data: ecdsa field mismatch") + private_key = ec.derive_private_key(secret, self.curve) + return private_key, data + + def encode_public( + self, public_key: ec.EllipticCurvePublicKey, f_pub: _FragList + ) -> None: + """Write ECDSA public key""" + point = public_key.public_bytes( + Encoding.X962, PublicFormat.UncompressedPoint + ) + f_pub.put_sshstr(self.ssh_curve_name) + f_pub.put_sshstr(point) + + def encode_private( + self, private_key: ec.EllipticCurvePrivateKey, f_priv: _FragList + ) -> None: + """Write ECDSA private key""" + public_key = private_key.public_key() + private_numbers = private_key.private_numbers() + + self.encode_public(public_key, f_priv) + f_priv.put_mpint(private_numbers.private_value) + + +class _SSHFormatEd25519: + """Format for Ed25519 keys. + + Public: + bytes point + Private: + bytes point + bytes secret_and_point + """ + + def get_public( + self, data: memoryview + ) -> tuple[tuple[memoryview], memoryview]: + """Ed25519 public fields""" + point, data = _get_sshstr(data) + return (point,), data + + def load_public( + self, data: memoryview + ) -> tuple[ed25519.Ed25519PublicKey, memoryview]: + """Make Ed25519 public key from data.""" + (point,), data = self.get_public(data) + public_key = ed25519.Ed25519PublicKey.from_public_bytes( + point.tobytes() + ) + return public_key, data + + def load_private( + self, data: memoryview, pubfields + ) -> tuple[ed25519.Ed25519PrivateKey, memoryview]: + """Make Ed25519 private key from data.""" + (point,), data = self.get_public(data) + keypair, data = _get_sshstr(data) + + secret = keypair[:32] + point2 = keypair[32:] + if point != point2 or (point,) != pubfields: + raise ValueError("Corrupt data: ed25519 field mismatch") + private_key = ed25519.Ed25519PrivateKey.from_private_bytes(secret) + return private_key, data + + def encode_public( + self, public_key: ed25519.Ed25519PublicKey, f_pub: _FragList + ) -> None: + """Write Ed25519 public key""" + raw_public_key = public_key.public_bytes( + Encoding.Raw, PublicFormat.Raw + ) + f_pub.put_sshstr(raw_public_key) + + def encode_private( + self, private_key: ed25519.Ed25519PrivateKey, f_priv: _FragList + ) -> None: + """Write Ed25519 private key""" + public_key = private_key.public_key() + raw_private_key = private_key.private_bytes( + Encoding.Raw, PrivateFormat.Raw, NoEncryption() + ) + raw_public_key = public_key.public_bytes( + Encoding.Raw, PublicFormat.Raw + ) + f_keypair = _FragList([raw_private_key, raw_public_key]) + + self.encode_public(public_key, f_priv) + f_priv.put_sshstr(f_keypair) + + +def load_application(data) -> tuple[memoryview, memoryview]: + """ + U2F application strings + """ + application, data = _get_sshstr(data) + if not application.tobytes().startswith(b"ssh:"): + raise ValueError( + "U2F application string does not start with b'ssh:' " + f"({application})" + ) + return application, data + + +class _SSHFormatSKEd25519: + """ + The format of a sk-ssh-ed25519@openssh.com public key is: + + string "sk-ssh-ed25519@openssh.com" + string public key + string application (user-specified, but typically "ssh:") + """ + + def load_public( + self, data: memoryview + ) -> tuple[ed25519.Ed25519PublicKey, memoryview]: + """Make Ed25519 public key from data.""" + public_key, data = _lookup_kformat(_SSH_ED25519).load_public(data) + _, data = load_application(data) + return public_key, data + + +class _SSHFormatSKECDSA: + """ + The format of a sk-ecdsa-sha2-nistp256@openssh.com public key is: + + string "sk-ecdsa-sha2-nistp256@openssh.com" + string curve name + ec_point Q + string application (user-specified, but typically "ssh:") + """ + + def load_public( + self, data: memoryview + ) -> tuple[ec.EllipticCurvePublicKey, memoryview]: + """Make ECDSA public key from data.""" + public_key, data = _lookup_kformat(_ECDSA_NISTP256).load_public(data) + _, data = load_application(data) + return public_key, data + + +_KEY_FORMATS = { + _SSH_RSA: _SSHFormatRSA(), + _SSH_DSA: _SSHFormatDSA(), + _SSH_ED25519: _SSHFormatEd25519(), + _ECDSA_NISTP256: _SSHFormatECDSA(b"nistp256", ec.SECP256R1()), + _ECDSA_NISTP384: _SSHFormatECDSA(b"nistp384", ec.SECP384R1()), + _ECDSA_NISTP521: _SSHFormatECDSA(b"nistp521", ec.SECP521R1()), + _SK_SSH_ED25519: _SSHFormatSKEd25519(), + _SK_SSH_ECDSA_NISTP256: _SSHFormatSKECDSA(), +} + + +def _lookup_kformat(key_type: bytes): + """Return valid format or throw error""" + if not isinstance(key_type, bytes): + key_type = memoryview(key_type).tobytes() + if key_type in _KEY_FORMATS: + return _KEY_FORMATS[key_type] + raise UnsupportedAlgorithm(f"Unsupported key type: {key_type!r}") + + +SSHPrivateKeyTypes = typing.Union[ + ec.EllipticCurvePrivateKey, + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ed25519.Ed25519PrivateKey, +] + + +def load_ssh_private_key( + data: bytes, + password: bytes | None, + backend: typing.Any = None, +) -> SSHPrivateKeyTypes: + """Load private key from OpenSSH custom encoding.""" + utils._check_byteslike("data", data) + if password is not None: + utils._check_bytes("password", password) + + m = _PEM_RC.search(data) + if not m: + raise ValueError("Not OpenSSH private key format") + p1 = m.start(1) + p2 = m.end(1) + data = binascii.a2b_base64(memoryview(data)[p1:p2]) + if not data.startswith(_SK_MAGIC): + raise ValueError("Not OpenSSH private key format") + data = memoryview(data)[len(_SK_MAGIC) :] + + # parse header + ciphername, data = _get_sshstr(data) + kdfname, data = _get_sshstr(data) + kdfoptions, data = _get_sshstr(data) + nkeys, data = _get_u32(data) + if nkeys != 1: + raise ValueError("Only one key supported") + + # load public key data + pubdata, data = _get_sshstr(data) + pub_key_type, pubdata = _get_sshstr(pubdata) + kformat = _lookup_kformat(pub_key_type) + pubfields, pubdata = kformat.get_public(pubdata) + _check_empty(pubdata) + + if (ciphername, kdfname) != (_NONE, _NONE): + ciphername_bytes = ciphername.tobytes() + if ciphername_bytes not in _SSH_CIPHERS: + raise UnsupportedAlgorithm( + f"Unsupported cipher: {ciphername_bytes!r}" + ) + if kdfname != _BCRYPT: + raise UnsupportedAlgorithm(f"Unsupported KDF: {kdfname!r}") + blklen = _SSH_CIPHERS[ciphername_bytes].block_len + tag_len = _SSH_CIPHERS[ciphername_bytes].tag_len + # load secret data + edata, data = _get_sshstr(data) + # see https://bugzilla.mindrot.org/show_bug.cgi?id=3553 for + # information about how OpenSSH handles AEAD tags + if _SSH_CIPHERS[ciphername_bytes].is_aead: + tag = bytes(data) + if len(tag) != tag_len: + raise ValueError("Corrupt data: invalid tag length for cipher") + else: + _check_empty(data) + _check_block_size(edata, blklen) + salt, kbuf = _get_sshstr(kdfoptions) + rounds, kbuf = _get_u32(kbuf) + _check_empty(kbuf) + ciph = _init_cipher(ciphername_bytes, password, salt.tobytes(), rounds) + dec = ciph.decryptor() + edata = memoryview(dec.update(edata)) + if _SSH_CIPHERS[ciphername_bytes].is_aead: + assert isinstance(dec, AEADDecryptionContext) + _check_empty(dec.finalize_with_tag(tag)) + else: + # _check_block_size requires data to be a full block so there + # should be no output from finalize + _check_empty(dec.finalize()) + else: + # load secret data + edata, data = _get_sshstr(data) + _check_empty(data) + blklen = 8 + _check_block_size(edata, blklen) + ck1, edata = _get_u32(edata) + ck2, edata = _get_u32(edata) + if ck1 != ck2: + raise ValueError("Corrupt data: broken checksum") + + # load per-key struct + key_type, edata = _get_sshstr(edata) + if key_type != pub_key_type: + raise ValueError("Corrupt data: key type mismatch") + private_key, edata = kformat.load_private(edata, pubfields) + # We don't use the comment + _, edata = _get_sshstr(edata) + + # yes, SSH does padding check *after* all other parsing is done. + # need to follow as it writes zero-byte padding too. + if edata != _PADDING[: len(edata)]: + raise ValueError("Corrupt data: invalid padding") + + if isinstance(private_key, dsa.DSAPrivateKey): + warnings.warn( + "SSH DSA keys are deprecated and will be removed in a future " + "release.", + utils.DeprecatedIn40, + stacklevel=2, + ) + + return private_key + + +def _serialize_ssh_private_key( + private_key: SSHPrivateKeyTypes, + password: bytes, + encryption_algorithm: KeySerializationEncryption, +) -> bytes: + """Serialize private key with OpenSSH custom encoding.""" + utils._check_bytes("password", password) + if isinstance(private_key, dsa.DSAPrivateKey): + warnings.warn( + "SSH DSA key support is deprecated and will be " + "removed in a future release", + utils.DeprecatedIn40, + stacklevel=4, + ) + + key_type = _get_ssh_key_type(private_key) + kformat = _lookup_kformat(key_type) + + # setup parameters + f_kdfoptions = _FragList() + if password: + ciphername = _DEFAULT_CIPHER + blklen = _SSH_CIPHERS[ciphername].block_len + kdfname = _BCRYPT + rounds = _DEFAULT_ROUNDS + if ( + isinstance(encryption_algorithm, _KeySerializationEncryption) + and encryption_algorithm._kdf_rounds is not None + ): + rounds = encryption_algorithm._kdf_rounds + salt = os.urandom(16) + f_kdfoptions.put_sshstr(salt) + f_kdfoptions.put_u32(rounds) + ciph = _init_cipher(ciphername, password, salt, rounds) + else: + ciphername = kdfname = _NONE + blklen = 8 + ciph = None + nkeys = 1 + checkval = os.urandom(4) + comment = b"" + + # encode public and private parts together + f_public_key = _FragList() + f_public_key.put_sshstr(key_type) + kformat.encode_public(private_key.public_key(), f_public_key) + + f_secrets = _FragList([checkval, checkval]) + f_secrets.put_sshstr(key_type) + kformat.encode_private(private_key, f_secrets) + f_secrets.put_sshstr(comment) + f_secrets.put_raw(_PADDING[: blklen - (f_secrets.size() % blklen)]) + + # top-level structure + f_main = _FragList() + f_main.put_raw(_SK_MAGIC) + f_main.put_sshstr(ciphername) + f_main.put_sshstr(kdfname) + f_main.put_sshstr(f_kdfoptions) + f_main.put_u32(nkeys) + f_main.put_sshstr(f_public_key) + f_main.put_sshstr(f_secrets) + + # copy result info bytearray + slen = f_secrets.size() + mlen = f_main.size() + buf = memoryview(bytearray(mlen + blklen)) + f_main.render(buf) + ofs = mlen - slen + + # encrypt in-place + if ciph is not None: + ciph.encryptor().update_into(buf[ofs:mlen], buf[ofs:]) + + return _ssh_pem_encode(buf[:mlen]) + + +SSHPublicKeyTypes = typing.Union[ + ec.EllipticCurvePublicKey, + rsa.RSAPublicKey, + dsa.DSAPublicKey, + ed25519.Ed25519PublicKey, +] + +SSHCertPublicKeyTypes = typing.Union[ + ec.EllipticCurvePublicKey, + rsa.RSAPublicKey, + ed25519.Ed25519PublicKey, +] + + +class SSHCertificateType(enum.Enum): + USER = 1 + HOST = 2 + + +class SSHCertificate: + def __init__( + self, + _nonce: memoryview, + _public_key: SSHPublicKeyTypes, + _serial: int, + _cctype: int, + _key_id: memoryview, + _valid_principals: list[bytes], + _valid_after: int, + _valid_before: int, + _critical_options: dict[bytes, bytes], + _extensions: dict[bytes, bytes], + _sig_type: memoryview, + _sig_key: memoryview, + _inner_sig_type: memoryview, + _signature: memoryview, + _tbs_cert_body: memoryview, + _cert_key_type: bytes, + _cert_body: memoryview, + ): + self._nonce = _nonce + self._public_key = _public_key + self._serial = _serial + try: + self._type = SSHCertificateType(_cctype) + except ValueError: + raise ValueError("Invalid certificate type") + self._key_id = _key_id + self._valid_principals = _valid_principals + self._valid_after = _valid_after + self._valid_before = _valid_before + self._critical_options = _critical_options + self._extensions = _extensions + self._sig_type = _sig_type + self._sig_key = _sig_key + self._inner_sig_type = _inner_sig_type + self._signature = _signature + self._cert_key_type = _cert_key_type + self._cert_body = _cert_body + self._tbs_cert_body = _tbs_cert_body + + @property + def nonce(self) -> bytes: + return bytes(self._nonce) + + def public_key(self) -> SSHCertPublicKeyTypes: + # make mypy happy until we remove DSA support entirely and + # the underlying union won't have a disallowed type + return typing.cast(SSHCertPublicKeyTypes, self._public_key) + + @property + def serial(self) -> int: + return self._serial + + @property + def type(self) -> SSHCertificateType: + return self._type + + @property + def key_id(self) -> bytes: + return bytes(self._key_id) + + @property + def valid_principals(self) -> list[bytes]: + return self._valid_principals + + @property + def valid_before(self) -> int: + return self._valid_before + + @property + def valid_after(self) -> int: + return self._valid_after + + @property + def critical_options(self) -> dict[bytes, bytes]: + return self._critical_options + + @property + def extensions(self) -> dict[bytes, bytes]: + return self._extensions + + def signature_key(self) -> SSHCertPublicKeyTypes: + sigformat = _lookup_kformat(self._sig_type) + signature_key, sigkey_rest = sigformat.load_public(self._sig_key) + _check_empty(sigkey_rest) + return signature_key + + def public_bytes(self) -> bytes: + return ( + bytes(self._cert_key_type) + + b" " + + binascii.b2a_base64(bytes(self._cert_body), newline=False) + ) + + def verify_cert_signature(self) -> None: + signature_key = self.signature_key() + if isinstance(signature_key, ed25519.Ed25519PublicKey): + signature_key.verify( + bytes(self._signature), bytes(self._tbs_cert_body) + ) + elif isinstance(signature_key, ec.EllipticCurvePublicKey): + # The signature is encoded as a pair of big-endian integers + r, data = _get_mpint(self._signature) + s, data = _get_mpint(data) + _check_empty(data) + computed_sig = asym_utils.encode_dss_signature(r, s) + hash_alg = _get_ec_hash_alg(signature_key.curve) + signature_key.verify( + computed_sig, bytes(self._tbs_cert_body), ec.ECDSA(hash_alg) + ) + else: + assert isinstance(signature_key, rsa.RSAPublicKey) + if self._inner_sig_type == _SSH_RSA: + hash_alg = hashes.SHA1() + elif self._inner_sig_type == _SSH_RSA_SHA256: + hash_alg = hashes.SHA256() + else: + assert self._inner_sig_type == _SSH_RSA_SHA512 + hash_alg = hashes.SHA512() + signature_key.verify( + bytes(self._signature), + bytes(self._tbs_cert_body), + padding.PKCS1v15(), + hash_alg, + ) + + +def _get_ec_hash_alg(curve: ec.EllipticCurve) -> hashes.HashAlgorithm: + if isinstance(curve, ec.SECP256R1): + return hashes.SHA256() + elif isinstance(curve, ec.SECP384R1): + return hashes.SHA384() + else: + assert isinstance(curve, ec.SECP521R1) + return hashes.SHA512() + + +def _load_ssh_public_identity( + data: bytes, + _legacy_dsa_allowed=False, +) -> SSHCertificate | SSHPublicKeyTypes: + utils._check_byteslike("data", data) + + m = _SSH_PUBKEY_RC.match(data) + if not m: + raise ValueError("Invalid line format") + key_type = orig_key_type = m.group(1) + key_body = m.group(2) + with_cert = False + if key_type.endswith(_CERT_SUFFIX): + with_cert = True + key_type = key_type[: -len(_CERT_SUFFIX)] + if key_type == _SSH_DSA and not _legacy_dsa_allowed: + raise UnsupportedAlgorithm( + "DSA keys aren't supported in SSH certificates" + ) + kformat = _lookup_kformat(key_type) + + try: + rest = memoryview(binascii.a2b_base64(key_body)) + except (TypeError, binascii.Error): + raise ValueError("Invalid format") + + if with_cert: + cert_body = rest + inner_key_type, rest = _get_sshstr(rest) + if inner_key_type != orig_key_type: + raise ValueError("Invalid key format") + if with_cert: + nonce, rest = _get_sshstr(rest) + public_key, rest = kformat.load_public(rest) + if with_cert: + serial, rest = _get_u64(rest) + cctype, rest = _get_u32(rest) + key_id, rest = _get_sshstr(rest) + principals, rest = _get_sshstr(rest) + valid_principals = [] + while principals: + principal, principals = _get_sshstr(principals) + valid_principals.append(bytes(principal)) + valid_after, rest = _get_u64(rest) + valid_before, rest = _get_u64(rest) + crit_options, rest = _get_sshstr(rest) + critical_options = _parse_exts_opts(crit_options) + exts, rest = _get_sshstr(rest) + extensions = _parse_exts_opts(exts) + # Get the reserved field, which is unused. + _, rest = _get_sshstr(rest) + sig_key_raw, rest = _get_sshstr(rest) + sig_type, sig_key = _get_sshstr(sig_key_raw) + if sig_type == _SSH_DSA and not _legacy_dsa_allowed: + raise UnsupportedAlgorithm( + "DSA signatures aren't supported in SSH certificates" + ) + # Get the entire cert body and subtract the signature + tbs_cert_body = cert_body[: -len(rest)] + signature_raw, rest = _get_sshstr(rest) + _check_empty(rest) + inner_sig_type, sig_rest = _get_sshstr(signature_raw) + # RSA certs can have multiple algorithm types + if ( + sig_type == _SSH_RSA + and inner_sig_type + not in [_SSH_RSA_SHA256, _SSH_RSA_SHA512, _SSH_RSA] + ) or (sig_type != _SSH_RSA and inner_sig_type != sig_type): + raise ValueError("Signature key type does not match") + signature, sig_rest = _get_sshstr(sig_rest) + _check_empty(sig_rest) + return SSHCertificate( + nonce, + public_key, + serial, + cctype, + key_id, + valid_principals, + valid_after, + valid_before, + critical_options, + extensions, + sig_type, + sig_key, + inner_sig_type, + signature, + tbs_cert_body, + orig_key_type, + cert_body, + ) + else: + _check_empty(rest) + return public_key + + +def load_ssh_public_identity( + data: bytes, +) -> SSHCertificate | SSHPublicKeyTypes: + return _load_ssh_public_identity(data) + + +def _parse_exts_opts(exts_opts: memoryview) -> dict[bytes, bytes]: + result: dict[bytes, bytes] = {} + last_name = None + while exts_opts: + name, exts_opts = _get_sshstr(exts_opts) + bname: bytes = bytes(name) + if bname in result: + raise ValueError("Duplicate name") + if last_name is not None and bname < last_name: + raise ValueError("Fields not lexically sorted") + value, exts_opts = _get_sshstr(exts_opts) + if len(value) > 0: + value, extra = _get_sshstr(value) + if len(extra) > 0: + raise ValueError("Unexpected extra data after value") + result[bname] = bytes(value) + last_name = bname + return result + + +def load_ssh_public_key( + data: bytes, backend: typing.Any = None +) -> SSHPublicKeyTypes: + cert_or_key = _load_ssh_public_identity(data, _legacy_dsa_allowed=True) + public_key: SSHPublicKeyTypes + if isinstance(cert_or_key, SSHCertificate): + public_key = cert_or_key.public_key() + else: + public_key = cert_or_key + + if isinstance(public_key, dsa.DSAPublicKey): + warnings.warn( + "SSH DSA keys are deprecated and will be removed in a future " + "release.", + utils.DeprecatedIn40, + stacklevel=2, + ) + return public_key + + +def serialize_ssh_public_key(public_key: SSHPublicKeyTypes) -> bytes: + """One-line public key format for OpenSSH""" + if isinstance(public_key, dsa.DSAPublicKey): + warnings.warn( + "SSH DSA key support is deprecated and will be " + "removed in a future release", + utils.DeprecatedIn40, + stacklevel=4, + ) + key_type = _get_ssh_key_type(public_key) + kformat = _lookup_kformat(key_type) + + f_pub = _FragList() + f_pub.put_sshstr(key_type) + kformat.encode_public(public_key, f_pub) + + pub = binascii.b2a_base64(f_pub.tobytes()).strip() + return b"".join([key_type, b" ", pub]) + + +SSHCertPrivateKeyTypes = typing.Union[ + ec.EllipticCurvePrivateKey, + rsa.RSAPrivateKey, + ed25519.Ed25519PrivateKey, +] + + +# This is an undocumented limit enforced in the openssh codebase for sshd and +# ssh-keygen, but it is undefined in the ssh certificates spec. +_SSHKEY_CERT_MAX_PRINCIPALS = 256 + + +class SSHCertificateBuilder: + def __init__( + self, + _public_key: SSHCertPublicKeyTypes | None = None, + _serial: int | None = None, + _type: SSHCertificateType | None = None, + _key_id: bytes | None = None, + _valid_principals: list[bytes] = [], + _valid_for_all_principals: bool = False, + _valid_before: int | None = None, + _valid_after: int | None = None, + _critical_options: list[tuple[bytes, bytes]] = [], + _extensions: list[tuple[bytes, bytes]] = [], + ): + self._public_key = _public_key + self._serial = _serial + self._type = _type + self._key_id = _key_id + self._valid_principals = _valid_principals + self._valid_for_all_principals = _valid_for_all_principals + self._valid_before = _valid_before + self._valid_after = _valid_after + self._critical_options = _critical_options + self._extensions = _extensions + + def public_key( + self, public_key: SSHCertPublicKeyTypes + ) -> SSHCertificateBuilder: + if not isinstance( + public_key, + ( + ec.EllipticCurvePublicKey, + rsa.RSAPublicKey, + ed25519.Ed25519PublicKey, + ), + ): + raise TypeError("Unsupported key type") + if self._public_key is not None: + raise ValueError("public_key already set") + + return SSHCertificateBuilder( + _public_key=public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def serial(self, serial: int) -> SSHCertificateBuilder: + if not isinstance(serial, int): + raise TypeError("serial must be an integer") + if not 0 <= serial < 2**64: + raise ValueError("serial must be between 0 and 2**64") + if self._serial is not None: + raise ValueError("serial already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def type(self, type: SSHCertificateType) -> SSHCertificateBuilder: + if not isinstance(type, SSHCertificateType): + raise TypeError("type must be an SSHCertificateType") + if self._type is not None: + raise ValueError("type already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def key_id(self, key_id: bytes) -> SSHCertificateBuilder: + if not isinstance(key_id, bytes): + raise TypeError("key_id must be bytes") + if self._key_id is not None: + raise ValueError("key_id already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def valid_principals( + self, valid_principals: list[bytes] + ) -> SSHCertificateBuilder: + if self._valid_for_all_principals: + raise ValueError( + "Principals can't be set because the cert is valid " + "for all principals" + ) + if ( + not all(isinstance(x, bytes) for x in valid_principals) + or not valid_principals + ): + raise TypeError( + "principals must be a list of bytes and can't be empty" + ) + if self._valid_principals: + raise ValueError("valid_principals already set") + + if len(valid_principals) > _SSHKEY_CERT_MAX_PRINCIPALS: + raise ValueError( + "Reached or exceeded the maximum number of valid_principals" + ) + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def valid_for_all_principals(self): + if self._valid_principals: + raise ValueError( + "valid_principals already set, can't set " + "valid_for_all_principals" + ) + if self._valid_for_all_principals: + raise ValueError("valid_for_all_principals already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=True, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def valid_before(self, valid_before: int | float) -> SSHCertificateBuilder: + if not isinstance(valid_before, (int, float)): + raise TypeError("valid_before must be an int or float") + valid_before = int(valid_before) + if valid_before < 0 or valid_before >= 2**64: + raise ValueError("valid_before must [0, 2**64)") + if self._valid_before is not None: + raise ValueError("valid_before already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def valid_after(self, valid_after: int | float) -> SSHCertificateBuilder: + if not isinstance(valid_after, (int, float)): + raise TypeError("valid_after must be an int or float") + valid_after = int(valid_after) + if valid_after < 0 or valid_after >= 2**64: + raise ValueError("valid_after must [0, 2**64)") + if self._valid_after is not None: + raise ValueError("valid_after already set") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=valid_after, + _critical_options=self._critical_options, + _extensions=self._extensions, + ) + + def add_critical_option( + self, name: bytes, value: bytes + ) -> SSHCertificateBuilder: + if not isinstance(name, bytes) or not isinstance(value, bytes): + raise TypeError("name and value must be bytes") + # This is O(n**2) + if name in [name for name, _ in self._critical_options]: + raise ValueError("Duplicate critical option name") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=[*self._critical_options, (name, value)], + _extensions=self._extensions, + ) + + def add_extension( + self, name: bytes, value: bytes + ) -> SSHCertificateBuilder: + if not isinstance(name, bytes) or not isinstance(value, bytes): + raise TypeError("name and value must be bytes") + # This is O(n**2) + if name in [name for name, _ in self._extensions]: + raise ValueError("Duplicate extension name") + + return SSHCertificateBuilder( + _public_key=self._public_key, + _serial=self._serial, + _type=self._type, + _key_id=self._key_id, + _valid_principals=self._valid_principals, + _valid_for_all_principals=self._valid_for_all_principals, + _valid_before=self._valid_before, + _valid_after=self._valid_after, + _critical_options=self._critical_options, + _extensions=[*self._extensions, (name, value)], + ) + + def sign(self, private_key: SSHCertPrivateKeyTypes) -> SSHCertificate: + if not isinstance( + private_key, + ( + ec.EllipticCurvePrivateKey, + rsa.RSAPrivateKey, + ed25519.Ed25519PrivateKey, + ), + ): + raise TypeError("Unsupported private key type") + + if self._public_key is None: + raise ValueError("public_key must be set") + + # Not required + serial = 0 if self._serial is None else self._serial + + if self._type is None: + raise ValueError("type must be set") + + # Not required + key_id = b"" if self._key_id is None else self._key_id + + # A zero length list is valid, but means the certificate + # is valid for any principal of the specified type. We require + # the user to explicitly set valid_for_all_principals to get + # that behavior. + if not self._valid_principals and not self._valid_for_all_principals: + raise ValueError( + "valid_principals must be set if valid_for_all_principals " + "is False" + ) + + if self._valid_before is None: + raise ValueError("valid_before must be set") + + if self._valid_after is None: + raise ValueError("valid_after must be set") + + if self._valid_after > self._valid_before: + raise ValueError("valid_after must be earlier than valid_before") + + # lexically sort our byte strings + self._critical_options.sort(key=lambda x: x[0]) + self._extensions.sort(key=lambda x: x[0]) + + key_type = _get_ssh_key_type(self._public_key) + cert_prefix = key_type + _CERT_SUFFIX + + # Marshal the bytes to be signed + nonce = os.urandom(32) + kformat = _lookup_kformat(key_type) + f = _FragList() + f.put_sshstr(cert_prefix) + f.put_sshstr(nonce) + kformat.encode_public(self._public_key, f) + f.put_u64(serial) + f.put_u32(self._type.value) + f.put_sshstr(key_id) + fprincipals = _FragList() + for p in self._valid_principals: + fprincipals.put_sshstr(p) + f.put_sshstr(fprincipals.tobytes()) + f.put_u64(self._valid_after) + f.put_u64(self._valid_before) + fcrit = _FragList() + for name, value in self._critical_options: + fcrit.put_sshstr(name) + if len(value) > 0: + foptval = _FragList() + foptval.put_sshstr(value) + fcrit.put_sshstr(foptval.tobytes()) + else: + fcrit.put_sshstr(value) + f.put_sshstr(fcrit.tobytes()) + fext = _FragList() + for name, value in self._extensions: + fext.put_sshstr(name) + if len(value) > 0: + fextval = _FragList() + fextval.put_sshstr(value) + fext.put_sshstr(fextval.tobytes()) + else: + fext.put_sshstr(value) + f.put_sshstr(fext.tobytes()) + f.put_sshstr(b"") # RESERVED FIELD + # encode CA public key + ca_type = _get_ssh_key_type(private_key) + caformat = _lookup_kformat(ca_type) + caf = _FragList() + caf.put_sshstr(ca_type) + caformat.encode_public(private_key.public_key(), caf) + f.put_sshstr(caf.tobytes()) + # Sigs according to the rules defined for the CA's public key + # (RFC4253 section 6.6 for ssh-rsa, RFC5656 for ECDSA, + # and RFC8032 for Ed25519). + if isinstance(private_key, ed25519.Ed25519PrivateKey): + signature = private_key.sign(f.tobytes()) + fsig = _FragList() + fsig.put_sshstr(ca_type) + fsig.put_sshstr(signature) + f.put_sshstr(fsig.tobytes()) + elif isinstance(private_key, ec.EllipticCurvePrivateKey): + hash_alg = _get_ec_hash_alg(private_key.curve) + signature = private_key.sign(f.tobytes(), ec.ECDSA(hash_alg)) + r, s = asym_utils.decode_dss_signature(signature) + fsig = _FragList() + fsig.put_sshstr(ca_type) + fsigblob = _FragList() + fsigblob.put_mpint(r) + fsigblob.put_mpint(s) + fsig.put_sshstr(fsigblob.tobytes()) + f.put_sshstr(fsig.tobytes()) + + else: + assert isinstance(private_key, rsa.RSAPrivateKey) + # Just like Golang, we're going to use SHA512 for RSA + # https://cs.opensource.google/go/x/crypto/+/refs/tags/ + # v0.4.0:ssh/certs.go;l=445 + # RFC 8332 defines SHA256 and 512 as options + fsig = _FragList() + fsig.put_sshstr(_SSH_RSA_SHA512) + signature = private_key.sign( + f.tobytes(), padding.PKCS1v15(), hashes.SHA512() + ) + fsig.put_sshstr(signature) + f.put_sshstr(fsig.tobytes()) + + cert_data = binascii.b2a_base64(f.tobytes()).strip() + # load_ssh_public_identity returns a union, but this is + # guaranteed to be an SSHCertificate, so we cast to make + # mypy happy. + return typing.cast( + SSHCertificate, + load_ssh_public_identity(b"".join([cert_prefix, b" ", cert_data])), + ) |