about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/botocore/crt/auth.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/crt/auth.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/crt/auth.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/crt/auth.py631
1 files changed, 631 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/crt/auth.py b/.venv/lib/python3.12/site-packages/botocore/crt/auth.py
new file mode 100644
index 00000000..9919e20f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/crt/auth.py
@@ -0,0 +1,631 @@
+# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+import datetime
+from io import BytesIO
+
+from botocore.auth import (
+    SIGNED_HEADERS_BLACKLIST,
+    STREAMING_UNSIGNED_PAYLOAD_TRAILER,
+    UNSIGNED_PAYLOAD,
+    BaseSigner,
+    _get_body_as_dict,
+    _host_from_url,
+)
+from botocore.compat import HTTPHeaders, awscrt, parse_qs, urlsplit, urlunsplit
+from botocore.exceptions import NoCredentialsError
+from botocore.useragent import register_feature_id
+from botocore.utils import percent_encode_sequence
+
+
+class CrtSigV4Auth(BaseSigner):
+    REQUIRES_REGION = True
+    _PRESIGNED_HEADERS_BLOCKLIST = [
+        'Authorization',
+        'X-Amz-Date',
+        'X-Amz-Content-SHA256',
+        'X-Amz-Security-Token',
+    ]
+    _SIGNATURE_TYPE = awscrt.auth.AwsSignatureType.HTTP_REQUEST_HEADERS
+    _USE_DOUBLE_URI_ENCODE = True
+    _SHOULD_NORMALIZE_URI_PATH = True
+
+    def __init__(self, credentials, service_name, region_name):
+        self.credentials = credentials
+        self._service_name = service_name
+        self._region_name = region_name
+        self._expiration_in_seconds = None
+
+    def _is_streaming_checksum_payload(self, request):
+        checksum_context = request.context.get('checksum', {})
+        algorithm = checksum_context.get('request_algorithm')
+        return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'
+
+    def add_auth(self, request):
+        if self.credentials is None:
+            raise NoCredentialsError()
+
+        # Use utcnow() because that's what gets mocked by tests, but set
+        # timezone because CRT assumes naive datetime is local time.
+        datetime_now = datetime.datetime.utcnow().replace(
+            tzinfo=datetime.timezone.utc
+        )
+
+        # Use existing 'X-Amz-Content-SHA256' header if able
+        existing_sha256 = self._get_existing_sha256(request)
+
+        self._modify_request_before_signing(request)
+
+        credentials_provider = awscrt.auth.AwsCredentialsProvider.new_static(
+            access_key_id=self.credentials.access_key,
+            secret_access_key=self.credentials.secret_key,
+            session_token=self.credentials.token,
+        )
+
+        if self._is_streaming_checksum_payload(request):
+            explicit_payload = STREAMING_UNSIGNED_PAYLOAD_TRAILER
+        elif self._should_sha256_sign_payload(request):
+            if existing_sha256:
+                explicit_payload = existing_sha256
+            else:
+                explicit_payload = None  # to be calculated during signing
+        else:
+            explicit_payload = UNSIGNED_PAYLOAD
+
+        if self._should_add_content_sha256_header(explicit_payload):
+            body_header = (
+                awscrt.auth.AwsSignedBodyHeaderType.X_AMZ_CONTENT_SHA_256
+            )
+        else:
+            body_header = awscrt.auth.AwsSignedBodyHeaderType.NONE
+
+        signing_config = awscrt.auth.AwsSigningConfig(
+            algorithm=awscrt.auth.AwsSigningAlgorithm.V4,
+            signature_type=self._SIGNATURE_TYPE,
+            credentials_provider=credentials_provider,
+            region=self._region_name,
+            service=self._service_name,
+            date=datetime_now,
+            should_sign_header=self._should_sign_header,
+            use_double_uri_encode=self._USE_DOUBLE_URI_ENCODE,
+            should_normalize_uri_path=self._SHOULD_NORMALIZE_URI_PATH,
+            signed_body_value=explicit_payload,
+            signed_body_header_type=body_header,
+            expiration_in_seconds=self._expiration_in_seconds,
+        )
+        crt_request = self._crt_request_from_aws_request(request)
+        future = awscrt.auth.aws_sign_request(crt_request, signing_config)
+        future.result()
+        self._apply_signing_changes(request, crt_request)
+
+    def _crt_request_from_aws_request(self, aws_request):
+        url_parts = urlsplit(aws_request.url)
+        crt_path = url_parts.path if url_parts.path else '/'
+        if aws_request.params:
+            array = []
+            for param, value in aws_request.params.items():
+                value = str(value)
+                array.append(f'{param}={value}')
+            crt_path = crt_path + '?' + '&'.join(array)
+        elif url_parts.query:
+            crt_path = f'{crt_path}?{url_parts.query}'
+
+        crt_headers = awscrt.http.HttpHeaders(aws_request.headers.items())
+
+        # CRT requires body (if it exists) to be an I/O stream.
+        crt_body_stream = None
+        if aws_request.body:
+            if hasattr(aws_request.body, 'seek'):
+                crt_body_stream = aws_request.body
+            else:
+                crt_body_stream = BytesIO(aws_request.body)
+
+        crt_request = awscrt.http.HttpRequest(
+            method=aws_request.method,
+            path=crt_path,
+            headers=crt_headers,
+            body_stream=crt_body_stream,
+        )
+        return crt_request
+
+    def _apply_signing_changes(self, aws_request, signed_crt_request):
+        # Apply changes from signed CRT request to the AWSRequest
+        aws_request.headers = HTTPHeaders.from_pairs(
+            list(signed_crt_request.headers)
+        )
+
+    def _should_sign_header(self, name, **kwargs):
+        return name.lower() not in SIGNED_HEADERS_BLACKLIST
+
+    def _modify_request_before_signing(self, request):
+        # This could be a retry. Make sure the previous
+        # authorization headers are removed first.
+        for h in self._PRESIGNED_HEADERS_BLOCKLIST:
+            if h in request.headers:
+                del request.headers[h]
+        # If necessary, add the host header
+        if 'host' not in request.headers:
+            request.headers['host'] = _host_from_url(request.url)
+
+    def _get_existing_sha256(self, request):
+        return request.headers.get('X-Amz-Content-SHA256')
+
+    def _should_sha256_sign_payload(self, request):
+        # Payloads will always be signed over insecure connections.
+        if not request.url.startswith('https'):
+            return True
+
+        # Certain operations may have payload signing disabled by default.
+        # Since we don't have access to the operation model, we pass in this
+        # bit of metadata through the request context.
+        return request.context.get('payload_signing_enabled', True)
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # only add X-Amz-Content-SHA256 header if payload is explicitly set
+        return explicit_payload is not None
+
+
+class CrtS3SigV4Auth(CrtSigV4Auth):
+    # For S3, we do not normalize the path.
+    _USE_DOUBLE_URI_ENCODE = False
+    _SHOULD_NORMALIZE_URI_PATH = False
+
+    def _get_existing_sha256(self, request):
+        # always recalculate
+        return None
+
+    def _should_sha256_sign_payload(self, request):
+        # S3 allows optional body signing, so to minimize the performance
+        # impact, we opt to not SHA256 sign the body on streaming uploads,
+        # provided that we're on https.
+        client_config = request.context.get('client_config')
+        s3_config = getattr(client_config, 's3', None)
+
+        # The config could be None if it isn't set, or if the customer sets it
+        # to None.
+        if s3_config is None:
+            s3_config = {}
+
+        # The explicit configuration takes precedence over any implicit
+        # configuration.
+        sign_payload = s3_config.get('payload_signing_enabled', None)
+        if sign_payload is not None:
+            return sign_payload
+
+        # We require that both a checksum be present and https be enabled
+        # to implicitly disable body signing. The combination of TLS and
+        # a checksum is sufficiently secure and durable for us to be
+        # confident in the request without body signing.
+        checksum_header = 'Content-MD5'
+        checksum_context = request.context.get('checksum', {})
+        algorithm = checksum_context.get('request_algorithm')
+        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
+            checksum_header = algorithm['name']
+        if (
+            not request.url.startswith('https')
+            or checksum_header not in request.headers
+        ):
+            return True
+
+        # If the input is streaming we disable body signing by default.
+        if request.context.get('has_streaming_input', False):
+            return False
+
+        # If the S3-specific checks had no results, delegate to the generic
+        # checks.
+        return super()._should_sha256_sign_payload(request)
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # Always add X-Amz-Content-SHA256 header
+        return True
+
+
+class CrtSigV4AsymAuth(BaseSigner):
+    REQUIRES_REGION = True
+    _PRESIGNED_HEADERS_BLOCKLIST = [
+        'Authorization',
+        'X-Amz-Date',
+        'X-Amz-Content-SHA256',
+        'X-Amz-Security-Token',
+    ]
+    _SIGNATURE_TYPE = awscrt.auth.AwsSignatureType.HTTP_REQUEST_HEADERS
+    _USE_DOUBLE_URI_ENCODE = True
+    _SHOULD_NORMALIZE_URI_PATH = True
+
+    def __init__(self, credentials, service_name, region_name):
+        self.credentials = credentials
+        self._service_name = service_name
+        self._region_name = region_name
+        self._expiration_in_seconds = None
+
+    def add_auth(self, request):
+        register_feature_id("SIGV4A_SIGNING")
+        if self.credentials is None:
+            raise NoCredentialsError()
+
+        # Use utcnow() because that's what gets mocked by tests, but set
+        # timezone because CRT assumes naive datetime is local time.
+        datetime_now = datetime.datetime.utcnow().replace(
+            tzinfo=datetime.timezone.utc
+        )
+
+        # Use existing 'X-Amz-Content-SHA256' header if able
+        existing_sha256 = self._get_existing_sha256(request)
+
+        self._modify_request_before_signing(request)
+
+        credentials_provider = awscrt.auth.AwsCredentialsProvider.new_static(
+            access_key_id=self.credentials.access_key,
+            secret_access_key=self.credentials.secret_key,
+            session_token=self.credentials.token,
+        )
+
+        if self._is_streaming_checksum_payload(request):
+            explicit_payload = STREAMING_UNSIGNED_PAYLOAD_TRAILER
+        elif self._should_sha256_sign_payload(request):
+            if existing_sha256:
+                explicit_payload = existing_sha256
+            else:
+                explicit_payload = None  # to be calculated during signing
+        else:
+            explicit_payload = UNSIGNED_PAYLOAD
+
+        if self._should_add_content_sha256_header(explicit_payload):
+            body_header = (
+                awscrt.auth.AwsSignedBodyHeaderType.X_AMZ_CONTENT_SHA_256
+            )
+        else:
+            body_header = awscrt.auth.AwsSignedBodyHeaderType.NONE
+
+        signing_config = awscrt.auth.AwsSigningConfig(
+            algorithm=awscrt.auth.AwsSigningAlgorithm.V4_ASYMMETRIC,
+            signature_type=self._SIGNATURE_TYPE,
+            credentials_provider=credentials_provider,
+            region=self._region_name,
+            service=self._service_name,
+            date=datetime_now,
+            should_sign_header=self._should_sign_header,
+            use_double_uri_encode=self._USE_DOUBLE_URI_ENCODE,
+            should_normalize_uri_path=self._SHOULD_NORMALIZE_URI_PATH,
+            signed_body_value=explicit_payload,
+            signed_body_header_type=body_header,
+            expiration_in_seconds=self._expiration_in_seconds,
+        )
+        crt_request = self._crt_request_from_aws_request(request)
+        future = awscrt.auth.aws_sign_request(crt_request, signing_config)
+        future.result()
+        self._apply_signing_changes(request, crt_request)
+
+    def _crt_request_from_aws_request(self, aws_request):
+        url_parts = urlsplit(aws_request.url)
+        crt_path = url_parts.path if url_parts.path else '/'
+        if aws_request.params:
+            array = []
+            for param, value in aws_request.params.items():
+                value = str(value)
+                array.append(f'{param}={value}')
+            crt_path = crt_path + '?' + '&'.join(array)
+        elif url_parts.query:
+            crt_path = f'{crt_path}?{url_parts.query}'
+
+        crt_headers = awscrt.http.HttpHeaders(aws_request.headers.items())
+
+        # CRT requires body (if it exists) to be an I/O stream.
+        crt_body_stream = None
+        if aws_request.body:
+            if hasattr(aws_request.body, 'seek'):
+                crt_body_stream = aws_request.body
+            else:
+                crt_body_stream = BytesIO(aws_request.body)
+
+        crt_request = awscrt.http.HttpRequest(
+            method=aws_request.method,
+            path=crt_path,
+            headers=crt_headers,
+            body_stream=crt_body_stream,
+        )
+        return crt_request
+
+    def _apply_signing_changes(self, aws_request, signed_crt_request):
+        # Apply changes from signed CRT request to the AWSRequest
+        aws_request.headers = HTTPHeaders.from_pairs(
+            list(signed_crt_request.headers)
+        )
+
+    def _should_sign_header(self, name, **kwargs):
+        return name.lower() not in SIGNED_HEADERS_BLACKLIST
+
+    def _modify_request_before_signing(self, request):
+        # This could be a retry. Make sure the previous
+        # authorization headers are removed first.
+        for h in self._PRESIGNED_HEADERS_BLOCKLIST:
+            if h in request.headers:
+                del request.headers[h]
+        # If necessary, add the host header
+        if 'host' not in request.headers:
+            request.headers['host'] = _host_from_url(request.url)
+
+    def _get_existing_sha256(self, request):
+        return request.headers.get('X-Amz-Content-SHA256')
+
+    def _is_streaming_checksum_payload(self, request):
+        checksum_context = request.context.get('checksum', {})
+        algorithm = checksum_context.get('request_algorithm')
+        return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'
+
+    def _should_sha256_sign_payload(self, request):
+        # Payloads will always be signed over insecure connections.
+        if not request.url.startswith('https'):
+            return True
+
+        # Certain operations may have payload signing disabled by default.
+        # Since we don't have access to the operation model, we pass in this
+        # bit of metadata through the request context.
+        return request.context.get('payload_signing_enabled', True)
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # only add X-Amz-Content-SHA256 header if payload is explicitly set
+        return explicit_payload is not None
+
+
+class CrtS3SigV4AsymAuth(CrtSigV4AsymAuth):
+    # For S3, we do not normalize the path.
+    _USE_DOUBLE_URI_ENCODE = False
+    _SHOULD_NORMALIZE_URI_PATH = False
+
+    def _get_existing_sha256(self, request):
+        # always recalculate
+        return None
+
+    def _should_sha256_sign_payload(self, request):
+        # S3 allows optional body signing, so to minimize the performance
+        # impact, we opt to not SHA256 sign the body on streaming uploads,
+        # provided that we're on https.
+        client_config = request.context.get('client_config')
+        s3_config = getattr(client_config, 's3', None)
+
+        # The config could be None if it isn't set, or if the customer sets it
+        # to None.
+        if s3_config is None:
+            s3_config = {}
+
+        # The explicit configuration takes precedence over any implicit
+        # configuration.
+        sign_payload = s3_config.get('payload_signing_enabled', None)
+        if sign_payload is not None:
+            return sign_payload
+
+        # We require that both content-md5 be present and https be enabled
+        # to implicitly disable body signing. The combination of TLS and
+        # content-md5 is sufficiently secure and durable for us to be
+        # confident in the request without body signing.
+        if (
+            not request.url.startswith('https')
+            or 'Content-MD5' not in request.headers
+        ):
+            return True
+
+        # If the input is streaming we disable body signing by default.
+        if request.context.get('has_streaming_input', False):
+            return False
+
+        # If the S3-specific checks had no results, delegate to the generic
+        # checks.
+        return super()._should_sha256_sign_payload(request)
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # Always add X-Amz-Content-SHA256 header
+        return True
+
+
+class CrtSigV4AsymQueryAuth(CrtSigV4AsymAuth):
+    DEFAULT_EXPIRES = 3600
+    _SIGNATURE_TYPE = awscrt.auth.AwsSignatureType.HTTP_REQUEST_QUERY_PARAMS
+
+    def __init__(
+        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
+    ):
+        super().__init__(credentials, service_name, region_name)
+        self._expiration_in_seconds = expires
+
+    def _modify_request_before_signing(self, request):
+        super()._modify_request_before_signing(request)
+
+        # We automatically set this header, so if it's the auto-set value we
+        # want to get rid of it since it doesn't make sense for presigned urls.
+        content_type = request.headers.get('content-type')
+        if content_type == 'application/x-www-form-urlencoded; charset=utf-8':
+            del request.headers['content-type']
+
+        # Now parse the original query string to a dict, inject our new query
+        # params, and serialize back to a query string.
+        url_parts = urlsplit(request.url)
+        # parse_qs makes each value a list, but in our case we know we won't
+        # have repeated keys so we know we have single element lists which we
+        # can convert back to scalar values.
+        query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
+        query_dict = {k: v[0] for k, v in query_string_parts.items()}
+
+        # The spec is particular about this.  It *has* to be:
+        # https://<endpoint>?<operation params>&<auth params>
+        # You can't mix the two types of params together, i.e just keep doing
+        # new_query_params.update(op_params)
+        # new_query_params.update(auth_params)
+        # percent_encode_sequence(new_query_params)
+        if request.data:
+            # We also need to move the body params into the query string. To
+            # do this, we first have to convert it to a dict.
+            query_dict.update(_get_body_as_dict(request))
+            request.data = ''
+        new_query_string = percent_encode_sequence(query_dict)
+        # url_parts is a tuple (and therefore immutable) so we need to create
+        # a new url_parts with the new query string.
+        # <part>   - <index>
+        # scheme   - 0
+        # netloc   - 1
+        # path     - 2
+        # query    - 3  <-- we're replacing this.
+        # fragment - 4
+        p = url_parts
+        new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
+        request.url = urlunsplit(new_url_parts)
+
+    def _apply_signing_changes(self, aws_request, signed_crt_request):
+        # Apply changes from signed CRT request to the AWSRequest
+        super()._apply_signing_changes(aws_request, signed_crt_request)
+
+        signed_query = urlsplit(signed_crt_request.path).query
+        p = urlsplit(aws_request.url)
+        # urlsplit() returns a tuple (and therefore immutable) so we
+        # need to create new url with the new query string.
+        # <part>   - <index>
+        # scheme   - 0
+        # netloc   - 1
+        # path     - 2
+        # query    - 3  <-- we're replacing this.
+        # fragment - 4
+        aws_request.url = urlunsplit((p[0], p[1], p[2], signed_query, p[4]))
+
+
+class CrtS3SigV4AsymQueryAuth(CrtSigV4AsymQueryAuth):
+    """S3 SigV4A auth using query parameters.
+    This signer will sign a request using query parameters and signature
+    version 4A, i.e a "presigned url" signer.
+    """
+
+    # For S3, we do not normalize the path.
+    _USE_DOUBLE_URI_ENCODE = False
+    _SHOULD_NORMALIZE_URI_PATH = False
+
+    def _should_sha256_sign_payload(self, request):
+        # From the doc link above:
+        # "You don't include a payload hash in the Canonical Request, because
+        # when you create a presigned URL, you don't know anything about the
+        # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
+        return False
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # Never add X-Amz-Content-SHA256 header
+        return False
+
+
+class CrtSigV4QueryAuth(CrtSigV4Auth):
+    DEFAULT_EXPIRES = 3600
+    _SIGNATURE_TYPE = awscrt.auth.AwsSignatureType.HTTP_REQUEST_QUERY_PARAMS
+
+    def __init__(
+        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
+    ):
+        super().__init__(credentials, service_name, region_name)
+        self._expiration_in_seconds = expires
+
+    def _modify_request_before_signing(self, request):
+        super()._modify_request_before_signing(request)
+
+        # We automatically set this header, so if it's the auto-set value we
+        # want to get rid of it since it doesn't make sense for presigned urls.
+        content_type = request.headers.get('content-type')
+        if content_type == 'application/x-www-form-urlencoded; charset=utf-8':
+            del request.headers['content-type']
+
+        # Now parse the original query string to a dict, inject our new query
+        # params, and serialize back to a query string.
+        url_parts = urlsplit(request.url)
+        # parse_qs makes each value a list, but in our case we know we won't
+        # have repeated keys so we know we have single element lists which we
+        # can convert back to scalar values.
+        query_dict = {
+            k: v[0]
+            for k, v in parse_qs(
+                url_parts.query, keep_blank_values=True
+            ).items()
+        }
+        if request.params:
+            query_dict.update(request.params)
+            request.params = {}
+        # The spec is particular about this.  It *has* to be:
+        # https://<endpoint>?<operation params>&<auth params>
+        # You can't mix the two types of params together, i.e just keep doing
+        # new_query_params.update(op_params)
+        # new_query_params.update(auth_params)
+        # percent_encode_sequence(new_query_params)
+        if request.data:
+            # We also need to move the body params into the query string. To
+            # do this, we first have to convert it to a dict.
+            query_dict.update(_get_body_as_dict(request))
+            request.data = ''
+        new_query_string = percent_encode_sequence(query_dict)
+        # url_parts is a tuple (and therefore immutable) so we need to create
+        # a new url_parts with the new query string.
+        # <part>   - <index>
+        # scheme   - 0
+        # netloc   - 1
+        # path     - 2
+        # query    - 3  <-- we're replacing this.
+        # fragment - 4
+        p = url_parts
+        new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
+        request.url = urlunsplit(new_url_parts)
+
+    def _apply_signing_changes(self, aws_request, signed_crt_request):
+        # Apply changes from signed CRT request to the AWSRequest
+        super()._apply_signing_changes(aws_request, signed_crt_request)
+
+        signed_query = urlsplit(signed_crt_request.path).query
+        p = urlsplit(aws_request.url)
+        # urlsplit() returns a tuple (and therefore immutable) so we
+        # need to create new url with the new query string.
+        # <part>   - <index>
+        # scheme   - 0
+        # netloc   - 1
+        # path     - 2
+        # query    - 3  <-- we're replacing this.
+        # fragment - 4
+        aws_request.url = urlunsplit((p[0], p[1], p[2], signed_query, p[4]))
+
+
+class CrtS3SigV4QueryAuth(CrtSigV4QueryAuth):
+    """S3 SigV4 auth using query parameters.
+    This signer will sign a request using query parameters and signature
+    version 4, i.e a "presigned url" signer.
+    Based off of:
+    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
+    """
+
+    # For S3, we do not normalize the path.
+    _USE_DOUBLE_URI_ENCODE = False
+    _SHOULD_NORMALIZE_URI_PATH = False
+
+    def _should_sha256_sign_payload(self, request):
+        # From the doc link above:
+        # "You don't include a payload hash in the Canonical Request, because
+        # when you create a presigned URL, you don't know anything about the
+        # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
+        return False
+
+    def _should_add_content_sha256_header(self, explicit_payload):
+        # Never add X-Amz-Content-SHA256 header
+        return False
+
+
+# Defined at the bottom of module to ensure all Auth
+# classes are defined.
+CRT_AUTH_TYPE_MAPS = {
+    'v4': CrtSigV4Auth,
+    'v4-query': CrtSigV4QueryAuth,
+    'v4a': CrtSigV4AsymAuth,
+    's3v4': CrtS3SigV4Auth,
+    's3v4-query': CrtS3SigV4QueryAuth,
+    's3v4a': CrtS3SigV4AsymAuth,
+    's3v4a-query': CrtS3SigV4AsymQueryAuth,
+}