aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/auth.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/auth.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/auth.py1188
1 files changed, 1188 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/auth.py b/.venv/lib/python3.12/site-packages/botocore/auth.py
new file mode 100644
index 00000000..bacbd39d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/auth.py
@@ -0,0 +1,1188 @@
+# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
+# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import base64
+import calendar
+import datetime
+import functools
+import hmac
+import json
+import logging
+import time
+from collections.abc import Mapping
+from email.utils import formatdate
+from hashlib import sha1, sha256
+from operator import itemgetter
+
+from botocore.compat import (
+ HAS_CRT,
+ HTTPHeaders,
+ encodebytes,
+ ensure_unicode,
+ parse_qs,
+ quote,
+ unquote,
+ urlsplit,
+ urlunsplit,
+)
+from botocore.exceptions import (
+ NoAuthTokenError,
+ NoCredentialsError,
+ UnknownSignatureVersionError,
+ UnsupportedSignatureVersionError,
+)
+from botocore.utils import (
+ is_valid_ipv6_endpoint_url,
+ normalize_url_path,
+ percent_encode_sequence,
+)
+
+# Imports for backwards compatibility
+from botocore.compat import MD5_AVAILABLE # noqa
+
+
+logger = logging.getLogger(__name__)
+
+
+EMPTY_SHA256_HASH = (
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
+)
+# This is the buffer size used when calculating sha256 checksums.
+# Experimenting with various buffer sizes showed that this value generally
+# gave the best result (in terms of performance).
+PAYLOAD_BUFFER = 1024 * 1024
+ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
+SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'
+SIGNED_HEADERS_BLACKLIST = [
+ 'expect',
+ 'transfer-encoding',
+ 'user-agent',
+ 'x-amzn-trace-id',
+]
+UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
+STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
+
+
+def _host_from_url(url):
+ # Given URL, derive value for host header. Ensure that value:
+ # 1) is lowercase
+ # 2) excludes port, if it was the default port
+ # 3) excludes userinfo
+ url_parts = urlsplit(url)
+ host = url_parts.hostname # urlsplit's hostname is always lowercase
+ if is_valid_ipv6_endpoint_url(url):
+ host = f'[{host}]'
+ default_ports = {
+ 'http': 80,
+ 'https': 443,
+ }
+ if url_parts.port is not None:
+ if url_parts.port != default_ports.get(url_parts.scheme):
+ host = '%s:%d' % (host, url_parts.port)
+ return host
+
+
+def _get_body_as_dict(request):
+ # For query services, request.data is form-encoded and is already a
+ # dict, but for other services such as rest-json it could be a json
+ # string or bytes. In those cases we attempt to load the data as a
+ # dict.
+ data = request.data
+ if isinstance(data, bytes):
+ data = json.loads(data.decode('utf-8'))
+ elif isinstance(data, str):
+ data = json.loads(data)
+ return data
+
+
+class BaseSigner:
+ REQUIRES_REGION = False
+ REQUIRES_TOKEN = False
+
+ def add_auth(self, request):
+ raise NotImplementedError("add_auth")
+
+
+class TokenSigner(BaseSigner):
+ REQUIRES_TOKEN = True
+ """
+ Signers that expect an authorization token to perform the authorization
+ """
+
+ def __init__(self, auth_token):
+ self.auth_token = auth_token
+
+
+class SigV2Auth(BaseSigner):
+ """
+ Sign a request with Signature V2.
+ """
+
+ def __init__(self, credentials):
+ self.credentials = credentials
+
+ def calc_signature(self, request, params):
+ logger.debug("Calculating signature using v2 auth.")
+ split = urlsplit(request.url)
+ path = split.path
+ if len(path) == 0:
+ path = '/'
+ string_to_sign = f"{request.method}\n{split.netloc}\n{path}\n"
+ lhmac = hmac.new(
+ self.credentials.secret_key.encode("utf-8"), digestmod=sha256
+ )
+ pairs = []
+ for key in sorted(params):
+ # Any previous signature should not be a part of this
+ # one, so we skip that particular key. This prevents
+ # issues during retries.
+ if key == 'Signature':
+ continue
+ value = str(params[key])
+ quoted_key = quote(key.encode('utf-8'), safe='')
+ quoted_value = quote(value.encode('utf-8'), safe='-_~')
+ pairs.append(f'{quoted_key}={quoted_value}')
+ qs = '&'.join(pairs)
+ string_to_sign += qs
+ logger.debug('String to sign: %s', string_to_sign)
+ lhmac.update(string_to_sign.encode('utf-8'))
+ b64 = base64.b64encode(lhmac.digest()).strip().decode('utf-8')
+ return (qs, b64)
+
+ def add_auth(self, request):
+ # The auth handler is the last thing called in the
+ # preparation phase of a prepared request.
+ # Because of this we have to parse the query params
+ # from the request body so we can update them with
+ # the sigv2 auth params.
+ if self.credentials is None:
+ raise NoCredentialsError()
+ if request.data:
+ # POST
+ params = request.data
+ else:
+ # GET
+ params = request.params
+ params['AWSAccessKeyId'] = self.credentials.access_key
+ params['SignatureVersion'] = '2'
+ params['SignatureMethod'] = 'HmacSHA256'
+ params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
+ if self.credentials.token:
+ params['SecurityToken'] = self.credentials.token
+ qs, signature = self.calc_signature(request, params)
+ params['Signature'] = signature
+ return request
+
+
+class SigV3Auth(BaseSigner):
+ def __init__(self, credentials):
+ self.credentials = credentials
+
+ def add_auth(self, request):
+ if self.credentials is None:
+ raise NoCredentialsError()
+ if 'Date' in request.headers:
+ del request.headers['Date']
+ request.headers['Date'] = formatdate(usegmt=True)
+ if self.credentials.token:
+ if 'X-Amz-Security-Token' in request.headers:
+ del request.headers['X-Amz-Security-Token']
+ request.headers['X-Amz-Security-Token'] = self.credentials.token
+ new_hmac = hmac.new(
+ self.credentials.secret_key.encode('utf-8'), digestmod=sha256
+ )
+ new_hmac.update(request.headers['Date'].encode('utf-8'))
+ encoded_signature = encodebytes(new_hmac.digest()).strip()
+ signature = (
+ f"AWS3-HTTPS AWSAccessKeyId={self.credentials.access_key},"
+ f"Algorithm=HmacSHA256,Signature={encoded_signature.decode('utf-8')}"
+ )
+ if 'X-Amzn-Authorization' in request.headers:
+ del request.headers['X-Amzn-Authorization']
+ request.headers['X-Amzn-Authorization'] = signature
+
+
+class SigV4Auth(BaseSigner):
+ """
+ Sign a request with Signature V4.
+ """
+
+ REQUIRES_REGION = True
+
+ def __init__(self, credentials, service_name, region_name):
+ self.credentials = credentials
+ # We initialize these value here so the unit tests can have
+ # valid values. But these will get overriden in ``add_auth``
+ # later for real requests.
+ self._region_name = region_name
+ self._service_name = service_name
+
+ def _sign(self, key, msg, hex=False):
+ if hex:
+ sig = hmac.new(key, msg.encode('utf-8'), sha256).hexdigest()
+ else:
+ sig = hmac.new(key, msg.encode('utf-8'), sha256).digest()
+ return sig
+
+ def headers_to_sign(self, request):
+ """
+ Select the headers from the request that need to be included
+ in the StringToSign.
+ """
+ header_map = HTTPHeaders()
+ for name, value in request.headers.items():
+ lname = name.lower()
+ if lname not in SIGNED_HEADERS_BLACKLIST:
+ header_map[lname] = value
+ if 'host' not in header_map:
+ # TODO: We should set the host ourselves, instead of relying on our
+ # HTTP client to set it for us.
+ header_map['host'] = _host_from_url(request.url)
+ return header_map
+
+ def canonical_query_string(self, request):
+ # The query string can come from two parts. One is the
+ # params attribute of the request. The other is from the request
+ # url (in which case we have to re-split the url into its components
+ # and parse out the query string component).
+ if request.params:
+ return self._canonical_query_string_params(request.params)
+ else:
+ return self._canonical_query_string_url(urlsplit(request.url))
+
+ def _canonical_query_string_params(self, params):
+ # [(key, value), (key2, value2)]
+ key_val_pairs = []
+ if isinstance(params, Mapping):
+ params = params.items()
+ for key, value in params:
+ key_val_pairs.append(
+ (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
+ )
+ sorted_key_vals = []
+ # Sort by the URI-encoded key names, and in the case of
+ # repeated keys, then sort by the value.
+ for key, value in sorted(key_val_pairs):
+ sorted_key_vals.append(f'{key}={value}')
+ canonical_query_string = '&'.join(sorted_key_vals)
+ return canonical_query_string
+
+ def _canonical_query_string_url(self, parts):
+ canonical_query_string = ''
+ if parts.query:
+ # [(key, value), (key2, value2)]
+ key_val_pairs = []
+ for pair in parts.query.split('&'):
+ key, _, value = pair.partition('=')
+ key_val_pairs.append((key, value))
+ sorted_key_vals = []
+ # Sort by the URI-encoded key names, and in the case of
+ # repeated keys, then sort by the value.
+ for key, value in sorted(key_val_pairs):
+ sorted_key_vals.append(f'{key}={value}')
+ canonical_query_string = '&'.join(sorted_key_vals)
+ return canonical_query_string
+
+ def canonical_headers(self, headers_to_sign):
+ """
+ Return the headers that need to be included in the StringToSign
+ in their canonical form by converting all header keys to lower
+ case, sorting them in alphabetical order and then joining
+ them into a string, separated by newlines.
+ """
+ headers = []
+ sorted_header_names = sorted(set(headers_to_sign))
+ for key in sorted_header_names:
+ value = ','.join(
+ self._header_value(v) for v in headers_to_sign.get_all(key)
+ )
+ headers.append(f'{key}:{ensure_unicode(value)}')
+ return '\n'.join(headers)
+
+ def _header_value(self, value):
+ # From the sigv4 docs:
+ # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
+ #
+ # The Trimall function removes excess white space before and after
+ # values, and converts sequential spaces to a single space.
+ return ' '.join(value.split())
+
+ def signed_headers(self, headers_to_sign):
+ headers = sorted(n.lower().strip() for n in set(headers_to_sign))
+ return ';'.join(headers)
+
+ def _is_streaming_checksum_payload(self, request):
+ checksum_context = request.context.get('checksum', {})
+ algorithm = checksum_context.get('request_algorithm')
+ return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'
+
+ def payload(self, request):
+ if self._is_streaming_checksum_payload(request):
+ return STREAMING_UNSIGNED_PAYLOAD_TRAILER
+ elif not self._should_sha256_sign_payload(request):
+ # When payload signing is disabled, we use this static string in
+ # place of the payload checksum.
+ return UNSIGNED_PAYLOAD
+ request_body = request.body
+ if request_body and hasattr(request_body, 'seek'):
+ position = request_body.tell()
+ read_chunksize = functools.partial(
+ request_body.read, PAYLOAD_BUFFER
+ )
+ checksum = sha256()
+ for chunk in iter(read_chunksize, b''):
+ checksum.update(chunk)
+ hex_checksum = checksum.hexdigest()
+ request_body.seek(position)
+ return hex_checksum
+ elif request_body:
+ # The request serialization has ensured that
+ # request.body is a bytes() type.
+ return sha256(request_body).hexdigest()
+ else:
+ return EMPTY_SHA256_HASH
+
+ def _should_sha256_sign_payload(self, request):
+ # Payloads will always be signed over insecure connections.
+ if not request.url.startswith('https'):
+ return True
+
+ # Certain operations may have payload signing disabled by default.
+ # Since we don't have access to the operation model, we pass in this
+ # bit of metadata through the request context.
+ return request.context.get('payload_signing_enabled', True)
+
+ def canonical_request(self, request):
+ cr = [request.method.upper()]
+ path = self._normalize_url_path(urlsplit(request.url).path)
+ cr.append(path)
+ cr.append(self.canonical_query_string(request))
+ headers_to_sign = self.headers_to_sign(request)
+ cr.append(self.canonical_headers(headers_to_sign) + '\n')
+ cr.append(self.signed_headers(headers_to_sign))
+ if 'X-Amz-Content-SHA256' in request.headers:
+ body_checksum = request.headers['X-Amz-Content-SHA256']
+ else:
+ body_checksum = self.payload(request)
+ cr.append(body_checksum)
+ return '\n'.join(cr)
+
+ def _normalize_url_path(self, path):
+ normalized_path = quote(normalize_url_path(path), safe='/~')
+ return normalized_path
+
+ def scope(self, request):
+ scope = [self.credentials.access_key]
+ scope.append(request.context['timestamp'][0:8])
+ scope.append(self._region_name)
+ scope.append(self._service_name)
+ scope.append('aws4_request')
+ return '/'.join(scope)
+
+ def credential_scope(self, request):
+ scope = []
+ scope.append(request.context['timestamp'][0:8])
+ scope.append(self._region_name)
+ scope.append(self._service_name)
+ scope.append('aws4_request')
+ return '/'.join(scope)
+
+ def string_to_sign(self, request, canonical_request):
+ """
+ Return the canonical StringToSign as well as a dict
+ containing the original version of all headers that
+ were included in the StringToSign.
+ """
+ sts = ['AWS4-HMAC-SHA256']
+ sts.append(request.context['timestamp'])
+ sts.append(self.credential_scope(request))
+ sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())
+ return '\n'.join(sts)
+
+ def signature(self, string_to_sign, request):
+ key = self.credentials.secret_key
+ k_date = self._sign(
+ (f"AWS4{key}").encode(), request.context["timestamp"][0:8]
+ )
+ k_region = self._sign(k_date, self._region_name)
+ k_service = self._sign(k_region, self._service_name)
+ k_signing = self._sign(k_service, 'aws4_request')
+ return self._sign(k_signing, string_to_sign, hex=True)
+
+ def add_auth(self, request):
+ if self.credentials is None:
+ raise NoCredentialsError()
+ datetime_now = datetime.datetime.utcnow()
+ request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
+ # This could be a retry. Make sure the previous
+ # authorization header is removed first.
+ self._modify_request_before_signing(request)
+ canonical_request = self.canonical_request(request)
+ logger.debug("Calculating signature using v4 auth.")
+ logger.debug('CanonicalRequest:\n%s', canonical_request)
+ string_to_sign = self.string_to_sign(request, canonical_request)
+ logger.debug('StringToSign:\n%s', string_to_sign)
+ signature = self.signature(string_to_sign, request)
+ logger.debug('Signature:\n%s', signature)
+
+ self._inject_signature_to_request(request, signature)
+
+ def _inject_signature_to_request(self, request, signature):
+ auth_str = [f'AWS4-HMAC-SHA256 Credential={self.scope(request)}']
+ headers_to_sign = self.headers_to_sign(request)
+ auth_str.append(
+ f"SignedHeaders={self.signed_headers(headers_to_sign)}"
+ )
+ auth_str.append(f'Signature={signature}')
+ request.headers['Authorization'] = ', '.join(auth_str)
+ return request
+
+ def _modify_request_before_signing(self, request):
+ if 'Authorization' in request.headers:
+ del request.headers['Authorization']
+ self._set_necessary_date_headers(request)
+ if self.credentials.token:
+ if 'X-Amz-Security-Token' in request.headers:
+ del request.headers['X-Amz-Security-Token']
+ request.headers['X-Amz-Security-Token'] = self.credentials.token
+
+ if not request.context.get('payload_signing_enabled', True):
+ if 'X-Amz-Content-SHA256' in request.headers:
+ del request.headers['X-Amz-Content-SHA256']
+ request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD
+
+ def _set_necessary_date_headers(self, request):
+ # The spec allows for either the Date _or_ the X-Amz-Date value to be
+ # used so we check both. If there's a Date header, we use the date
+ # header. Otherwise we use the X-Amz-Date header.
+ if 'Date' in request.headers:
+ del request.headers['Date']
+ datetime_timestamp = datetime.datetime.strptime(
+ request.context['timestamp'], SIGV4_TIMESTAMP
+ )
+ request.headers['Date'] = formatdate(
+ int(calendar.timegm(datetime_timestamp.timetuple()))
+ )
+ if 'X-Amz-Date' in request.headers:
+ del request.headers['X-Amz-Date']
+ else:
+ if 'X-Amz-Date' in request.headers:
+ del request.headers['X-Amz-Date']
+ request.headers['X-Amz-Date'] = request.context['timestamp']
+
+
+class S3SigV4Auth(SigV4Auth):
+ def _modify_request_before_signing(self, request):
+ super()._modify_request_before_signing(request)
+ if 'X-Amz-Content-SHA256' in request.headers:
+ del request.headers['X-Amz-Content-SHA256']
+
+ request.headers['X-Amz-Content-SHA256'] = self.payload(request)
+
+ def _should_sha256_sign_payload(self, request):
+ # S3 allows optional body signing, so to minimize the performance
+ # impact, we opt to not SHA256 sign the body on streaming uploads,
+ # provided that we're on https.
+ client_config = request.context.get('client_config')
+ s3_config = getattr(client_config, 's3', None)
+
+ # The config could be None if it isn't set, or if the customer sets it
+ # to None.
+ if s3_config is None:
+ s3_config = {}
+
+ # The explicit configuration takes precedence over any implicit
+ # configuration.
+ sign_payload = s3_config.get('payload_signing_enabled', None)
+ if sign_payload is not None:
+ return sign_payload
+
+ # We require that both a checksum be present and https be enabled
+ # to implicitly disable body signing. The combination of TLS and
+ # a checksum is sufficiently secure and durable for us to be
+ # confident in the request without body signing.
+ checksum_header = 'Content-MD5'
+ checksum_context = request.context.get('checksum', {})
+ algorithm = checksum_context.get('request_algorithm')
+ if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
+ checksum_header = algorithm['name']
+ if (
+ not request.url.startswith("https")
+ or checksum_header not in request.headers
+ ):
+ return True
+
+ # If the input is streaming we disable body signing by default.
+ if request.context.get('has_streaming_input', False):
+ return False
+
+ # If the S3-specific checks had no results, delegate to the generic
+ # checks.
+ return super()._should_sha256_sign_payload(request)
+
+ def _normalize_url_path(self, path):
+ # For S3, we do not normalize the path.
+ return path
+
+
+class S3ExpressAuth(S3SigV4Auth):
+ REQUIRES_IDENTITY_CACHE = True
+
+ def __init__(
+ self, credentials, service_name, region_name, *, identity_cache
+ ):
+ super().__init__(credentials, service_name, region_name)
+ self._identity_cache = identity_cache
+
+ def add_auth(self, request):
+ super().add_auth(request)
+
+ def _modify_request_before_signing(self, request):
+ super()._modify_request_before_signing(request)
+ if 'x-amz-s3session-token' not in request.headers:
+ request.headers['x-amz-s3session-token'] = self.credentials.token
+ # S3Express does not support STS' X-Amz-Security-Token
+ if 'X-Amz-Security-Token' in request.headers:
+ del request.headers['X-Amz-Security-Token']
+
+
+class S3ExpressPostAuth(S3ExpressAuth):
+ REQUIRES_IDENTITY_CACHE = True
+
+ def add_auth(self, request):
+ datetime_now = datetime.datetime.utcnow()
+ request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
+
+ fields = {}
+ if request.context.get('s3-presign-post-fields', None) is not None:
+ fields = request.context['s3-presign-post-fields']
+
+ policy = {}
+ conditions = []
+ if request.context.get('s3-presign-post-policy', None) is not None:
+ policy = request.context['s3-presign-post-policy']
+ if policy.get('conditions', None) is not None:
+ conditions = policy['conditions']
+
+ policy['conditions'] = conditions
+
+ fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
+ fields['x-amz-credential'] = self.scope(request)
+ fields['x-amz-date'] = request.context['timestamp']
+
+ conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
+ conditions.append({'x-amz-credential': self.scope(request)})
+ conditions.append({'x-amz-date': request.context['timestamp']})
+
+ if self.credentials.token is not None:
+ fields['X-Amz-S3session-Token'] = self.credentials.token
+ conditions.append(
+ {'X-Amz-S3session-Token': self.credentials.token}
+ )
+
+ # Dump the base64 encoded policy into the fields dictionary.
+ fields['policy'] = base64.b64encode(
+ json.dumps(policy).encode('utf-8')
+ ).decode('utf-8')
+
+ fields['x-amz-signature'] = self.signature(fields['policy'], request)
+
+ request.context['s3-presign-post-fields'] = fields
+ request.context['s3-presign-post-policy'] = policy
+
+
+class S3ExpressQueryAuth(S3ExpressAuth):
+ DEFAULT_EXPIRES = 300
+ REQUIRES_IDENTITY_CACHE = True
+
+ def __init__(
+ self,
+ credentials,
+ service_name,
+ region_name,
+ *,
+ identity_cache,
+ expires=DEFAULT_EXPIRES,
+ ):
+ super().__init__(
+ credentials,
+ service_name,
+ region_name,
+ identity_cache=identity_cache,
+ )
+ self._expires = expires
+
+ def _modify_request_before_signing(self, request):
+ # We automatically set this header, so if it's the auto-set value we
+ # want to get rid of it since it doesn't make sense for presigned urls.
+ content_type = request.headers.get('content-type')
+ blocklisted_content_type = (
+ 'application/x-www-form-urlencoded; charset=utf-8'
+ )
+ if content_type == blocklisted_content_type:
+ del request.headers['content-type']
+
+ # Note that we're not including X-Amz-Signature.
+ # From the docs: "The Canonical Query String must include all the query
+ # parameters from the preceding table except for X-Amz-Signature.
+ signed_headers = self.signed_headers(self.headers_to_sign(request))
+
+ auth_params = {
+ 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
+ 'X-Amz-Credential': self.scope(request),
+ 'X-Amz-Date': request.context['timestamp'],
+ 'X-Amz-Expires': self._expires,
+ 'X-Amz-SignedHeaders': signed_headers,
+ }
+ if self.credentials.token is not None:
+ auth_params['X-Amz-S3session-Token'] = self.credentials.token
+ # Now parse the original query string to a dict, inject our new query
+ # params, and serialize back to a query string.
+ url_parts = urlsplit(request.url)
+ # parse_qs makes each value a list, but in our case we know we won't
+ # have repeated keys so we know we have single element lists which we
+ # can convert back to scalar values.
+ query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
+ query_dict = {k: v[0] for k, v in query_string_parts.items()}
+
+ if request.params:
+ query_dict.update(request.params)
+ request.params = {}
+ # The spec is particular about this. It *has* to be:
+ # https://<endpoint>?<operation params>&<auth params>
+ # You can't mix the two types of params together, i.e just keep doing
+ # new_query_params.update(op_params)
+ # new_query_params.update(auth_params)
+ # percent_encode_sequence(new_query_params)
+ operation_params = ''
+ if request.data:
+ # We also need to move the body params into the query string. To
+ # do this, we first have to convert it to a dict.
+ query_dict.update(_get_body_as_dict(request))
+ request.data = ''
+ if query_dict:
+ operation_params = percent_encode_sequence(query_dict) + '&'
+ new_query_string = (
+ f"{operation_params}{percent_encode_sequence(auth_params)}"
+ )
+ # url_parts is a tuple (and therefore immutable) so we need to create
+ # a new url_parts with the new query string.
+ # <part> - <index>
+ # scheme - 0
+ # netloc - 1
+ # path - 2
+ # query - 3 <-- we're replacing this.
+ # fragment - 4
+ p = url_parts
+ new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
+ request.url = urlunsplit(new_url_parts)
+
+ def _inject_signature_to_request(self, request, signature):
+ # Rather than calculating an "Authorization" header, for the query
+ # param quth, we just append an 'X-Amz-Signature' param to the end
+ # of the query string.
+ request.url += f'&X-Amz-Signature={signature}'
+
+ def _normalize_url_path(self, path):
+ # For S3, we do not normalize the path.
+ return path
+
+ def payload(self, request):
+ # From the doc link above:
+ # "You don't include a payload hash in the Canonical Request, because
+ # when you create a presigned URL, you don't know anything about the
+ # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
+ return UNSIGNED_PAYLOAD
+
+
+class SigV4QueryAuth(SigV4Auth):
+ DEFAULT_EXPIRES = 3600
+
+ def __init__(
+ self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
+ ):
+ super().__init__(credentials, service_name, region_name)
+ self._expires = expires
+
+ def _modify_request_before_signing(self, request):
+ # We automatically set this header, so if it's the auto-set value we
+ # want to get rid of it since it doesn't make sense for presigned urls.
+ content_type = request.headers.get('content-type')
+ blacklisted_content_type = (
+ 'application/x-www-form-urlencoded; charset=utf-8'
+ )
+ if content_type == blacklisted_content_type:
+ del request.headers['content-type']
+
+ # Note that we're not including X-Amz-Signature.
+ # From the docs: "The Canonical Query String must include all the query
+ # parameters from the preceding table except for X-Amz-Signature.
+ signed_headers = self.signed_headers(self.headers_to_sign(request))
+
+ auth_params = {
+ 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
+ 'X-Amz-Credential': self.scope(request),
+ 'X-Amz-Date': request.context['timestamp'],
+ 'X-Amz-Expires': self._expires,
+ 'X-Amz-SignedHeaders': signed_headers,
+ }
+ if self.credentials.token is not None:
+ auth_params['X-Amz-Security-Token'] = self.credentials.token
+ # Now parse the original query string to a dict, inject our new query
+ # params, and serialize back to a query string.
+ url_parts = urlsplit(request.url)
+ # parse_qs makes each value a list, but in our case we know we won't
+ # have repeated keys so we know we have single element lists which we
+ # can convert back to scalar values.
+ query_string_parts = parse_qs(url_parts.query, keep_blank_values=True)
+ query_dict = {k: v[0] for k, v in query_string_parts.items()}
+
+ if request.params:
+ query_dict.update(request.params)
+ request.params = {}
+ # The spec is particular about this. It *has* to be:
+ # https://<endpoint>?<operation params>&<auth params>
+ # You can't mix the two types of params together, i.e just keep doing
+ # new_query_params.update(op_params)
+ # new_query_params.update(auth_params)
+ # percent_encode_sequence(new_query_params)
+ operation_params = ''
+ if request.data:
+ # We also need to move the body params into the query string. To
+ # do this, we first have to convert it to a dict.
+ query_dict.update(_get_body_as_dict(request))
+ request.data = ''
+ if query_dict:
+ operation_params = percent_encode_sequence(query_dict) + '&'
+ new_query_string = (
+ f"{operation_params}{percent_encode_sequence(auth_params)}"
+ )
+ # url_parts is a tuple (and therefore immutable) so we need to create
+ # a new url_parts with the new query string.
+ # <part> - <index>
+ # scheme - 0
+ # netloc - 1
+ # path - 2
+ # query - 3 <-- we're replacing this.
+ # fragment - 4
+ p = url_parts
+ new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
+ request.url = urlunsplit(new_url_parts)
+
+ def _inject_signature_to_request(self, request, signature):
+ # Rather than calculating an "Authorization" header, for the query
+ # param quth, we just append an 'X-Amz-Signature' param to the end
+ # of the query string.
+ request.url += f'&X-Amz-Signature={signature}'
+
+
+class S3SigV4QueryAuth(SigV4QueryAuth):
+ """S3 SigV4 auth using query parameters.
+
+ This signer will sign a request using query parameters and signature
+ version 4, i.e a "presigned url" signer.
+
+ Based off of:
+
+ http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
+
+ """
+
+ def _normalize_url_path(self, path):
+ # For S3, we do not normalize the path.
+ return path
+
+ def payload(self, request):
+ # From the doc link above:
+ # "You don't include a payload hash in the Canonical Request, because
+ # when you create a presigned URL, you don't know anything about the
+ # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".
+ return UNSIGNED_PAYLOAD
+
+
+class S3SigV4PostAuth(SigV4Auth):
+ """
+ Presigns a s3 post
+
+ Implementation doc here:
+ http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html
+ """
+
+ def add_auth(self, request):
+ datetime_now = datetime.datetime.utcnow()
+ request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
+
+ fields = {}
+ if request.context.get('s3-presign-post-fields', None) is not None:
+ fields = request.context['s3-presign-post-fields']
+
+ policy = {}
+ conditions = []
+ if request.context.get('s3-presign-post-policy', None) is not None:
+ policy = request.context['s3-presign-post-policy']
+ if policy.get('conditions', None) is not None:
+ conditions = policy['conditions']
+
+ policy['conditions'] = conditions
+
+ fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
+ fields['x-amz-credential'] = self.scope(request)
+ fields['x-amz-date'] = request.context['timestamp']
+
+ conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
+ conditions.append({'x-amz-credential': self.scope(request)})
+ conditions.append({'x-amz-date': request.context['timestamp']})
+
+ if self.credentials.token is not None:
+ fields['x-amz-security-token'] = self.credentials.token
+ conditions.append({'x-amz-security-token': self.credentials.token})
+
+ # Dump the base64 encoded policy into the fields dictionary.
+ fields['policy'] = base64.b64encode(
+ json.dumps(policy).encode('utf-8')
+ ).decode('utf-8')
+
+ fields['x-amz-signature'] = self.signature(fields['policy'], request)
+
+ request.context['s3-presign-post-fields'] = fields
+ request.context['s3-presign-post-policy'] = policy
+
+
+class HmacV1Auth(BaseSigner):
+ # List of Query String Arguments of Interest
+ QSAOfInterest = [
+ 'accelerate',
+ 'acl',
+ 'cors',
+ 'defaultObjectAcl',
+ 'location',
+ 'logging',
+ 'partNumber',
+ 'policy',
+ 'requestPayment',
+ 'torrent',
+ 'versioning',
+ 'versionId',
+ 'versions',
+ 'website',
+ 'uploads',
+ 'uploadId',
+ 'response-content-type',
+ 'response-content-language',
+ 'response-expires',
+ 'response-cache-control',
+ 'response-content-disposition',
+ 'response-content-encoding',
+ 'delete',
+ 'lifecycle',
+ 'tagging',
+ 'restore',
+ 'storageClass',
+ 'notification',
+ 'replication',
+ 'requestPayment',
+ 'analytics',
+ 'metrics',
+ 'inventory',
+ 'select',
+ 'select-type',
+ 'object-lock',
+ ]
+
+ def __init__(self, credentials, service_name=None, region_name=None):
+ self.credentials = credentials
+
+ def sign_string(self, string_to_sign):
+ new_hmac = hmac.new(
+ self.credentials.secret_key.encode('utf-8'), digestmod=sha1
+ )
+ new_hmac.update(string_to_sign.encode('utf-8'))
+ return encodebytes(new_hmac.digest()).strip().decode('utf-8')
+
+ def canonical_standard_headers(self, headers):
+ interesting_headers = ['content-md5', 'content-type', 'date']
+ hoi = []
+ if 'Date' in headers:
+ del headers['Date']
+ headers['Date'] = self._get_date()
+ for ih in interesting_headers:
+ found = False
+ for key in headers:
+ lk = key.lower()
+ if headers[key] is not None and lk == ih:
+ hoi.append(headers[key].strip())
+ found = True
+ if not found:
+ hoi.append('')
+ return '\n'.join(hoi)
+
+ def canonical_custom_headers(self, headers):
+ hoi = []
+ custom_headers = {}
+ for key in headers:
+ lk = key.lower()
+ if headers[key] is not None:
+ if lk.startswith('x-amz-'):
+ custom_headers[lk] = ','.join(
+ v.strip() for v in headers.get_all(key)
+ )
+ sorted_header_keys = sorted(custom_headers.keys())
+ for key in sorted_header_keys:
+ hoi.append(f"{key}:{custom_headers[key]}")
+ return '\n'.join(hoi)
+
+ def unquote_v(self, nv):
+ """
+ TODO: Do we need this?
+ """
+ if len(nv) == 1:
+ return nv
+ else:
+ return (nv[0], unquote(nv[1]))
+
+ def canonical_resource(self, split, auth_path=None):
+ # don't include anything after the first ? in the resource...
+ # unless it is one of the QSA of interest, defined above
+ # NOTE:
+ # The path in the canonical resource should always be the
+ # full path including the bucket name, even for virtual-hosting
+ # style addressing. The ``auth_path`` keeps track of the full
+ # path for the canonical resource and would be passed in if
+ # the client was using virtual-hosting style.
+ if auth_path is not None:
+ buf = auth_path
+ else:
+ buf = split.path
+ if split.query:
+ qsa = split.query.split('&')
+ qsa = [a.split('=', 1) for a in qsa]
+ qsa = [
+ self.unquote_v(a) for a in qsa if a[0] in self.QSAOfInterest
+ ]
+ if len(qsa) > 0:
+ qsa.sort(key=itemgetter(0))
+ qsa = ['='.join(a) for a in qsa]
+ buf += '?'
+ buf += '&'.join(qsa)
+ return buf
+
+ def canonical_string(
+ self, method, split, headers, expires=None, auth_path=None
+ ):
+ cs = method.upper() + '\n'
+ cs += self.canonical_standard_headers(headers) + '\n'
+ custom_headers = self.canonical_custom_headers(headers)
+ if custom_headers:
+ cs += custom_headers + '\n'
+ cs += self.canonical_resource(split, auth_path=auth_path)
+ return cs
+
+ def get_signature(
+ self, method, split, headers, expires=None, auth_path=None
+ ):
+ if self.credentials.token:
+ del headers['x-amz-security-token']
+ headers['x-amz-security-token'] = self.credentials.token
+ string_to_sign = self.canonical_string(
+ method, split, headers, auth_path=auth_path
+ )
+ logger.debug(f'StringToSign:\n{string_to_sign}')
+ return self.sign_string(string_to_sign)
+
+ def add_auth(self, request):
+ if self.credentials is None:
+ raise NoCredentialsError
+ logger.debug("Calculating signature using hmacv1 auth.")
+ split = urlsplit(request.url)
+ logger.debug(f'HTTP request method: {request.method}')
+ signature = self.get_signature(
+ request.method, split, request.headers, auth_path=request.auth_path
+ )
+ self._inject_signature(request, signature)
+
+ def _get_date(self):
+ return formatdate(usegmt=True)
+
+ def _inject_signature(self, request, signature):
+ if 'Authorization' in request.headers:
+ # We have to do this because request.headers is not
+ # normal dictionary. It has the (unintuitive) behavior
+ # of aggregating repeated setattr calls for the same
+ # key value. For example:
+ # headers['foo'] = 'a'; headers['foo'] = 'b'
+ # list(headers) will print ['foo', 'foo'].
+ del request.headers['Authorization']
+
+ auth_header = f"AWS {self.credentials.access_key}:{signature}"
+ request.headers['Authorization'] = auth_header
+
+
+class HmacV1QueryAuth(HmacV1Auth):
+ """
+ Generates a presigned request for s3.
+
+ Spec from this document:
+
+ http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
+ #RESTAuthenticationQueryStringAuth
+
+ """
+
+ DEFAULT_EXPIRES = 3600
+
+ def __init__(self, credentials, expires=DEFAULT_EXPIRES):
+ self.credentials = credentials
+ self._expires = expires
+
+ def _get_date(self):
+ return str(int(time.time() + int(self._expires)))
+
+ def _inject_signature(self, request, signature):
+ query_dict = {}
+ query_dict['AWSAccessKeyId'] = self.credentials.access_key
+ query_dict['Signature'] = signature
+
+ for header_key in request.headers:
+ lk = header_key.lower()
+ # For query string requests, Expires is used instead of the
+ # Date header.
+ if header_key == 'Date':
+ query_dict['Expires'] = request.headers['Date']
+ # We only want to include relevant headers in the query string.
+ # These can be anything that starts with x-amz, is Content-MD5,
+ # or is Content-Type.
+ elif lk.startswith('x-amz-') or lk in (
+ 'content-md5',
+ 'content-type',
+ ):
+ query_dict[lk] = request.headers[lk]
+ # Combine all of the identified headers into an encoded
+ # query string
+ new_query_string = percent_encode_sequence(query_dict)
+
+ # Create a new url with the presigned url.
+ p = urlsplit(request.url)
+ if p[3]:
+ # If there was a pre-existing query string, we should
+ # add that back before injecting the new query string.
+ new_query_string = f'{p[3]}&{new_query_string}'
+ new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])
+ request.url = urlunsplit(new_url_parts)
+
+
+class HmacV1PostAuth(HmacV1Auth):
+ """
+ Generates a presigned post for s3.
+
+ Spec from this document:
+
+ http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
+ """
+
+ def add_auth(self, request):
+ fields = {}
+ if request.context.get('s3-presign-post-fields', None) is not None:
+ fields = request.context['s3-presign-post-fields']
+
+ policy = {}
+ conditions = []
+ if request.context.get('s3-presign-post-policy', None) is not None:
+ policy = request.context['s3-presign-post-policy']
+ if policy.get('conditions', None) is not None:
+ conditions = policy['conditions']
+
+ policy['conditions'] = conditions
+
+ fields['AWSAccessKeyId'] = self.credentials.access_key
+
+ if self.credentials.token is not None:
+ fields['x-amz-security-token'] = self.credentials.token
+ conditions.append({'x-amz-security-token': self.credentials.token})
+
+ # Dump the base64 encoded policy into the fields dictionary.
+ fields['policy'] = base64.b64encode(
+ json.dumps(policy).encode('utf-8')
+ ).decode('utf-8')
+
+ fields['signature'] = self.sign_string(fields['policy'])
+
+ request.context['s3-presign-post-fields'] = fields
+ request.context['s3-presign-post-policy'] = policy
+
+
+class BearerAuth(TokenSigner):
+ """
+ Performs bearer token authorization by placing the bearer token in the
+ Authorization header as specified by Section 2.1 of RFC 6750.
+
+ https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
+ """
+
+ def add_auth(self, request):
+ if self.auth_token is None:
+ raise NoAuthTokenError()
+
+ auth_header = f'Bearer {self.auth_token.token}'
+ if 'Authorization' in request.headers:
+ del request.headers['Authorization']
+ request.headers['Authorization'] = auth_header
+
+
+def resolve_auth_type(auth_trait):
+ for auth_type in auth_trait:
+ if auth_type == 'smithy.api#noAuth':
+ return AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
+ elif auth_type in AUTH_TYPE_TO_SIGNATURE_VERSION:
+ signature_version = AUTH_TYPE_TO_SIGNATURE_VERSION[auth_type]
+ if signature_version in AUTH_TYPE_MAPS:
+ return signature_version
+ else:
+ raise UnknownSignatureVersionError(signature_version=auth_type)
+ raise UnsupportedSignatureVersionError(signature_version=auth_trait)
+
+
+AUTH_TYPE_MAPS = {
+ 'v2': SigV2Auth,
+ 'v3': SigV3Auth,
+ 'v3https': SigV3Auth,
+ 's3': HmacV1Auth,
+ 's3-query': HmacV1QueryAuth,
+ 's3-presign-post': HmacV1PostAuth,
+ 's3v4-presign-post': S3SigV4PostAuth,
+ 'v4-s3express': S3ExpressAuth,
+ 'v4-s3express-query': S3ExpressQueryAuth,
+ 'v4-s3express-presign-post': S3ExpressPostAuth,
+ 'bearer': BearerAuth,
+}
+
+# Define v4 signers depending on if CRT is present
+if HAS_CRT:
+ from botocore.crt.auth import CRT_AUTH_TYPE_MAPS
+
+ AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)
+else:
+ AUTH_TYPE_MAPS.update(
+ {
+ 'v4': SigV4Auth,
+ 'v4-query': SigV4QueryAuth,
+ 's3v4': S3SigV4Auth,
+ 's3v4-query': S3SigV4QueryAuth,
+ }
+ )
+
+AUTH_TYPE_TO_SIGNATURE_VERSION = {
+ 'aws.auth#sigv4': 'v4',
+ 'aws.auth#sigv4a': 'v4a',
+ 'smithy.api#httpBearerAuth': 'bearer',
+ 'smithy.api#noAuth': 'none',
+}