about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py')
-rw-r--r--.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py344
1 files changed, 344 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py b/.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py
new file mode 100644
index 00000000..5a011c41
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/cryptography/x509/ocsp.py
@@ -0,0 +1,344 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import annotations
+
+import datetime
+import typing
+
+from cryptography import utils, x509
+from cryptography.hazmat.bindings._rust import ocsp
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric.types import (
+    CertificateIssuerPrivateKeyTypes,
+)
+from cryptography.x509.base import (
+    _EARLIEST_UTC_TIME,
+    _convert_to_naive_utc_time,
+    _reject_duplicate_extension,
+)
+
+
+class OCSPResponderEncoding(utils.Enum):
+    HASH = "By Hash"
+    NAME = "By Name"
+
+
+class OCSPResponseStatus(utils.Enum):
+    SUCCESSFUL = 0
+    MALFORMED_REQUEST = 1
+    INTERNAL_ERROR = 2
+    TRY_LATER = 3
+    SIG_REQUIRED = 5
+    UNAUTHORIZED = 6
+
+
+_ALLOWED_HASHES = (
+    hashes.SHA1,
+    hashes.SHA224,
+    hashes.SHA256,
+    hashes.SHA384,
+    hashes.SHA512,
+)
+
+
+def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None:
+    if not isinstance(algorithm, _ALLOWED_HASHES):
+        raise ValueError(
+            "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
+        )
+
+
+class OCSPCertStatus(utils.Enum):
+    GOOD = 0
+    REVOKED = 1
+    UNKNOWN = 2
+
+
+class _SingleResponse:
+    def __init__(
+        self,
+        cert: x509.Certificate,
+        issuer: x509.Certificate,
+        algorithm: hashes.HashAlgorithm,
+        cert_status: OCSPCertStatus,
+        this_update: datetime.datetime,
+        next_update: datetime.datetime | None,
+        revocation_time: datetime.datetime | None,
+        revocation_reason: x509.ReasonFlags | None,
+    ):
+        if not isinstance(cert, x509.Certificate) or not isinstance(
+            issuer, x509.Certificate
+        ):
+            raise TypeError("cert and issuer must be a Certificate")
+
+        _verify_algorithm(algorithm)
+        if not isinstance(this_update, datetime.datetime):
+            raise TypeError("this_update must be a datetime object")
+        if next_update is not None and not isinstance(
+            next_update, datetime.datetime
+        ):
+            raise TypeError("next_update must be a datetime object or None")
+
+        self._cert = cert
+        self._issuer = issuer
+        self._algorithm = algorithm
+        self._this_update = this_update
+        self._next_update = next_update
+
+        if not isinstance(cert_status, OCSPCertStatus):
+            raise TypeError(
+                "cert_status must be an item from the OCSPCertStatus enum"
+            )
+        if cert_status is not OCSPCertStatus.REVOKED:
+            if revocation_time is not None:
+                raise ValueError(
+                    "revocation_time can only be provided if the certificate "
+                    "is revoked"
+                )
+            if revocation_reason is not None:
+                raise ValueError(
+                    "revocation_reason can only be provided if the certificate"
+                    " is revoked"
+                )
+        else:
+            if not isinstance(revocation_time, datetime.datetime):
+                raise TypeError("revocation_time must be a datetime object")
+
+            revocation_time = _convert_to_naive_utc_time(revocation_time)
+            if revocation_time < _EARLIEST_UTC_TIME:
+                raise ValueError(
+                    "The revocation_time must be on or after"
+                    " 1950 January 1."
+                )
+
+            if revocation_reason is not None and not isinstance(
+                revocation_reason, x509.ReasonFlags
+            ):
+                raise TypeError(
+                    "revocation_reason must be an item from the ReasonFlags "
+                    "enum or None"
+                )
+
+        self._cert_status = cert_status
+        self._revocation_time = revocation_time
+        self._revocation_reason = revocation_reason
+
+
+OCSPRequest = ocsp.OCSPRequest
+OCSPResponse = ocsp.OCSPResponse
+OCSPSingleResponse = ocsp.OCSPSingleResponse
+
+
+class OCSPRequestBuilder:
+    def __init__(
+        self,
+        request: tuple[
+            x509.Certificate, x509.Certificate, hashes.HashAlgorithm
+        ]
+        | None = None,
+        request_hash: tuple[bytes, bytes, int, hashes.HashAlgorithm]
+        | None = None,
+        extensions: list[x509.Extension[x509.ExtensionType]] = [],
+    ) -> None:
+        self._request = request
+        self._request_hash = request_hash
+        self._extensions = extensions
+
+    def add_certificate(
+        self,
+        cert: x509.Certificate,
+        issuer: x509.Certificate,
+        algorithm: hashes.HashAlgorithm,
+    ) -> OCSPRequestBuilder:
+        if self._request is not None or self._request_hash is not None:
+            raise ValueError("Only one certificate can be added to a request")
+
+        _verify_algorithm(algorithm)
+        if not isinstance(cert, x509.Certificate) or not isinstance(
+            issuer, x509.Certificate
+        ):
+            raise TypeError("cert and issuer must be a Certificate")
+
+        return OCSPRequestBuilder(
+            (cert, issuer, algorithm), self._request_hash, self._extensions
+        )
+
+    def add_certificate_by_hash(
+        self,
+        issuer_name_hash: bytes,
+        issuer_key_hash: bytes,
+        serial_number: int,
+        algorithm: hashes.HashAlgorithm,
+    ) -> OCSPRequestBuilder:
+        if self._request is not None or self._request_hash is not None:
+            raise ValueError("Only one certificate can be added to a request")
+
+        if not isinstance(serial_number, int):
+            raise TypeError("serial_number must be an integer")
+
+        _verify_algorithm(algorithm)
+        utils._check_bytes("issuer_name_hash", issuer_name_hash)
+        utils._check_bytes("issuer_key_hash", issuer_key_hash)
+        if algorithm.digest_size != len(
+            issuer_name_hash
+        ) or algorithm.digest_size != len(issuer_key_hash):
+            raise ValueError(
+                "issuer_name_hash and issuer_key_hash must be the same length "
+                "as the digest size of the algorithm"
+            )
+
+        return OCSPRequestBuilder(
+            self._request,
+            (issuer_name_hash, issuer_key_hash, serial_number, algorithm),
+            self._extensions,
+        )
+
+    def add_extension(
+        self, extval: x509.ExtensionType, critical: bool
+    ) -> OCSPRequestBuilder:
+        if not isinstance(extval, x509.ExtensionType):
+            raise TypeError("extension must be an ExtensionType")
+
+        extension = x509.Extension(extval.oid, critical, extval)
+        _reject_duplicate_extension(extension, self._extensions)
+
+        return OCSPRequestBuilder(
+            self._request, self._request_hash, [*self._extensions, extension]
+        )
+
+    def build(self) -> OCSPRequest:
+        if self._request is None and self._request_hash is None:
+            raise ValueError("You must add a certificate before building")
+
+        return ocsp.create_ocsp_request(self)
+
+
+class OCSPResponseBuilder:
+    def __init__(
+        self,
+        response: _SingleResponse | None = None,
+        responder_id: tuple[x509.Certificate, OCSPResponderEncoding]
+        | None = None,
+        certs: list[x509.Certificate] | None = None,
+        extensions: list[x509.Extension[x509.ExtensionType]] = [],
+    ):
+        self._response = response
+        self._responder_id = responder_id
+        self._certs = certs
+        self._extensions = extensions
+
+    def add_response(
+        self,
+        cert: x509.Certificate,
+        issuer: x509.Certificate,
+        algorithm: hashes.HashAlgorithm,
+        cert_status: OCSPCertStatus,
+        this_update: datetime.datetime,
+        next_update: datetime.datetime | None,
+        revocation_time: datetime.datetime | None,
+        revocation_reason: x509.ReasonFlags | None,
+    ) -> OCSPResponseBuilder:
+        if self._response is not None:
+            raise ValueError("Only one response per OCSPResponse.")
+
+        singleresp = _SingleResponse(
+            cert,
+            issuer,
+            algorithm,
+            cert_status,
+            this_update,
+            next_update,
+            revocation_time,
+            revocation_reason,
+        )
+        return OCSPResponseBuilder(
+            singleresp,
+            self._responder_id,
+            self._certs,
+            self._extensions,
+        )
+
+    def responder_id(
+        self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate
+    ) -> OCSPResponseBuilder:
+        if self._responder_id is not None:
+            raise ValueError("responder_id can only be set once")
+        if not isinstance(responder_cert, x509.Certificate):
+            raise TypeError("responder_cert must be a Certificate")
+        if not isinstance(encoding, OCSPResponderEncoding):
+            raise TypeError(
+                "encoding must be an element from OCSPResponderEncoding"
+            )
+
+        return OCSPResponseBuilder(
+            self._response,
+            (responder_cert, encoding),
+            self._certs,
+            self._extensions,
+        )
+
+    def certificates(
+        self, certs: typing.Iterable[x509.Certificate]
+    ) -> OCSPResponseBuilder:
+        if self._certs is not None:
+            raise ValueError("certificates may only be set once")
+        certs = list(certs)
+        if len(certs) == 0:
+            raise ValueError("certs must not be an empty list")
+        if not all(isinstance(x, x509.Certificate) for x in certs):
+            raise TypeError("certs must be a list of Certificates")
+        return OCSPResponseBuilder(
+            self._response,
+            self._responder_id,
+            certs,
+            self._extensions,
+        )
+
+    def add_extension(
+        self, extval: x509.ExtensionType, critical: bool
+    ) -> OCSPResponseBuilder:
+        if not isinstance(extval, x509.ExtensionType):
+            raise TypeError("extension must be an ExtensionType")
+
+        extension = x509.Extension(extval.oid, critical, extval)
+        _reject_duplicate_extension(extension, self._extensions)
+
+        return OCSPResponseBuilder(
+            self._response,
+            self._responder_id,
+            self._certs,
+            [*self._extensions, extension],
+        )
+
+    def sign(
+        self,
+        private_key: CertificateIssuerPrivateKeyTypes,
+        algorithm: hashes.HashAlgorithm | None,
+    ) -> OCSPResponse:
+        if self._response is None:
+            raise ValueError("You must add a response before signing")
+        if self._responder_id is None:
+            raise ValueError("You must add a responder_id before signing")
+
+        return ocsp.create_ocsp_response(
+            OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
+        )
+
+    @classmethod
+    def build_unsuccessful(
+        cls, response_status: OCSPResponseStatus
+    ) -> OCSPResponse:
+        if not isinstance(response_status, OCSPResponseStatus):
+            raise TypeError(
+                "response_status must be an item from OCSPResponseStatus"
+            )
+        if response_status is OCSPResponseStatus.SUCCESSFUL:
+            raise ValueError("response_status cannot be SUCCESSFUL")
+
+        return ocsp.create_ocsp_response(response_status, None, None, None)
+
+
+load_der_ocsp_request = ocsp.load_der_ocsp_request
+load_der_ocsp_response = ocsp.load_der_ocsp_response