aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/regions.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/regions.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/regions.py856
1 files changed, 856 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/regions.py b/.venv/lib/python3.12/site-packages/botocore/regions.py
new file mode 100644
index 00000000..4d0fc8ba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/regions.py
@@ -0,0 +1,856 @@
+# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Resolves regions and endpoints.
+
+This module implements endpoint resolution, including resolving endpoints for a
+given service and region and resolving the available endpoints for a service
+in a specific AWS partition.
+"""
+
+import copy
+import logging
+import re
+from enum import Enum
+
+import jmespath
+
+from botocore import UNSIGNED, xform_name
+from botocore.auth import AUTH_TYPE_MAPS, HAS_CRT
+from botocore.crt import CRT_SUPPORTED_AUTH_TYPES
+from botocore.endpoint_provider import EndpointProvider
+from botocore.exceptions import (
+ EndpointProviderError,
+ EndpointVariantError,
+ InvalidEndpointConfigurationError,
+ InvalidHostLabelError,
+ MissingDependencyException,
+ NoRegionError,
+ ParamValidationError,
+ UnknownEndpointResolutionBuiltInName,
+ UnknownRegionError,
+ UnknownSignatureVersionError,
+ UnsupportedS3AccesspointConfigurationError,
+ UnsupportedS3ConfigurationError,
+ UnsupportedS3ControlArnError,
+ UnsupportedS3ControlConfigurationError,
+)
+from botocore.utils import ensure_boolean, instance_cache
+
+LOG = logging.getLogger(__name__)
+DEFAULT_URI_TEMPLATE = '{service}.{region}.{dnsSuffix}' # noqa
+DEFAULT_SERVICE_DATA = {'endpoints': {}}
+
+
+class BaseEndpointResolver:
+ """Resolves regions and endpoints. Must be subclassed."""
+
+ def construct_endpoint(self, service_name, region_name=None):
+ """Resolves an endpoint for a service and region combination.
+
+ :type service_name: string
+ :param service_name: Name of the service to resolve an endpoint for
+ (e.g., s3)
+
+ :type region_name: string
+ :param region_name: Region/endpoint name to resolve (e.g., us-east-1)
+ if no region is provided, the first found partition-wide endpoint
+ will be used if available.
+
+ :rtype: dict
+ :return: Returns a dict containing the following keys:
+ - partition: (string, required) Resolved partition name
+ - endpointName: (string, required) Resolved endpoint name
+ - hostname: (string, required) Hostname to use for this endpoint
+ - sslCommonName: (string) sslCommonName to use for this endpoint.
+ - credentialScope: (dict) Signature version 4 credential scope
+ - region: (string) region name override when signing.
+ - service: (string) service name override when signing.
+ - signatureVersions: (list<string>) A list of possible signature
+ versions, including s3, v4, v2, and s3v4
+ - protocols: (list<string>) A list of supported protocols
+ (e.g., http, https)
+ - ...: Other keys may be included as well based on the metadata
+ """
+ raise NotImplementedError
+
+ def get_available_partitions(self):
+ """Lists the partitions available to the endpoint resolver.
+
+ :return: Returns a list of partition names (e.g., ["aws", "aws-cn"]).
+ """
+ raise NotImplementedError
+
+ def get_available_endpoints(
+ self, service_name, partition_name='aws', allow_non_regional=False
+ ):
+ """Lists the endpoint names of a particular partition.
+
+ :type service_name: string
+ :param service_name: Name of a service to list endpoint for (e.g., s3)
+
+ :type partition_name: string
+ :param partition_name: Name of the partition to limit endpoints to.
+ (e.g., aws for the public AWS endpoints, aws-cn for AWS China
+ endpoints, aws-us-gov for AWS GovCloud (US) Endpoints, etc.
+
+ :type allow_non_regional: bool
+ :param allow_non_regional: Set to True to include endpoints that are
+ not regional endpoints (e.g., s3-external-1,
+ fips-us-gov-west-1, etc).
+ :return: Returns a list of endpoint names (e.g., ["us-east-1"]).
+ """
+ raise NotImplementedError
+
+
+class EndpointResolver(BaseEndpointResolver):
+ """Resolves endpoints based on partition endpoint metadata"""
+
+ _UNSUPPORTED_DUALSTACK_PARTITIONS = ['aws-iso', 'aws-iso-b']
+
+ def __init__(self, endpoint_data, uses_builtin_data=False):
+ """
+ :type endpoint_data: dict
+ :param endpoint_data: A dict of partition data.
+
+ :type uses_builtin_data: boolean
+ :param uses_builtin_data: Whether the endpoint data originates in the
+ package's data directory.
+ """
+ if 'partitions' not in endpoint_data:
+ raise ValueError('Missing "partitions" in endpoint data')
+ self._endpoint_data = endpoint_data
+ self.uses_builtin_data = uses_builtin_data
+
+ def get_service_endpoints_data(self, service_name, partition_name='aws'):
+ for partition in self._endpoint_data['partitions']:
+ if partition['partition'] != partition_name:
+ continue
+ services = partition['services']
+ if service_name not in services:
+ continue
+ return services[service_name]['endpoints']
+
+ def get_available_partitions(self):
+ result = []
+ for partition in self._endpoint_data['partitions']:
+ result.append(partition['partition'])
+ return result
+
+ def get_available_endpoints(
+ self,
+ service_name,
+ partition_name='aws',
+ allow_non_regional=False,
+ endpoint_variant_tags=None,
+ ):
+ result = []
+ for partition in self._endpoint_data['partitions']:
+ if partition['partition'] != partition_name:
+ continue
+ services = partition['services']
+ if service_name not in services:
+ continue
+ service_endpoints = services[service_name]['endpoints']
+ for endpoint_name in service_endpoints:
+ is_regional_endpoint = endpoint_name in partition['regions']
+ # Only regional endpoints can be modeled with variants
+ if endpoint_variant_tags and is_regional_endpoint:
+ variant_data = self._retrieve_variant_data(
+ service_endpoints[endpoint_name], endpoint_variant_tags
+ )
+ if variant_data:
+ result.append(endpoint_name)
+ elif allow_non_regional or is_regional_endpoint:
+ result.append(endpoint_name)
+ return result
+
+ def get_partition_dns_suffix(
+ self, partition_name, endpoint_variant_tags=None
+ ):
+ for partition in self._endpoint_data['partitions']:
+ if partition['partition'] == partition_name:
+ if endpoint_variant_tags:
+ variant = self._retrieve_variant_data(
+ partition.get('defaults'), endpoint_variant_tags
+ )
+ if variant and 'dnsSuffix' in variant:
+ return variant['dnsSuffix']
+ else:
+ return partition['dnsSuffix']
+ return None
+
+ def construct_endpoint(
+ self,
+ service_name,
+ region_name=None,
+ partition_name=None,
+ use_dualstack_endpoint=False,
+ use_fips_endpoint=False,
+ ):
+ if (
+ service_name == 's3'
+ and use_dualstack_endpoint
+ and region_name is None
+ ):
+ region_name = 'us-east-1'
+
+ if partition_name is not None:
+ valid_partition = None
+ for partition in self._endpoint_data['partitions']:
+ if partition['partition'] == partition_name:
+ valid_partition = partition
+
+ if valid_partition is not None:
+ result = self._endpoint_for_partition(
+ valid_partition,
+ service_name,
+ region_name,
+ use_dualstack_endpoint,
+ use_fips_endpoint,
+ True,
+ )
+ return result
+ return None
+
+ # Iterate over each partition until a match is found.
+ for partition in self._endpoint_data['partitions']:
+ if use_dualstack_endpoint and (
+ partition['partition']
+ in self._UNSUPPORTED_DUALSTACK_PARTITIONS
+ ):
+ continue
+ result = self._endpoint_for_partition(
+ partition,
+ service_name,
+ region_name,
+ use_dualstack_endpoint,
+ use_fips_endpoint,
+ )
+ if result:
+ return result
+
+ def get_partition_for_region(self, region_name):
+ for partition in self._endpoint_data['partitions']:
+ if self._region_match(partition, region_name):
+ return partition['partition']
+ raise UnknownRegionError(
+ region_name=region_name,
+ error_msg='No partition found for provided region_name.',
+ )
+
+ def _endpoint_for_partition(
+ self,
+ partition,
+ service_name,
+ region_name,
+ use_dualstack_endpoint,
+ use_fips_endpoint,
+ force_partition=False,
+ ):
+ partition_name = partition["partition"]
+ if (
+ use_dualstack_endpoint
+ and partition_name in self._UNSUPPORTED_DUALSTACK_PARTITIONS
+ ):
+ error_msg = (
+ "Dualstack endpoints are currently not supported"
+ f" for {partition_name} partition"
+ )
+ raise EndpointVariantError(tags=['dualstack'], error_msg=error_msg)
+
+ # Get the service from the partition, or an empty template.
+ service_data = partition['services'].get(
+ service_name, DEFAULT_SERVICE_DATA
+ )
+ # Use the partition endpoint if no region is supplied.
+ if region_name is None:
+ if 'partitionEndpoint' in service_data:
+ region_name = service_data['partitionEndpoint']
+ else:
+ raise NoRegionError()
+
+ resolve_kwargs = {
+ 'partition': partition,
+ 'service_name': service_name,
+ 'service_data': service_data,
+ 'endpoint_name': region_name,
+ 'use_dualstack_endpoint': use_dualstack_endpoint,
+ 'use_fips_endpoint': use_fips_endpoint,
+ }
+
+ # Attempt to resolve the exact region for this partition.
+ if region_name in service_data['endpoints']:
+ return self._resolve(**resolve_kwargs)
+
+ # Check to see if the endpoint provided is valid for the partition.
+ if self._region_match(partition, region_name) or force_partition:
+ # Use the partition endpoint if set and not regionalized.
+ partition_endpoint = service_data.get('partitionEndpoint')
+ is_regionalized = service_data.get('isRegionalized', True)
+ if partition_endpoint and not is_regionalized:
+ LOG.debug(
+ 'Using partition endpoint for %s, %s: %s',
+ service_name,
+ region_name,
+ partition_endpoint,
+ )
+ resolve_kwargs['endpoint_name'] = partition_endpoint
+ return self._resolve(**resolve_kwargs)
+ LOG.debug(
+ 'Creating a regex based endpoint for %s, %s',
+ service_name,
+ region_name,
+ )
+ return self._resolve(**resolve_kwargs)
+
+ def _region_match(self, partition, region_name):
+ if region_name in partition['regions']:
+ return True
+ if 'regionRegex' in partition:
+ return re.compile(partition['regionRegex']).match(region_name)
+ return False
+
+ def _retrieve_variant_data(self, endpoint_data, tags):
+ variants = endpoint_data.get('variants', [])
+ for variant in variants:
+ if set(variant['tags']) == set(tags):
+ result = variant.copy()
+ return result
+
+ def _create_tag_list(self, use_dualstack_endpoint, use_fips_endpoint):
+ tags = []
+ if use_dualstack_endpoint:
+ tags.append('dualstack')
+ if use_fips_endpoint:
+ tags.append('fips')
+ return tags
+
+ def _resolve_variant(
+ self, tags, endpoint_data, service_defaults, partition_defaults
+ ):
+ result = {}
+ for variants in [endpoint_data, service_defaults, partition_defaults]:
+ variant = self._retrieve_variant_data(variants, tags)
+ if variant:
+ self._merge_keys(variant, result)
+ return result
+
+ def _resolve(
+ self,
+ partition,
+ service_name,
+ service_data,
+ endpoint_name,
+ use_dualstack_endpoint,
+ use_fips_endpoint,
+ ):
+ endpoint_data = service_data.get('endpoints', {}).get(
+ endpoint_name, {}
+ )
+
+ if endpoint_data.get('deprecated'):
+ LOG.warning(
+ f'Client is configured with the deprecated endpoint: {endpoint_name}'
+ )
+
+ service_defaults = service_data.get('defaults', {})
+ partition_defaults = partition.get('defaults', {})
+ tags = self._create_tag_list(use_dualstack_endpoint, use_fips_endpoint)
+
+ if tags:
+ result = self._resolve_variant(
+ tags, endpoint_data, service_defaults, partition_defaults
+ )
+ if result == {}:
+ error_msg = (
+ f"Endpoint does not exist for {service_name} "
+ f"in region {endpoint_name}"
+ )
+ raise EndpointVariantError(tags=tags, error_msg=error_msg)
+ self._merge_keys(endpoint_data, result)
+ else:
+ result = endpoint_data
+
+ # If dnsSuffix has not already been consumed from a variant definition
+ if 'dnsSuffix' not in result:
+ result['dnsSuffix'] = partition['dnsSuffix']
+
+ result['partition'] = partition['partition']
+ result['endpointName'] = endpoint_name
+
+ # Merge in the service defaults then the partition defaults.
+ self._merge_keys(service_defaults, result)
+ self._merge_keys(partition_defaults, result)
+
+ result['hostname'] = self._expand_template(
+ partition,
+ result['hostname'],
+ service_name,
+ endpoint_name,
+ result['dnsSuffix'],
+ )
+ if 'sslCommonName' in result:
+ result['sslCommonName'] = self._expand_template(
+ partition,
+ result['sslCommonName'],
+ service_name,
+ endpoint_name,
+ result['dnsSuffix'],
+ )
+
+ return result
+
+ def _merge_keys(self, from_data, result):
+ for key in from_data:
+ if key not in result:
+ result[key] = from_data[key]
+
+ def _expand_template(
+ self, partition, template, service_name, endpoint_name, dnsSuffix
+ ):
+ return template.format(
+ service=service_name, region=endpoint_name, dnsSuffix=dnsSuffix
+ )
+
+
+class EndpointResolverBuiltins(str, Enum):
+ # The AWS Region configured for the SDK client (str)
+ AWS_REGION = "AWS::Region"
+ # Whether the UseFIPSEndpoint configuration option has been enabled for
+ # the SDK client (bool)
+ AWS_USE_FIPS = "AWS::UseFIPS"
+ # Whether the UseDualStackEndpoint configuration option has been enabled
+ # for the SDK client (bool)
+ AWS_USE_DUALSTACK = "AWS::UseDualStack"
+ # Whether the global endpoint should be used with STS, rather than the
+ # regional endpoint for us-east-1 (bool)
+ AWS_STS_USE_GLOBAL_ENDPOINT = "AWS::STS::UseGlobalEndpoint"
+ # Whether the global endpoint should be used with S3, rather than the
+ # regional endpoint for us-east-1 (bool)
+ AWS_S3_USE_GLOBAL_ENDPOINT = "AWS::S3::UseGlobalEndpoint"
+ # Whether S3 Transfer Acceleration has been requested (bool)
+ AWS_S3_ACCELERATE = "AWS::S3::Accelerate"
+ # Whether S3 Force Path Style has been enabled (bool)
+ AWS_S3_FORCE_PATH_STYLE = "AWS::S3::ForcePathStyle"
+ # Whether to use the ARN region or raise an error when ARN and client
+ # region differ (for s3 service only, bool)
+ AWS_S3_USE_ARN_REGION = "AWS::S3::UseArnRegion"
+ # Whether to use the ARN region or raise an error when ARN and client
+ # region differ (for s3-control service only, bool)
+ AWS_S3CONTROL_USE_ARN_REGION = 'AWS::S3Control::UseArnRegion'
+ # Whether multi-region access points (MRAP) should be disabled (bool)
+ AWS_S3_DISABLE_MRAP = "AWS::S3::DisableMultiRegionAccessPoints"
+ # Whether a custom endpoint has been configured (str)
+ SDK_ENDPOINT = "SDK::Endpoint"
+ # An AWS account ID that can be optionally configured for the SDK client (str)
+ ACCOUNT_ID = "AWS::Auth::AccountId"
+ # Whether an endpoint should include an account ID (str)
+ ACCOUNT_ID_ENDPOINT_MODE = "AWS::Auth::AccountIdEndpointMode"
+
+
+class EndpointRulesetResolver:
+ """Resolves endpoints using a service's endpoint ruleset"""
+
+ def __init__(
+ self,
+ endpoint_ruleset_data,
+ partition_data,
+ service_model,
+ builtins,
+ client_context,
+ event_emitter,
+ use_ssl=True,
+ requested_auth_scheme=None,
+ ):
+ self._provider = EndpointProvider(
+ ruleset_data=endpoint_ruleset_data,
+ partition_data=partition_data,
+ )
+ self._param_definitions = self._provider.ruleset.parameters
+ self._service_model = service_model
+ self._builtins = builtins
+ self._client_context = client_context
+ self._event_emitter = event_emitter
+ self._use_ssl = use_ssl
+ self._requested_auth_scheme = requested_auth_scheme
+ self._instance_cache = {}
+
+ def construct_endpoint(
+ self,
+ operation_model,
+ call_args,
+ request_context,
+ ):
+ """Invokes the provider with params defined in the service's ruleset"""
+ if call_args is None:
+ call_args = {}
+
+ if request_context is None:
+ request_context = {}
+
+ provider_params = self._get_provider_params(
+ operation_model, call_args, request_context
+ )
+ LOG.debug(
+ f'Calling endpoint provider with parameters: {provider_params}'
+ )
+ try:
+ provider_result = self._provider.resolve_endpoint(
+ **provider_params
+ )
+ except EndpointProviderError as ex:
+ botocore_exception = self.ruleset_error_to_botocore_exception(
+ ex, provider_params
+ )
+ if botocore_exception is None:
+ raise
+ else:
+ raise botocore_exception from ex
+ LOG.debug(f'Endpoint provider result: {provider_result.url}')
+
+ # The endpoint provider does not support non-secure transport.
+ if not self._use_ssl and provider_result.url.startswith('https://'):
+ provider_result = provider_result._replace(
+ url=f'http://{provider_result.url[8:]}'
+ )
+
+ # Multi-valued headers are not supported in botocore. Replace the list
+ # of values returned for each header with just its first entry,
+ # dropping any additionally entries.
+ provider_result = provider_result._replace(
+ headers={
+ key: val[0] for key, val in provider_result.headers.items()
+ }
+ )
+
+ return provider_result
+
+ def _get_provider_params(
+ self, operation_model, call_args, request_context
+ ):
+ """Resolve a value for each parameter defined in the service's ruleset
+
+ The resolution order for parameter values is:
+ 1. Operation-specific static context values from the service definition
+ 2. Operation-specific dynamic context values from API parameters
+ 3. Client-specific context parameters
+ 4. Built-in values such as region, FIPS usage, ...
+ """
+ provider_params = {}
+ # Builtin values can be customized for each operation by hooks
+ # subscribing to the ``before-endpoint-resolution.*`` event.
+ customized_builtins = self._get_customized_builtins(
+ operation_model, call_args, request_context
+ )
+ for param_name, param_def in self._param_definitions.items():
+ param_val = self._resolve_param_from_context(
+ param_name=param_name,
+ operation_model=operation_model,
+ call_args=call_args,
+ )
+ if param_val is None and param_def.builtin is not None:
+ param_val = self._resolve_param_as_builtin(
+ builtin_name=param_def.builtin,
+ builtins=customized_builtins,
+ )
+ if param_val is not None:
+ provider_params[param_name] = param_val
+
+ return provider_params
+
+ def _resolve_param_from_context(
+ self, param_name, operation_model, call_args
+ ):
+ static = self._resolve_param_as_static_context_param(
+ param_name, operation_model
+ )
+ if static is not None:
+ return static
+ dynamic = self._resolve_param_as_dynamic_context_param(
+ param_name, operation_model, call_args
+ )
+ if dynamic is not None:
+ return dynamic
+ operation_context_params = (
+ self._resolve_param_as_operation_context_param(
+ param_name, operation_model, call_args
+ )
+ )
+ if operation_context_params is not None:
+ return operation_context_params
+ return self._resolve_param_as_client_context_param(param_name)
+
+ def _resolve_param_as_static_context_param(
+ self, param_name, operation_model
+ ):
+ static_ctx_params = self._get_static_context_params(operation_model)
+ return static_ctx_params.get(param_name)
+
+ def _resolve_param_as_dynamic_context_param(
+ self, param_name, operation_model, call_args
+ ):
+ dynamic_ctx_params = self._get_dynamic_context_params(operation_model)
+ if param_name in dynamic_ctx_params:
+ member_name = dynamic_ctx_params[param_name]
+ return call_args.get(member_name)
+
+ def _resolve_param_as_client_context_param(self, param_name):
+ client_ctx_params = self._get_client_context_params()
+ if param_name in client_ctx_params:
+ client_ctx_varname = client_ctx_params[param_name]
+ return self._client_context.get(client_ctx_varname)
+
+ def _resolve_param_as_operation_context_param(
+ self, param_name, operation_model, call_args
+ ):
+ operation_ctx_params = operation_model.operation_context_parameters
+ if param_name in operation_ctx_params:
+ path = operation_ctx_params[param_name]['path']
+ return jmespath.search(path, call_args)
+
+ def _resolve_param_as_builtin(self, builtin_name, builtins):
+ if builtin_name not in EndpointResolverBuiltins.__members__.values():
+ raise UnknownEndpointResolutionBuiltInName(name=builtin_name)
+ builtin = builtins.get(builtin_name)
+ if callable(builtin):
+ return builtin()
+ return builtin
+
+ @instance_cache
+ def _get_static_context_params(self, operation_model):
+ """Mapping of param names to static param value for an operation"""
+ return {
+ param.name: param.value
+ for param in operation_model.static_context_parameters
+ }
+
+ @instance_cache
+ def _get_dynamic_context_params(self, operation_model):
+ """Mapping of param names to member names for an operation"""
+ return {
+ param.name: param.member_name
+ for param in operation_model.context_parameters
+ }
+
+ @instance_cache
+ def _get_client_context_params(self):
+ """Mapping of param names to client configuration variable"""
+ return {
+ param.name: xform_name(param.name)
+ for param in self._service_model.client_context_parameters
+ }
+
+ def _get_customized_builtins(
+ self, operation_model, call_args, request_context
+ ):
+ service_id = self._service_model.service_id.hyphenize()
+ customized_builtins = copy.copy(self._builtins)
+ # Handlers are expected to modify the builtins dict in place.
+ self._event_emitter.emit(
+ f'before-endpoint-resolution.{service_id}',
+ builtins=customized_builtins,
+ model=operation_model,
+ params=call_args,
+ context=request_context,
+ )
+ return customized_builtins
+
+ def auth_schemes_to_signing_ctx(self, auth_schemes):
+ """Convert an Endpoint's authSchemes property to a signing_context dict
+
+ :type auth_schemes: list
+ :param auth_schemes: A list of dictionaries taken from the
+ ``authSchemes`` property of an Endpoint object returned by
+ ``EndpointProvider``.
+
+ :rtype: str, dict
+ :return: Tuple of auth type string (to be used in
+ ``request_context['auth_type']``) and signing context dict (for use
+ in ``request_context['signing']``).
+ """
+ if not isinstance(auth_schemes, list) or len(auth_schemes) == 0:
+ raise TypeError("auth_schemes must be a non-empty list.")
+
+ LOG.debug(
+ 'Selecting from endpoint provider\'s list of auth schemes: %s. '
+ 'User selected auth scheme is: "%s"',
+ ', '.join([f'"{s.get("name")}"' for s in auth_schemes]),
+ self._requested_auth_scheme,
+ )
+
+ if self._requested_auth_scheme == UNSIGNED:
+ return 'none', {}
+
+ auth_schemes = [
+ {**scheme, 'name': self._strip_sig_prefix(scheme['name'])}
+ for scheme in auth_schemes
+ ]
+ if self._requested_auth_scheme is not None:
+ try:
+ # Use the first scheme that matches the requested scheme,
+ # after accounting for naming differences between botocore and
+ # endpoint rulesets. Keep the requested name.
+ name, scheme = next(
+ (self._requested_auth_scheme, s)
+ for s in auth_schemes
+ if self._does_botocore_authname_match_ruleset_authname(
+ self._requested_auth_scheme, s['name']
+ )
+ )
+ except StopIteration:
+ # For legacy signers, no match will be found. Do not raise an
+ # exception, instead default to the logic in botocore
+ # customizations.
+ return None, {}
+ else:
+ try:
+ name, scheme = next(
+ (s['name'], s)
+ for s in auth_schemes
+ if s['name'] in AUTH_TYPE_MAPS
+ )
+ except StopIteration:
+ # If no auth scheme was specifically requested and an
+ # authSchemes list is present in the Endpoint object but none
+ # of the entries are supported, raise an exception.
+ fixable_with_crt = False
+ auth_type_options = [s['name'] for s in auth_schemes]
+ if not HAS_CRT:
+ fixable_with_crt = any(
+ scheme in CRT_SUPPORTED_AUTH_TYPES
+ for scheme in auth_type_options
+ )
+
+ if fixable_with_crt:
+ raise MissingDependencyException(
+ msg='This operation requires an additional dependency.'
+ ' Use pip install botocore[crt] before proceeding.'
+ )
+ else:
+ raise UnknownSignatureVersionError(
+ signature_version=', '.join(auth_type_options)
+ )
+
+ signing_context = {}
+ if 'signingRegion' in scheme:
+ signing_context['region'] = scheme['signingRegion']
+ elif 'signingRegionSet' in scheme:
+ if len(scheme['signingRegionSet']) > 0:
+ signing_context['region'] = ','.join(
+ scheme['signingRegionSet']
+ )
+ if 'signingName' in scheme:
+ signing_context.update(signing_name=scheme['signingName'])
+ if 'disableDoubleEncoding' in scheme:
+ signing_context['disableDoubleEncoding'] = ensure_boolean(
+ scheme['disableDoubleEncoding']
+ )
+
+ LOG.debug(
+ 'Selected auth type "%s" as "%s" with signing context params: %s',
+ scheme['name'], # original name without "sig"
+ name, # chosen name can differ when `signature_version` is set
+ signing_context,
+ )
+ return name, signing_context
+
+ def _strip_sig_prefix(self, auth_name):
+ """Normalize auth type names by removing any "sig" prefix"""
+ return auth_name[3:] if auth_name.startswith('sig') else auth_name
+
+ def _does_botocore_authname_match_ruleset_authname(self, botoname, rsname):
+ """
+ Whether a valid string provided as signature_version parameter for
+ client construction refers to the same auth methods as a string
+ returned by the endpoint ruleset provider. This accounts for:
+
+ * The ruleset prefixes auth names with "sig"
+ * The s3 and s3control rulesets don't distinguish between v4[a] and
+ s3v4[a] signers
+ * The v2, v3, and HMAC v1 based signers (s3, s3-*) are botocore legacy
+ features and do not exist in the rulesets
+ * Only characters up to the first dash are considered
+
+ Example matches:
+ * v4, sigv4
+ * v4, v4
+ * s3v4, sigv4
+ * s3v7, sigv7 (hypothetical example)
+ * s3v4a, sigv4a
+ * s3v4-query, sigv4
+
+ Example mismatches:
+ * v4a, sigv4
+ * s3, sigv4
+ * s3-presign-post, sigv4
+ """
+ rsname = self._strip_sig_prefix(rsname)
+ botoname = botoname.split('-')[0]
+ if botoname != 's3' and botoname.startswith('s3'):
+ botoname = botoname[2:]
+ return rsname == botoname
+
+ def ruleset_error_to_botocore_exception(self, ruleset_exception, params):
+ """Attempts to translate ruleset errors to pre-existing botocore
+ exception types by string matching exception strings.
+ """
+ msg = ruleset_exception.kwargs.get('msg')
+ if msg is None:
+ return
+
+ if msg.startswith('Invalid region in ARN: '):
+ # Example message:
+ # "Invalid region in ARN: `us-we$t-2` (invalid DNS name)"
+ try:
+ label = msg.split('`')[1]
+ except IndexError:
+ label = msg
+ return InvalidHostLabelError(label=label)
+
+ service_name = self._service_model.service_name
+ if service_name == 's3':
+ if (
+ msg == 'S3 Object Lambda does not support S3 Accelerate'
+ or msg == 'Accelerate cannot be used with FIPS'
+ ):
+ return UnsupportedS3ConfigurationError(msg=msg)
+ if (
+ msg.startswith('S3 Outposts does not support')
+ or msg.startswith('S3 MRAP does not support')
+ or msg.startswith('S3 Object Lambda does not support')
+ or msg.startswith('Access Points do not support')
+ or msg.startswith('Invalid configuration:')
+ or msg.startswith('Client was configured for partition')
+ ):
+ return UnsupportedS3AccesspointConfigurationError(msg=msg)
+ if msg.lower().startswith('invalid arn:'):
+ return ParamValidationError(report=msg)
+ if service_name == 's3control':
+ if msg.startswith('Invalid ARN:'):
+ arn = params.get('Bucket')
+ return UnsupportedS3ControlArnError(arn=arn, msg=msg)
+ if msg.startswith('Invalid configuration:') or msg.startswith(
+ 'Client was configured for partition'
+ ):
+ return UnsupportedS3ControlConfigurationError(msg=msg)
+ if msg == "AccountId is required but not set":
+ return ParamValidationError(report=msg)
+ if service_name == 'events':
+ if msg.startswith(
+ 'Invalid Configuration: FIPS is not supported with '
+ 'EventBridge multi-region endpoints.'
+ ):
+ return InvalidEndpointConfigurationError(msg=msg)
+ if msg == 'EndpointId must be a valid host label.':
+ return InvalidEndpointConfigurationError(msg=msg)
+ return None