about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two versions of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/httpchecksum.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/httpchecksum.py550
1 files changed, 550 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py b/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
new file mode 100644
index 00000000..929e147e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
@@ -0,0 +1,550 @@
+# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+"""The interfaces in this module are not intended for public use.
+
+This module defines interfaces for applying checksums to HTTP requests within
+the context of botocore. This involves both resolving the checksum to be used
+based on client configuration and environment, as well as application of the
+checksum to the request.
+"""
+
+import base64
+import io
+import logging
+from binascii import crc32
+from hashlib import sha1, sha256
+
+from botocore.compat import HAS_CRT, has_minimum_crt_version, urlparse
+from botocore.exceptions import (
+    AwsChunkedWrapperError,
+    FlexibleChecksumError,
+    MissingDependencyException,
+)
+from botocore.model import StructureShape
+from botocore.response import StreamingBody
+from botocore.utils import (
+    conditionally_calculate_md5,
+    determine_content_length,
+    has_checksum_header,
+)
+
# Prefer the CRT's native checksum implementations when the optional
# awscrt dependency is installed; otherwise leave a None placeholder so
# accidental use fails fast.
if HAS_CRT:
    from awscrt import checksums as crt_checksums
else:
    crt_checksums = None

# Module-level logger for checksum-related diagnostics.
logger = logging.getLogger(__name__)

# Algorithm used when a checksum is required but the caller did not
# explicitly request one.
DEFAULT_CHECKSUM_ALGORITHM = "CRC32"
+
+
class BaseChecksum:
    """Base class for incremental checksums over bytes or file-like bodies.

    Subclasses implement ``update`` and ``digest``; this class provides
    base64 encoding and single-pass handling of whole request bodies.
    """

    # Read size used when consuming file-like objects (1 MiB).
    _CHUNK_SIZE = 1024 * 1024

    def update(self, chunk):
        """Fold ``chunk`` (bytes) into the running checksum. Subclass hook."""
        pass

    def digest(self):
        """Return the raw checksum as bytes. Subclass hook."""
        pass

    def b64digest(self):
        """Return the digest encoded as an ASCII base64 string."""
        return base64.b64encode(self.digest()).decode("ascii")

    def _handle_fileobj(self, fileobj):
        # Consume the stream in fixed-size reads, then rewind to where we
        # started so the body can still be sent afterwards.
        start_position = fileobj.tell()
        while (chunk := fileobj.read(self._CHUNK_SIZE)) != b"":
            self.update(chunk)
        fileobj.seek(start_position)

    def handle(self, body):
        """Checksum ``body`` (bytes-like or file-like); return base64 digest."""
        if isinstance(body, (bytes, bytearray)):
            self.update(body)
        else:
            self._handle_fileobj(body)
        return self.b64digest()
+
+
class Crc32Checksum(BaseChecksum):
    """Incremental CRC32 checksum built on ``binascii.crc32``."""

    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32 = 0

    def update(self, chunk):
        # Feed the previous value back in so the CRC can be computed
        # chunk by chunk; mask to keep it unsigned.
        value = crc32(chunk, self._int_crc32)
        self._int_crc32 = value & 0xFFFFFFFF

    def digest(self):
        # Big-endian 4-byte representation of the running CRC.
        return self._int_crc32.to_bytes(4, byteorder="big")
+
+
class CrtCrc32Checksum(BaseChecksum):
    """Incremental CRC32 backed by the AWS CRT.

    Note: This class is only used if the CRT is available.
    """

    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32 = 0

    def update(self, chunk):
        # The CRT routine accepts the previous CRC, enabling chunked updates.
        self._int_crc32 = crt_checksums.crc32(chunk, self._int_crc32) & 0xFFFFFFFF

    def digest(self):
        # Big-endian 4-byte representation of the running CRC.
        return self._int_crc32.to_bytes(4, byteorder="big")
+
+
class CrtCrc32cChecksum(BaseChecksum):
    """Incremental CRC32C backed by the AWS CRT.

    Note: This class is only used if the CRT is available.
    """

    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32c = 0

    def update(self, chunk):
        # The CRT routine accepts the previous CRC, enabling chunked updates.
        self._int_crc32c = crt_checksums.crc32c(chunk, self._int_crc32c) & 0xFFFFFFFF

    def digest(self):
        # Big-endian 4-byte representation of the running CRC.
        return self._int_crc32c.to_bytes(4, byteorder="big")
+
+
class CrtCrc64NvmeChecksum(BaseChecksum):
    """Incremental CRC64-NVME backed by the AWS CRT.

    Note: This class is only used if the CRT is available.
    """

    def __init__(self):
        # Running CRC value, kept as an unsigned 64-bit integer.
        self._int_crc64nvme = 0

    def update(self, chunk):
        # The CRT routine accepts the previous CRC, enabling chunked updates.
        self._int_crc64nvme = (
            crt_checksums.crc64nvme(chunk, self._int_crc64nvme)
            & 0xFFFFFFFFFFFFFFFF
        )

    def digest(self):
        # Big-endian 8-byte representation of the running CRC.
        return self._int_crc64nvme.to_bytes(8, byteorder="big")
+
+
class Sha1Checksum(BaseChecksum):
    """Incremental SHA-1 checksum built on ``hashlib``."""

    def __init__(self):
        self._hash = sha1()

    def update(self, chunk):
        self._hash.update(chunk)

    def digest(self):
        return self._hash.digest()
+
+
class Sha256Checksum(BaseChecksum):
    """Incremental SHA-256 checksum built on ``hashlib``."""

    def __init__(self):
        self._hash = sha256()

    def update(self, chunk):
        self._hash.update(chunk)

    def digest(self):
        return self._hash.digest()
+
+
class AwsChunkedWrapper:
    """Wrap a readable body, emitting it in aws-chunked encoding with an
    optional checksum trailer after the final (zero-length) chunk.
    """

    _DEFAULT_CHUNK_SIZE = 1024 * 1024

    def __init__(
        self,
        raw,
        checksum_cls=None,
        checksum_name="x-amz-checksum",
        chunk_size=None,
    ):
        self._raw = raw
        self._checksum_name = checksum_name
        self._checksum_cls = checksum_cls
        self._reset()
        self._chunk_size = (
            self._DEFAULT_CHUNK_SIZE if chunk_size is None else chunk_size
        )

    def _reset(self):
        # Drop any buffered encoded output and start a fresh checksum.
        self._remaining = b""
        self._complete = False
        self._checksum = self._checksum_cls() if self._checksum_cls else None

    def seek(self, offset, whence=0):
        """Rewind to the start of the stream; any other target is an error."""
        if offset != 0 or whence != 0:
            raise AwsChunkedWrapperError(
                error_msg="Can only seek to start of stream"
            )
        self._reset()
        self._raw.seek(0)

    def read(self, size=None):
        """Read up to ``size`` encoded bytes; ``None`` or a non-positive
        size reads everything remaining."""
        # Normalize "read all" size values to None
        if size is not None and size <= 0:
            size = None

        # Underlying body exhausted and buffer drained: signal EOF.
        if self._complete and not self._remaining:
            return b""

        # Buffer encoded chunks until the request can be satisfied (or
        # forever-more bytes when size is None).
        while not self._complete and (
            size is None or len(self._remaining) < size
        ):
            self._remaining += self._make_chunk()

        # A None size means "everything buffered".
        if size is None:
            size = len(self._remaining)

        result = self._remaining[:size]
        self._remaining = self._remaining[size:]
        return result

    def _make_chunk(self):
        # NOTE: Chunk size is not deterministic as read could return less. This
        # means we cannot know the content length of the encoded aws-chunked
        # stream ahead of time without ensuring a consistent chunk size
        raw_chunk = self._raw.read(self._chunk_size)
        self._complete = not raw_chunk

        if self._checksum:
            self._checksum.update(raw_chunk)
            if self._complete:
                # Terminal chunk: zero-length marker followed by the
                # checksum trailer header.
                name = self._checksum_name.encode("ascii")
                digest = self._checksum.b64digest().encode("ascii")
                return b"0\r\n%s:%s\r\n\r\n" % (name, digest)

        # Regular chunk: hex length, CRLF, payload, CRLF.
        return b"%x\r\n%s\r\n" % (len(raw_chunk), raw_chunk)

    def __iter__(self):
        while not self._complete:
            yield self._make_chunk()
+
+
class StreamingChecksumBody(StreamingBody):
    """Streaming body that validates an expected checksum as it is read."""

    def __init__(self, raw_stream, content_length, checksum, expected):
        super().__init__(raw_stream, content_length)
        self._checksum = checksum
        self._expected = expected

    def read(self, amt=None):
        data = super().read(amt=amt)
        self._checksum.update(data)
        # Validate once the stream has been fully consumed: either the
        # caller asked for everything, or a positive read returned nothing.
        at_end = amt is None or (amt > 0 and not data)
        if at_end:
            self._validate_checksum()
        return data

    def _validate_checksum(self):
        # The expected value from the response header is base64 encoded.
        if self._checksum.digest() != base64.b64decode(self._expected):
            error_msg = (
                f"Expected checksum {self._expected} did not match calculated "
                f"checksum: {self._checksum.b64digest()}"
            )
            raise FlexibleChecksumError(error_msg=error_msg)
+
+
def resolve_checksum_context(request, operation_model, params):
    """Resolve both the request and response checksum configuration.

    Results are stored in ``request["context"]["checksum"]`` for later use
    by ``apply_request_checksum`` and ``handle_checksum_body``.
    """
    resolve_request_checksum_algorithm(request, operation_model, params)
    resolve_response_checksum_algorithms(request, operation_model, params)
+
+
def resolve_request_checksum_algorithm(
    request,
    operation_model,
    params,
    supported_algorithms=None,
):
    """Resolve which checksum algorithm to apply to an outgoing request.

    Stores the result in ``request["context"]["checksum"]`` under
    ``request_algorithm`` (a dict with the algorithm name, the target header
    name, and whether it goes in a header or an aws-chunked trailer) for
    ``apply_request_checksum`` to consume later.

    :param request: The request dict being prepared.
    :param operation_model: Botocore operation model for the call.
    :param params: User-supplied operation parameters.
    :param supported_algorithms: Optional override for the set of supported
        algorithm names; defaults to ``_SUPPORTED_CHECKSUM_ALGORITHMS``.
    :raises MissingDependencyException: If a CRT-only algorithm was
        requested but the CRT is not installed.
    :raises FlexibleChecksumError: If the requested algorithm is unsupported.
    """
    # If the header is already set by the customer, skip calculation
    if has_checksum_header(request):
        return

    checksum_context = request["context"].get("checksum", {})
    request_checksum_calculation = request["context"][
        "client_config"
    ].request_checksum_calculation
    http_checksum = operation_model.http_checksum
    request_checksum_required = (
        operation_model.http_checksum_required
        or http_checksum.get("requestChecksumRequired")
    )
    algorithm_member = http_checksum.get("requestAlgorithmMember")
    if algorithm_member and algorithm_member in params:
        # If the client has opted into using flexible checksums and the
        # request supports it, use that instead of checksum required
        if supported_algorithms is None:
            supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS

        algorithm_name = params[algorithm_member].lower()
        if algorithm_name not in supported_algorithms:
            # Distinguish "needs the CRT extra" from "unknown algorithm"
            # to give an actionable error message.
            if not HAS_CRT and algorithm_name in _CRT_CHECKSUM_ALGORITHMS:
                raise MissingDependencyException(
                    msg=(
                        f"Using {algorithm_name.upper()} requires an "
                        "additional dependency. You will need to pip install "
                        "botocore[crt] before proceeding."
                    )
                )
            raise FlexibleChecksumError(
                error_msg=f"Unsupported checksum algorithm: {algorithm_name}"
            )
    elif request_checksum_required or (
        algorithm_member and request_checksum_calculation == "when_supported"
    ):
        # Don't use a default checksum for presigned requests.
        if request["context"].get("is_presign_request"):
            return
        # Fall back to the default algorithm and, when the operation models
        # an algorithm member header, record it so it can be set later.
        algorithm_name = DEFAULT_CHECKSUM_ALGORITHM.lower()
        algorithm_member_header = _get_request_algorithm_member_header(
            operation_model, request, algorithm_member
        )
        if algorithm_member_header is not None:
            checksum_context["request_algorithm_header"] = {
                "name": algorithm_member_header,
                "value": DEFAULT_CHECKSUM_ALGORITHM,
            }
    else:
        # No checksum requested and none required: nothing to resolve.
        return

    location_type = "header"
    if (
        operation_model.has_streaming_input
        and urlparse(request["url"]).scheme == "https"
    ):
        if request["context"]["client_config"].signature_version != 's3':
            # Operations with streaming input must support trailers.
            # We only support unsigned trailer checksums currently. As this
            # disables payload signing we'll only use trailers over TLS.
            location_type = "trailer"

    algorithm = {
        "algorithm": algorithm_name,
        "in": location_type,
        "name": f"x-amz-checksum-{algorithm_name}",
    }

    checksum_context["request_algorithm"] = algorithm
    request["context"]["checksum"] = checksum_context
+
+
def _get_request_algorithm_member_header(
    operation_model, request, algorithm_member
):
    """Get the name of the header targeted by the "requestAlgorithmMember"."""
    input_shape = operation_model.input_shape
    # Only structure inputs can carry an algorithm member.
    if isinstance(input_shape, StructureShape):
        member_shape = input_shape.members.get(algorithm_member)
        if member_shape:
            return member_shape.serialization.get("name")
    return None
+
+
def apply_request_checksum(request):
    """Apply the previously-resolved checksum to an outgoing request.

    Reads the configuration stored by ``resolve_request_checksum_algorithm``
    and dispatches to the header or trailer implementation. No-op when no
    algorithm was resolved.
    """
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    if not algorithm:
        return

    if algorithm == "conditional-md5":
        # Special case to handle the http checksum required trait
        conditionally_calculate_md5(request)
    else:
        location = algorithm["in"]
        if location == "header":
            _apply_request_header_checksum(request)
        elif location == "trailer":
            _apply_request_trailer_checksum(request)
        else:
            raise FlexibleChecksumError(
                error_msg=f"Unknown checksum variant: {location}"
            )

    # Also surface the chosen algorithm via its modeled header, if any.
    if "request_algorithm_header" in checksum_context:
        header = checksum_context["request_algorithm_header"]
        request["headers"][header["name"]] = header["value"]
+
+
def _apply_request_header_checksum(request):
    """Compute the body checksum and set it as a single request header."""
    algorithm = (
        request.get("context", {}).get("checksum", {}).get("request_algorithm")
    )
    header_name = algorithm["name"]
    headers = request["headers"]
    # If the header is already set by the customer, skip calculation
    if header_name in headers:
        return
    checksum = _CHECKSUM_CLS.get(algorithm["algorithm"])()
    headers[header_name] = checksum.handle(request["body"])
+
+
def _apply_request_trailer_checksum(request):
    """Wrap the request body in aws-chunked encoding with a checksum trailer.

    Sets the Transfer-Encoding, Content-Encoding and X-Amz-Trailer headers
    and replaces ``request["body"]`` with an ``AwsChunkedWrapper`` that
    appends the checksum after the final chunk.
    """
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    location_name = algorithm["name"]
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])

    headers = request["headers"]
    body = request["body"]

    # If the header is already set by the customer, skip calculation
    if location_name in headers:
        return

    headers["Transfer-Encoding"] = "chunked"
    if "Content-Encoding" in headers:
        # We need to preserve the existing content encoding and add
        # aws-chunked as a new content encoding.
        headers["Content-Encoding"] += ",aws-chunked"
    else:
        headers["Content-Encoding"] = "aws-chunked"
    headers["X-Amz-Trailer"] = location_name

    decoded_length = determine_content_length(body)
    if decoded_length is not None:
        # Send the decoded content length if we can determine it. Some
        # services such as S3 may require the decoded content length
        headers["X-Amz-Decoded-Content-Length"] = str(decoded_length)

        if "Content-Length" in headers:
            # Content-Length conflicts with chunked Transfer-Encoding.
            del headers["Content-Length"]
            logger.debug(
                "Removing the Content-Length header since 'chunked' is specified for Transfer-Encoding."
            )

    # The wrapper needs a file-like object; promote raw bytes into one.
    if isinstance(body, (bytes, bytearray)):
        body = io.BytesIO(body)

    request["body"] = AwsChunkedWrapper(
        body,
        checksum_cls=checksum_cls,
        checksum_name=location_name,
    )
+
+
def resolve_response_checksum_algorithms(
    request, operation_model, params, supported_algorithms=None
):
    """Record which response checksum algorithms may be validated.

    When the caller opted into response validation (via the modeled
    ``requestValidationModeMember``), stores the usable algorithms in
    priority order under ``request["context"]["checksum"]``.
    """
    http_checksum = operation_model.http_checksum
    mode_member = http_checksum.get("requestValidationModeMember")
    if not (mode_member and mode_member in params):
        return

    if supported_algorithms is None:
        supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS
    response_algorithms = {
        a.lower() for a in http_checksum.get("responseAlgorithms", [])
    }

    # Keep only algorithms both modeled on the response and supported
    # locally, ordered by our priority list.
    usable_algorithms = [
        algorithm
        for algorithm in _ALGORITHMS_PRIORITY_LIST
        if algorithm in response_algorithms
        and algorithm in supported_algorithms
    ]

    checksum_context = request["context"].get("checksum", {})
    checksum_context["response_algorithms"] = usable_algorithms
    request["context"]["checksum"] = checksum_context
+
+
def handle_checksum_body(http_response, response, context, operation_model):
    """Validate the response body against the first usable checksum header.

    Walks the resolved algorithms in priority order; the first one whose
    header is present (and base64-decodable) is used to wrap or validate
    the response body. Records the chosen algorithm in the response context.
    """
    headers = response["headers"]
    algorithms = context.get("checksum", {}).get("response_algorithms")
    if not algorithms:
        return

    for algorithm in algorithms:
        header_name = f"x-amz-checksum-{algorithm}"
        # If the header is not found, check the next algorithm
        if header_name not in headers:
            continue

        # If a - is in the checksum this is not valid Base64. S3 returns
        # checksums that include a -# suffix to indicate a checksum derived
        # from the hash of all part checksums. We cannot wrap this response
        if "-" in headers[header_name]:
            continue

        handler = (
            _handle_streaming_response
            if operation_model.has_streaming_output
            else _handle_bytes_response
        )
        response["body"] = handler(http_response, response, algorithm)

        # Expose metadata that the checksum check actually occurred
        checksum_context = response["context"].get("checksum", {})
        checksum_context["response_algorithm"] = algorithm
        response["context"]["checksum"] = checksum_context
        return

    logger.debug(
        f'Skipping checksum validation. Response did not contain one of the '
        f'following algorithms: {algorithms}.'
    )
+
+
def _handle_streaming_response(http_response, response, algorithm):
    """Wrap a streaming body so its checksum is validated as it is read."""
    header_name = f"x-amz-checksum-{algorithm}"
    checksum_cls = _CHECKSUM_CLS.get(algorithm)
    headers = response["headers"]
    return StreamingChecksumBody(
        http_response.raw,
        headers.get("content-length"),
        checksum_cls(),
        headers[header_name],
    )
+
+
def _handle_bytes_response(http_response, response, algorithm):
    """Validate the checksum of a fully-buffered response body.

    :raises FlexibleChecksumError: If the calculated checksum does not
        match the value from the response header.
    """
    body = http_response.content
    expected = response["headers"][f"x-amz-checksum-{algorithm}"]
    checksum = _CHECKSUM_CLS.get(algorithm)()
    checksum.update(body)
    if checksum.digest() != base64.b64decode(expected):
        error_msg = (
            f"Expected checksum {expected} did not match calculated "
            f"checksum: {checksum.b64digest()}"
        )
        raise FlexibleChecksumError(error_msg=error_msg)
    return body
+
+
# Mapping of algorithm name -> checksum implementation class. The pure
# Python implementations are always available; CRT-backed implementations
# are merged in below when the CRT is installed.
_CHECKSUM_CLS = {
    "crc32": Crc32Checksum,
    "sha1": Sha1Checksum,
    "sha256": Sha256Checksum,
}
# Algorithm names that require the optional AWS CRT (awscrt) dependency.
_CRT_CHECKSUM_ALGORITHMS = ["crc32", "crc32c", "crc64nvme"]
if HAS_CRT:
    # Use CRT checksum implementations if available
    _CRT_CHECKSUM_CLS = {
        "crc32": CrtCrc32Checksum,
        "crc32c": CrtCrc32cChecksum,
    }

    if has_minimum_crt_version((0, 23, 4)):
        # CRC64NVME support wasn't officially added until 0.23.4
        _CRT_CHECKSUM_CLS["crc64nvme"] = CrtCrc64NvmeChecksum

    _CHECKSUM_CLS.update(_CRT_CHECKSUM_CLS)
    # Validate this list isn't out of sync with _CRT_CHECKSUM_CLS keys
    assert all(
        name in _CRT_CHECKSUM_ALGORITHMS for name in _CRT_CHECKSUM_CLS.keys()
    )
# All algorithm names usable with the current installation.
_SUPPORTED_CHECKSUM_ALGORITHMS = list(_CHECKSUM_CLS.keys())
# Preference order when choosing a response-validation algorithm.
_ALGORITHMS_PRIORITY_LIST = ['crc64nvme', 'crc32c', 'crc32', 'sha1', 'sha256']