aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/httpchecksum.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/httpchecksum.py550
1 files changed, 550 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py b/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
new file mode 100644
index 00000000..929e147e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/httpchecksum.py
@@ -0,0 +1,550 @@
+# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+"""The interfaces in this module are not intended for public use.
+
+This module defines interfaces for applying checksums to HTTP requests within
+the context of botocore. This involves both resolving the checksum to be used
+based on client configuration and environment, as well as application of the
+checksum to the request.
+"""
+
+import base64
+import io
+import logging
+from binascii import crc32
+from hashlib import sha1, sha256
+
+from botocore.compat import HAS_CRT, has_minimum_crt_version, urlparse
+from botocore.exceptions import (
+ AwsChunkedWrapperError,
+ FlexibleChecksumError,
+ MissingDependencyException,
+)
+from botocore.model import StructureShape
+from botocore.response import StreamingBody
+from botocore.utils import (
+ conditionally_calculate_md5,
+ determine_content_length,
+ has_checksum_header,
+)
+
# awscrt is only imported when botocore.compat reports HAS_CRT as true.
# Otherwise the name is still bound (to None) so that the Crt* checksum
# classes below can be defined without an import error; they must not be
# instantiated in that case.
if HAS_CRT:
    from awscrt import checksums as crt_checksums
else:
    crt_checksums = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Algorithm used by resolve_request_checksum_algorithm when a checksum is
# needed but the caller did not pick one. It is lower-cased before being used
# as a lookup key and sent as-is as the algorithm header value.
DEFAULT_CHECKSUM_ALGORITHM = "CRC32"
+
+
class BaseChecksum:
    """Shared interface for the incremental checksum implementations.

    Subclasses override ``update`` and ``digest``. The versions defined here
    do nothing and return ``None``.
    """

    # Number of bytes requested per read when checksumming a file-like body.
    _CHUNK_SIZE = 1024 * 1024

    def update(self, chunk):
        """Fold ``chunk`` into the running checksum (no-op in the base)."""
        pass

    def digest(self):
        """Return the raw checksum bytes (``None`` in the base class)."""
        pass

    def b64digest(self):
        """Return ``digest()`` base64-encoded, as an ASCII ``str``."""
        encoded = base64.b64encode(self.digest())
        return encoded.decode("ascii")

    def _handle_fileobj(self, fileobj):
        """Checksum ``fileobj`` from its current position, then rewind.

        The object is read in ``_CHUNK_SIZE`` pieces until ``read`` returns
        ``b""`` and is afterwards seeked back to where it started, so the
        body can still be sent.
        """
        origin = fileobj.tell()
        while True:
            block = fileobj.read(self._CHUNK_SIZE)
            if block == b"":
                break
            self.update(block)
        fileobj.seek(origin)

    def handle(self, body):
        """Checksum ``body`` and return the base64 digest string.

        ``bytes`` and ``bytearray`` bodies are consumed in one call; anything
        else is treated as a seekable file-like object.
        """
        in_memory = isinstance(body, (bytes, bytearray))
        if not in_memory:
            self._handle_fileobj(body)
        else:
            self.update(body)
        return self.b64digest()
+
+
class Crc32Checksum(BaseChecksum):
    """CRC32 checksum backed by ``binascii.crc32``."""

    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32 = 0

    def update(self, chunk):
        """Continue the CRC over ``chunk``, seeded with the current value."""
        running = crc32(chunk, self._int_crc32)
        self._int_crc32 = running & 0xFFFFFFFF

    def digest(self):
        """Return the CRC as four big-endian bytes."""
        return self._int_crc32.to_bytes(4, "big")
+
+
class CrtCrc32Checksum(BaseChecksum):
    """CRC32 checksum computed with ``crt_checksums.crc32``."""

    # Note: This class is only used if the CRT is available
    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32 = 0

    def update(self, chunk):
        """Continue the CRC over ``chunk``, seeded with the current value."""
        self._int_crc32 = (
            crt_checksums.crc32(chunk, self._int_crc32) & 0xFFFFFFFF
        )

    def digest(self):
        """Return the CRC as four big-endian bytes."""
        return self._int_crc32.to_bytes(4, "big")
+
+
class CrtCrc32cChecksum(BaseChecksum):
    """CRC32C checksum computed with ``crt_checksums.crc32c``."""

    # Note: This class is only used if the CRT is available
    def __init__(self):
        # Running CRC value, kept as an unsigned 32-bit integer.
        self._int_crc32c = 0

    def update(self, chunk):
        """Continue the CRC over ``chunk``, seeded with the current value."""
        self._int_crc32c = (
            crt_checksums.crc32c(chunk, self._int_crc32c) & 0xFFFFFFFF
        )

    def digest(self):
        """Return the CRC as four big-endian bytes."""
        return self._int_crc32c.to_bytes(4, "big")
+
+
class CrtCrc64NvmeChecksum(BaseChecksum):
    """CRC64NVME checksum computed with ``crt_checksums.crc64nvme``."""

    # Note: This class is only used if the CRT is available
    def __init__(self):
        # Running CRC value, kept as an unsigned 64-bit integer.
        self._int_crc64nvme = 0

    def update(self, chunk):
        """Continue the CRC over ``chunk``, seeded with the current value."""
        self._int_crc64nvme = (
            crt_checksums.crc64nvme(chunk, self._int_crc64nvme)
            & 0xFFFFFFFFFFFFFFFF
        )

    def digest(self):
        """Return the CRC as eight big-endian bytes."""
        return self._int_crc64nvme.to_bytes(8, "big")
+
+
class Sha1Checksum(BaseChecksum):
    """SHA-1 checksum backed by ``hashlib.sha1``."""

    def __init__(self):
        # hashlib object that accumulates the data fed to update().
        self._checksum = sha1()

    def update(self, chunk):
        """Feed ``chunk`` to the underlying hash object."""
        hasher = self._checksum
        hasher.update(chunk)

    def digest(self):
        """Return the raw SHA-1 digest bytes of everything fed so far."""
        hasher = self._checksum
        return hasher.digest()
+
+
class Sha256Checksum(BaseChecksum):
    """SHA-256 checksum backed by ``hashlib.sha256``."""

    def __init__(self):
        # hashlib object that accumulates the data fed to update().
        self._checksum = sha256()

    def update(self, chunk):
        """Feed ``chunk`` to the underlying hash object."""
        hasher = self._checksum
        hasher.update(chunk)

    def digest(self):
        """Return the raw SHA-256 digest bytes of everything fed so far."""
        hasher = self._checksum
        return hasher.digest()
+
+
class AwsChunkedWrapper:
    """File-like wrapper that re-encodes a raw body as a chunked stream.

    Every chunk is emitted as ``<hex length>\\r\\n<data>\\r\\n``. Once the
    underlying stream is exhausted a terminating zero-length chunk is
    produced; when a checksum class was supplied, that final chunk carries
    the trailer ``<checksum_name>:<base64 digest>``.

    :param raw: Object exposing ``read(size)`` and ``seek(0)``.
    :param checksum_cls: Optional zero-argument callable returning an object
        with ``update(chunk)`` and ``b64digest()``; ``None`` disables the
        trailer.
    :param checksum_name: Trailer field name written before the digest.
    :param chunk_size: Maximum number of raw bytes per chunk. Defaults to
        ``_DEFAULT_CHUNK_SIZE``.
    """

    _DEFAULT_CHUNK_SIZE = 1024 * 1024

    def __init__(
        self,
        raw,
        checksum_cls=None,
        checksum_name="x-amz-checksum",
        chunk_size=None,
    ):
        self._raw = raw
        self._checksum_name = checksum_name
        self._checksum_cls = checksum_cls
        self._reset()

        if chunk_size is None:
            chunk_size = self._DEFAULT_CHUNK_SIZE
        self._chunk_size = chunk_size

    def _reset(self):
        """Drop buffered output and start a fresh checksum."""
        # Encoded bytes that were produced but not yet handed to the caller.
        self._remaining = b""
        # True once the terminating chunk has been generated.
        self._complete = False
        self._checksum = None
        if self._checksum_cls:
            self._checksum = self._checksum_cls()

    def seek(self, offset, whence=0):
        """Rewind to the start of the stream.

        :raises AwsChunkedWrapperError: For any target other than absolute
            offset 0, since the encoded stream cannot be repositioned
            arbitrarily.
        """
        if offset != 0 or whence != 0:
            raise AwsChunkedWrapperError(
                error_msg="Can only seek to start of stream"
            )
        self._reset()
        self._raw.seek(0)

    def read(self, size=None):
        """Return up to ``size`` encoded bytes.

        ``None`` or a non-positive ``size`` means "read everything". Returns
        ``b""`` once the stream is finished and the buffer is empty.
        """
        # Normalize "read all" size values to None
        if size is not None and size <= 0:
            size = None

        # If the underlying body is done and we have nothing left then
        # end the stream
        if self._complete and not self._remaining:
            return b""

        # While we're not done and want more bytes
        want_more_bytes = size is None or size > len(self._remaining)
        while not self._complete and want_more_bytes:
            self._remaining += self._make_chunk()
            want_more_bytes = size is None or size > len(self._remaining)

        # If size was None, we want to return everything
        if size is None:
            size = len(self._remaining)

        # Return a chunk up to the size asked for
        to_return = self._remaining[:size]
        self._remaining = self._remaining[size:]
        return to_return

    def _make_chunk(self):
        """Read one raw chunk and return it in encoded form."""
        # NOTE: Chunk size is not deterministic as read could return less. This
        # means we cannot know the content length of the encoded aws-chunked
        # stream ahead of time without ensuring a consistent chunk size
        raw_chunk = self._raw.read(self._chunk_size)
        hex_len = hex(len(raw_chunk))[2:].encode("ascii")
        # An empty read marks the end of the underlying stream.
        self._complete = not raw_chunk

        if self._checksum:
            self._checksum.update(raw_chunk)

        if self._checksum and self._complete:
            # Terminating chunk followed by the checksum trailer.
            name = self._checksum_name.encode("ascii")
            checksum = self._checksum.b64digest().encode("ascii")
            return b"0\r\n%s:%s\r\n\r\n" % (name, checksum)

        # Without a checksum the final (empty) chunk is b"0\r\n\r\n".
        return b"%s\r\n%s\r\n" % (hex_len, raw_chunk)

    def __iter__(self):
        """Yield the rest of the encoded stream chunk by chunk.

        Bytes that an earlier partial ``read()`` encoded but did not return
        are yielded first, so mixing ``read()`` and iteration never loses
        data.
        """
        if self._remaining:
            buffered, self._remaining = self._remaining, b""
            yield buffered
        while not self._complete:
            yield self._make_chunk()
+
+
class StreamingChecksumBody(StreamingBody):
    """Streaming body that checksums data as it is read.

    :param raw_stream: Passed through to ``StreamingBody``.
    :param content_length: Passed through to ``StreamingBody``.
    :param checksum: Checksum object exposing ``update``, ``digest`` and
        ``b64digest``.
    :param expected: Base64-encoded checksum the payload must match.
    """

    def __init__(self, raw_stream, content_length, checksum, expected):
        super().__init__(raw_stream, content_length)
        self._checksum = checksum
        self._expected = expected

    def read(self, amt=None):
        """Read from the parent body, updating and checking the checksum.

        Validation runs when everything was requested at once
        (``amt is None``) or when a positive-sized read comes back empty.
        """
        data = super().read(amt=amt)
        self._checksum.update(data)
        reached_end = amt is None or (not data and amt > 0)
        if reached_end:
            self._validate_checksum()
        return data

    def _validate_checksum(self):
        """Raise ``FlexibleChecksumError`` if the digest is not as expected."""
        if self._checksum.digest() == base64.b64decode(self._expected):
            return
        raise FlexibleChecksumError(
            error_msg=(
                f"Expected checksum {self._expected} did not match calculated "
                f"checksum: {self._checksum.b64digest()}"
            )
        )
+
+
def resolve_checksum_context(request, operation_model, params):
    """Resolve the request and then the response checksum settings.

    Both resolvers record their result in ``request["context"]``.
    """
    resolvers = (
        resolve_request_checksum_algorithm,
        resolve_response_checksum_algorithms,
    )
    for resolver in resolvers:
        resolver(request, operation_model, params)
+
+
def resolve_request_checksum_algorithm(
    request,
    operation_model,
    params,
    supported_algorithms=None,
):
    """Decide which checksum algorithm, if any, applies to ``request``.

    When an algorithm is selected, ``request["context"]["checksum"]`` gains
    a ``"request_algorithm"`` dict with the keys ``"algorithm"``, ``"in"``
    (``"header"`` or ``"trailer"``) and ``"name"`` (the checksum header
    name). When the default algorithm is used and the operation models an
    algorithm header, a ``"request_algorithm_header"`` entry is stored too.

    :param request: Request dict with ``"context"`` and ``"url"`` entries.
    :param operation_model: Model of the operation being called.
    :param params: Parameters supplied by the caller for the operation.
    :param supported_algorithms: Optional collection of lower-case algorithm
        names to accept; defaults to ``_SUPPORTED_CHECKSUM_ALGORITHMS``.
    :raises MissingDependencyException: The chosen algorithm is CRT-only and
        the CRT is not available.
    :raises FlexibleChecksumError: The chosen algorithm is not supported.
    """
    # If the header is already set by the customer, skip calculation
    if has_checksum_header(request):
        return

    checksum_context = request["context"].get("checksum", {})
    client_config = request["context"]["client_config"]
    request_checksum_calculation = client_config.request_checksum_calculation
    http_checksum = operation_model.http_checksum
    request_checksum_required = (
        operation_model.http_checksum_required
        or http_checksum.get("requestChecksumRequired")
    )
    algorithm_member = http_checksum.get("requestAlgorithmMember")

    if algorithm_member and algorithm_member in params:
        # If the client has opted into using flexible checksums and the
        # request supports it, use that instead of checksum required
        if supported_algorithms is None:
            supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS

        algorithm_name = params[algorithm_member].lower()
        if algorithm_name not in supported_algorithms:
            crt_missing = (
                not HAS_CRT and algorithm_name in _CRT_CHECKSUM_ALGORITHMS
            )
            if crt_missing:
                raise MissingDependencyException(
                    msg=(
                        f"Using {algorithm_name.upper()} requires an "
                        "additional dependency. You will need to pip install "
                        "botocore[crt] before proceeding."
                    )
                )
            raise FlexibleChecksumError(
                error_msg=f"Unsupported checksum algorithm: {algorithm_name}"
            )
    else:
        wants_default = request_checksum_required or (
            algorithm_member
            and request_checksum_calculation == "when_supported"
        )
        if not wants_default:
            return
        # Don't use a default checksum for presigned requests.
        if request["context"].get("is_presign_request"):
            return
        algorithm_name = DEFAULT_CHECKSUM_ALGORITHM.lower()
        algorithm_member_header = _get_request_algorithm_member_header(
            operation_model, request, algorithm_member
        )
        if algorithm_member_header is not None:
            checksum_context["request_algorithm_header"] = {
                "name": algorithm_member_header,
                "value": DEFAULT_CHECKSUM_ALGORITHM,
            }

    # Operations with streaming input must support trailers.
    # We only support unsigned trailer checksums currently. As this
    # disables payload signing we'll only use trailers over TLS.
    use_trailer = (
        operation_model.has_streaming_input
        and urlparse(request["url"]).scheme == "https"
        and client_config.signature_version != 's3'
    )
    location_type = "trailer" if use_trailer else "header"

    checksum_context["request_algorithm"] = {
        "algorithm": algorithm_name,
        "in": location_type,
        "name": f"x-amz-checksum-{algorithm_name}",
    }
    request["context"]["checksum"] = checksum_context
+
+
def _get_request_algorithm_member_header(
    operation_model, request, algorithm_member
):
    """Get the name of the header targeted by the "requestAlgorithmMember".

    Returns ``None`` when the operation's input shape is not a structure or
    has no (truthy) member called ``algorithm_member``. ``request`` is
    accepted but not used.
    """
    input_shape = operation_model.input_shape
    if not isinstance(input_shape, StructureShape):
        return None

    member_shape = input_shape.members.get(algorithm_member)
    if not member_shape:
        return None
    return member_shape.serialization.get("name")
+
+
def apply_request_checksum(request):
    """Apply the checksum recorded in the request's checksum context.

    Does nothing when no ``"request_algorithm"`` was resolved. Otherwise the
    checksum is applied as conditional MD5, as a header or as a trailer,
    and any recorded ``"request_algorithm_header"`` is then copied into the
    request headers.

    :raises FlexibleChecksumError: The algorithm's ``"in"`` value is
        neither ``"header"`` nor ``"trailer"``.
    """
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    if not algorithm:
        return

    if algorithm == "conditional-md5":
        # Special case to handle the http checksum required trait
        conditionally_calculate_md5(request)
    else:
        location = algorithm["in"]
        if location == "header":
            _apply_request_header_checksum(request)
        elif location == "trailer":
            _apply_request_trailer_checksum(request)
        else:
            raise FlexibleChecksumError(
                error_msg=f"Unknown checksum variant: {location}"
            )

    header = checksum_context.get("request_algorithm_header")
    if "request_algorithm_header" in checksum_context:
        request["headers"][header["name"]] = header["value"]
+
+
def _apply_request_header_checksum(request):
    """Compute the body checksum and store it in its request header.

    The header name and algorithm come from the ``"request_algorithm"``
    entry of the request's checksum context. A header that is already
    present is left untouched.
    """
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    header_name = algorithm["name"]
    headers = request["headers"]
    if header_name in headers:
        # If the header is already set by the customer, skip calculation
        return
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])
    calculator = checksum_cls()
    request["headers"][header_name] = calculator.handle(request["body"])
+
+
def _apply_request_trailer_checksum(request):
    """Arrange for the checksum to be sent as a trailer of a chunked body.

    Sets the chunked transfer and ``aws-chunked`` content encoding headers,
    announces the trailer name, drops ``Content-Length`` and replaces the
    request body with an ``AwsChunkedWrapper``. Nothing is changed when the
    checksum header is already present.
    """
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    trailer_name = algorithm["name"]
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])

    headers = request["headers"]
    payload = request["body"]

    if trailer_name in headers:
        # If the header is already set by the customer, skip calculation
        return

    headers["Transfer-Encoding"] = "chunked"
    if "Content-Encoding" not in headers:
        headers["Content-Encoding"] = "aws-chunked"
    else:
        # We need to preserve the existing content encoding and add
        # aws-chunked as a new content encoding.
        headers["Content-Encoding"] += ",aws-chunked"
    headers["X-Amz-Trailer"] = trailer_name

    decoded_length = determine_content_length(payload)
    if decoded_length is not None:
        # Send the decoded content length if we can determine it. Some
        # services such as S3 may require the decoded content length
        headers["X-Amz-Decoded-Content-Length"] = str(decoded_length)

    if "Content-Length" in headers:
        del headers["Content-Length"]
        logger.debug(
            "Removing the Content-Length header since 'chunked' is specified for Transfer-Encoding."
        )

    # The wrapper needs a file-like object, so in-memory bodies are wrapped.
    if isinstance(payload, (bytes, bytearray)):
        payload = io.BytesIO(payload)

    request["body"] = AwsChunkedWrapper(
        payload,
        checksum_cls=checksum_cls,
        checksum_name=trailer_name,
    )
+
+
def resolve_response_checksum_algorithms(
    request, operation_model, params, supported_algorithms=None
):
    """Record which response checksum algorithms may be validated.

    Only acts when the operation models a ``requestValidationModeMember``
    and the caller supplied that member in ``params``. The algorithms that
    are both modeled for the response and supported are stored, in
    ``_ALGORITHMS_PRIORITY_LIST`` order, under
    ``request["context"]["checksum"]["response_algorithms"]``.

    :param supported_algorithms: Optional collection of lower-case algorithm
        names to accept; defaults to ``_SUPPORTED_CHECKSUM_ALGORITHMS``.
    """
    http_checksum = operation_model.http_checksum
    mode_member = http_checksum.get("requestValidationModeMember")
    if not mode_member or mode_member not in params:
        return

    if supported_algorithms is None:
        supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS
    modeled = {
        name.lower() for name in http_checksum.get("responseAlgorithms", [])
    }

    usable_algorithms = [
        candidate
        for candidate in _ALGORITHMS_PRIORITY_LIST
        if candidate in modeled and candidate in supported_algorithms
    ]

    checksum_context = request["context"].get("checksum", {})
    checksum_context["response_algorithms"] = usable_algorithms
    request["context"]["checksum"] = checksum_context
+
+
def handle_checksum_body(http_response, response, context, operation_model):
    """Validate, or wrap for validation, the response body checksum.

    The algorithms listed in the checksum context are tried in order. The
    first one with a usable ``x-amz-checksum-<algorithm>`` response header
    is applied: streaming output is wrapped so that it validates while
    being read, other bodies are validated immediately. The algorithm used
    is recorded in ``response["context"]["checksum"]``.
    """
    headers = response["headers"]
    algorithms = context.get("checksum", {}).get("response_algorithms")
    if not algorithms:
        return

    for algorithm in algorithms:
        header_name = f"x-amz-checksum-{algorithm}"
        # If the header is not found, check the next algorithm.
        # If a - is in the checksum this is not valid Base64. S3 returns
        # checksums that include a -# suffix to indicate a checksum derived
        # from the hash of all part checksums. We cannot wrap this response
        if header_name not in headers or "-" in headers[header_name]:
            continue

        handler = (
            _handle_streaming_response
            if operation_model.has_streaming_output
            else _handle_bytes_response
        )
        response["body"] = handler(http_response, response, algorithm)

        # Expose metadata that the checksum check actually occurred
        response_checksum_context = response["context"].get("checksum", {})
        response_checksum_context["response_algorithm"] = algorithm
        response["context"]["checksum"] = response_checksum_context
        return

    logger.debug(
        f'Skipping checksum validation. Response did not contain one of the '
        f'following algorithms: {algorithms}.'
    )
+
+
def _handle_streaming_response(http_response, response, algorithm):
    """Wrap the raw response stream so it is checksummed while being read.

    The expected value is taken from the ``x-amz-checksum-<algorithm>``
    response header.
    """
    headers = response["headers"]
    checksum_cls = _CHECKSUM_CLS.get(algorithm)
    return StreamingChecksumBody(
        http_response.raw,
        headers.get("content-length"),
        checksum_cls(),
        headers[f"x-amz-checksum-{algorithm}"],
    )
+
+
def _handle_bytes_response(http_response, response, algorithm):
    """Validate the full response content and return it unchanged.

    :raises FlexibleChecksumError: The calculated checksum differs from the
        ``x-amz-checksum-<algorithm>`` response header.
    """
    body = http_response.content
    header_name = f"x-amz-checksum-{algorithm}"
    calculator = _CHECKSUM_CLS.get(algorithm)()
    calculator.update(body)
    expected = response["headers"][header_name]
    if calculator.digest() == base64.b64decode(expected):
        return body
    raise FlexibleChecksumError(
        error_msg=(
            f"Expected checksum {expected} did not match calculated "
            f"checksum: {calculator.b64digest()}"
        )
    )
+
+
# Lower-case algorithm name -> checksum class. These entries need only the
# standard library.
_CHECKSUM_CLS = {
    "crc32": Crc32Checksum,
    "sha1": Sha1Checksum,
    "sha256": Sha256Checksum,
}
# Algorithms that have a CRT implementation. Used to choose between the
# "missing dependency" and "unsupported algorithm" errors when a requested
# algorithm is not available.
_CRT_CHECKSUM_ALGORITHMS = ["crc32", "crc32c", "crc64nvme"]
if HAS_CRT:
    # Use CRT checksum implementations if available
    _CRT_CHECKSUM_CLS = {
        "crc32": CrtCrc32Checksum,
        "crc32c": CrtCrc32cChecksum,
    }

    if has_minimum_crt_version((0, 23, 4)):
        # CRC64NVME support wasn't officially added until 0.23.4
        _CRT_CHECKSUM_CLS["crc64nvme"] = CrtCrc64NvmeChecksum

    # CRT entries replace the standard-library "crc32" entry and add the
    # CRT-only algorithms.
    _CHECKSUM_CLS.update(_CRT_CHECKSUM_CLS)
    # Validate this list isn't out of sync with _CRT_CHECKSUM_CLS keys
    # NOTE(review): assert statements are skipped when Python runs with -O.
    assert all(
        name in _CRT_CHECKSUM_ALGORITHMS for name in _CRT_CHECKSUM_CLS.keys()
    )
# Names accepted by the resolve_* functions when the caller passes no
# supported_algorithms; computed after any CRT classes were registered.
_SUPPORTED_CHECKSUM_ALGORITHMS = list(_CHECKSUM_CLS.keys())
# Order in which response checksum algorithms are considered for validation.
_ALGORITHMS_PRIORITY_LIST = ['crc64nvme', 'crc32c', 'crc32', 'sha1', 'sha256']