diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/client.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/client.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/botocore/client.py | 1393 |
1 files changed, 1393 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/client.py b/.venv/lib/python3.12/site-packages/botocore/client.py new file mode 100644 index 00000000..05d9b25c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/client.py @@ -0,0 +1,1393 @@ +# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import logging + +from botocore import waiter, xform_name +from botocore.args import ClientArgsCreator +from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type +from botocore.awsrequest import prepare_request_dict +from botocore.compress import maybe_compress_request +from botocore.config import Config +from botocore.context import with_current_context +from botocore.credentials import RefreshableCredentials +from botocore.discovery import ( + EndpointDiscoveryHandler, + EndpointDiscoveryManager, + block_endpoint_discovery_required_operations, +) +from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring +from botocore.exceptions import ( + DataNotFoundError, + InvalidEndpointDiscoveryConfigurationError, + OperationNotPageableError, + UnknownServiceError, + UnknownSignatureVersionError, +) +from botocore.history import get_global_history_recorder +from botocore.hooks import first_non_none_response +from botocore.httpchecksum import ( + apply_request_checksum, + resolve_checksum_context, +) +from botocore.model import ServiceModel +from botocore.paginate import Paginator +from botocore.retries import adaptive, standard +from botocore.useragent import UserAgentString +from botocore.utils import ( + CachedProperty, + EventbridgeSignerSetter, + S3ControlArnParamHandlerv2, + S3ExpressIdentityResolver, + S3RegionRedirectorv2, + ensure_boolean, + get_service_module_name, +) + +# Keep these imported. There's pre-existing code that uses: +# "from botocore.client import UNSIGNED" +# "from botocore.client import ClientError" +# etc. +from botocore.exceptions import ClientError # noqa +from botocore.utils import S3ArnParamHandler # noqa +from botocore.utils import S3ControlArnParamHandler # noqa +from botocore.utils import S3ControlEndpointSetter # noqa +from botocore.utils import S3EndpointSetter # noqa +from botocore.utils import S3RegionRedirector # noqa +from botocore import UNSIGNED # noqa + + +_LEGACY_SIGNATURE_VERSIONS = frozenset( + ( + 'v2', + 'v3', + 'v3https', + 'v4', + 's3', + 's3v4', + ) +) + + +logger = logging.getLogger(__name__) +history_recorder = get_global_history_recorder() + + +class ClientCreator: + """Creates client objects for a service.""" + + def __init__( + self, + loader, + endpoint_resolver, + user_agent, + event_emitter, + retry_handler_factory, + retry_config_translator, + response_parser_factory=None, + exceptions_factory=None, + config_store=None, + user_agent_creator=None, + ): + self._loader = loader + self._endpoint_resolver = endpoint_resolver + self._user_agent = user_agent + self._event_emitter = event_emitter + self._retry_handler_factory = retry_handler_factory + self._retry_config_translator = retry_config_translator + self._response_parser_factory = response_parser_factory + self._exceptions_factory = exceptions_factory + # TODO: Migrate things away from scoped_config in favor of the + # config_store. The config store can pull things from both the scoped + # config and environment variables (and potentially more in the + # future). + self._config_store = config_store + self._user_agent_creator = user_agent_creator + + def create_client( + self, + service_name, + region_name, + is_secure=True, + endpoint_url=None, + verify=None, + credentials=None, + scoped_config=None, + api_version=None, + client_config=None, + auth_token=None, + ): + responses = self._event_emitter.emit( + 'choose-service-name', service_name=service_name + ) + service_name = first_non_none_response(responses, default=service_name) + service_model = self._load_service_model(service_name, api_version) + try: + endpoints_ruleset_data = self._load_service_endpoints_ruleset( + service_name, api_version + ) + partition_data = self._loader.load_data('partitions') + except UnknownServiceError: + endpoints_ruleset_data = None + partition_data = None + logger.info( + 'No endpoints ruleset found for service %s, falling back to ' + 'legacy endpoint routing.', + service_name, + ) + + cls = self._create_client_class(service_name, service_model) + region_name, client_config = self._normalize_fips_region( + region_name, client_config + ) + if auth := service_model.metadata.get('auth'): + service_signature_version = resolve_auth_type(auth) + else: + service_signature_version = service_model.metadata.get( + 'signatureVersion' + ) + endpoint_bridge = ClientEndpointBridge( + self._endpoint_resolver, + scoped_config, + client_config, + service_signing_name=service_model.metadata.get('signingName'), + config_store=self._config_store, + service_signature_version=service_signature_version, + ) + client_args = self._get_client_args( + service_model, + region_name, + is_secure, + endpoint_url, + verify, + credentials, + scoped_config, + client_config, + endpoint_bridge, + auth_token, + endpoints_ruleset_data, + partition_data, + ) + service_client = cls(**client_args) + self._register_retries(service_client) + self._register_s3_events( + client=service_client, + endpoint_bridge=None, + endpoint_url=None, + client_config=client_config, + scoped_config=scoped_config, + ) + self._register_s3express_events(client=service_client) + self._register_s3_control_events(client=service_client) + self._register_endpoint_discovery( + service_client, endpoint_url, client_config + ) + return service_client + + def create_client_class(self, service_name, api_version=None): + service_model = self._load_service_model(service_name, api_version) + return self._create_client_class(service_name, service_model) + + def _create_client_class(self, service_name, service_model): + class_attributes = self._create_methods(service_model) + py_name_to_operation_name = self._create_name_mapping(service_model) + class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name + bases = [BaseClient] + service_id = service_model.service_id.hyphenize() + self._event_emitter.emit( + f'creating-client-class.{service_id}', + class_attributes=class_attributes, + base_classes=bases, + ) + class_name = get_service_module_name(service_model) + cls = type(str(class_name), tuple(bases), class_attributes) + return cls + + def _normalize_fips_region(self, region_name, client_config): + if region_name is not None: + normalized_region_name = region_name.replace('fips-', '').replace( + '-fips', '' + ) + # If region has been transformed then set flag + if normalized_region_name != region_name: + config_use_fips_endpoint = Config(use_fips_endpoint=True) + if client_config: + # Keeping endpoint setting client specific + client_config = client_config.merge( + config_use_fips_endpoint + ) + else: + client_config = config_use_fips_endpoint + logger.warning( + f'transforming region from {region_name} to ' + f'{normalized_region_name} and setting ' + 'use_fips_endpoint to true. client should not ' + 'be configured with a fips psuedo region.' + ) + region_name = normalized_region_name + return region_name, client_config + + def _load_service_model(self, service_name, api_version=None): + json_model = self._loader.load_service_model( + service_name, 'service-2', api_version=api_version + ) + service_model = ServiceModel(json_model, service_name=service_name) + return service_model + + def _load_service_endpoints_ruleset(self, service_name, api_version=None): + return self._loader.load_service_model( + service_name, 'endpoint-rule-set-1', api_version=api_version + ) + + def _register_retries(self, client): + retry_mode = client.meta.config.retries['mode'] + if retry_mode == 'standard': + self._register_v2_standard_retries(client) + elif retry_mode == 'adaptive': + self._register_v2_standard_retries(client) + self._register_v2_adaptive_retries(client) + elif retry_mode == 'legacy': + self._register_legacy_retries(client) + + def _register_v2_standard_retries(self, client): + max_attempts = client.meta.config.retries.get('total_max_attempts') + kwargs = {'client': client} + if max_attempts is not None: + kwargs['max_attempts'] = max_attempts + standard.register_retry_handler(**kwargs) + + def _register_v2_adaptive_retries(self, client): + adaptive.register_retry_handler(client) + + def _register_legacy_retries(self, client): + endpoint_prefix = client.meta.service_model.endpoint_prefix + service_id = client.meta.service_model.service_id + service_event_name = service_id.hyphenize() + + # First, we load the entire retry config for all services, + # then pull out just the information we need. + original_config = self._loader.load_data('_retry') + if not original_config: + return + + retries = self._transform_legacy_retries(client.meta.config.retries) + retry_config = self._retry_config_translator.build_retry_config( + endpoint_prefix, + original_config.get('retry', {}), + original_config.get('definitions', {}), + retries, + ) + + logger.debug( + "Registering retry handlers for service: %s", + client.meta.service_model.service_name, + ) + handler = self._retry_handler_factory.create_retry_handler( + retry_config, endpoint_prefix + ) + unique_id = f'retry-config-{service_event_name}' + client.meta.events.register( + f"needs-retry.{service_event_name}", handler, unique_id=unique_id + ) + + def _transform_legacy_retries(self, retries): + if retries is None: + return + copied_args = retries.copy() + if 'total_max_attempts' in retries: + copied_args = retries.copy() + copied_args['max_attempts'] = ( + copied_args.pop('total_max_attempts') - 1 + ) + return copied_args + + def _get_retry_mode(self, client, config_store): + client_retries = client.meta.config.retries + if ( + client_retries is not None + and client_retries.get('mode') is not None + ): + return client_retries['mode'] + return config_store.get_config_variable('retry_mode') or 'legacy' + + def _register_endpoint_discovery(self, client, endpoint_url, config): + if endpoint_url is not None: + # Don't register any handlers in the case of a custom endpoint url + return + # Only attach handlers if the service supports discovery + if client.meta.service_model.endpoint_discovery_operation is None: + return + events = client.meta.events + service_id = client.meta.service_model.service_id.hyphenize() + enabled = False + if config and config.endpoint_discovery_enabled is not None: + enabled = config.endpoint_discovery_enabled + elif self._config_store: + enabled = self._config_store.get_config_variable( + 'endpoint_discovery_enabled' + ) + + enabled = self._normalize_endpoint_discovery_config(enabled) + if enabled and self._requires_endpoint_discovery(client, enabled): + discover = enabled is True + manager = EndpointDiscoveryManager( + client, always_discover=discover + ) + handler = EndpointDiscoveryHandler(manager) + handler.register(events, service_id) + else: + events.register( + 'before-parameter-build', + block_endpoint_discovery_required_operations, + ) + + def _normalize_endpoint_discovery_config(self, enabled): + """Config must either be a boolean-string or string-literal 'auto'""" + if isinstance(enabled, str): + enabled = enabled.lower().strip() + if enabled == 'auto': + return enabled + elif enabled in ('true', 'false'): + return ensure_boolean(enabled) + elif isinstance(enabled, bool): + return enabled + + raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled) + + def _requires_endpoint_discovery(self, client, enabled): + if enabled == "auto": + return client.meta.service_model.endpoint_discovery_required + return enabled + + def _register_eventbridge_events( + self, client, endpoint_bridge, endpoint_url + ): + if client.meta.service_model.service_name != 'events': + return + EventbridgeSignerSetter( + endpoint_resolver=self._endpoint_resolver, + region=client.meta.region_name, + endpoint_url=endpoint_url, + ).register(client.meta.events) + + def _register_s3express_events( + self, + client, + endpoint_bridge=None, + endpoint_url=None, + client_config=None, + scoped_config=None, + ): + if client.meta.service_model.service_name != 's3': + return + S3ExpressIdentityResolver(client, RefreshableCredentials).register() + + def _register_s3_events( + self, + client, + endpoint_bridge, + endpoint_url, + client_config, + scoped_config, + ): + if client.meta.service_model.service_name != 's3': + return + S3RegionRedirectorv2(None, client).register() + self._set_s3_presign_signature_version( + client.meta, client_config, scoped_config + ) + client.meta.events.register( + 'before-parameter-build.s3', self._inject_s3_input_parameters + ) + + def _register_s3_control_events( + self, + client, + endpoint_bridge=None, + endpoint_url=None, + client_config=None, + scoped_config=None, + ): + if client.meta.service_model.service_name != 's3control': + return + S3ControlArnParamHandlerv2().register(client.meta.events) + + def _set_s3_presign_signature_version( + self, client_meta, client_config, scoped_config + ): + # This will return the manually configured signature version, or None + # if none was manually set. If a customer manually sets the signature + # version, we always want to use what they set. + provided_signature_version = _get_configured_signature_version( + 's3', client_config, scoped_config + ) + if provided_signature_version is not None: + return + + # Check to see if the region is a region that we know about. If we + # don't know about a region, then we can safely assume it's a new + # region that is sigv4 only, since all new S3 regions only allow sigv4. + # The only exception is aws-global. This is a pseudo-region for the + # global endpoint, we should respect the signature versions it + # supports, which includes v2. + regions = self._endpoint_resolver.get_available_endpoints( + 's3', client_meta.partition + ) + if ( + client_meta.region_name != 'aws-global' + and client_meta.region_name not in regions + ): + return + + # If it is a region we know about, we want to default to sigv2, so here + # we check to see if it is available. + endpoint = self._endpoint_resolver.construct_endpoint( + 's3', client_meta.region_name + ) + signature_versions = endpoint['signatureVersions'] + if 's3' not in signature_versions: + return + + # We now know that we're in a known region that supports sigv2 and + # the customer hasn't set a signature version so we default the + # signature version to sigv2. + client_meta.events.register( + 'choose-signer.s3', self._default_s3_presign_to_sigv2 + ) + + def _inject_s3_input_parameters(self, params, context, **kwargs): + context['input_params'] = {} + inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix') + for inject_parameter in inject_parameters: + if inject_parameter in params: + context['input_params'][inject_parameter] = params[ + inject_parameter + ] + + def _default_s3_presign_to_sigv2(self, signature_version, **kwargs): + """ + Returns the 's3' (sigv2) signer if presigning an s3 request. This is + intended to be used to set the default signature version for the signer + to sigv2. Situations where an asymmetric signature is required are the + exception, for example MRAP needs v4a. + + :type signature_version: str + :param signature_version: The current client signature version. + + :type signing_name: str + :param signing_name: The signing name of the service. + + :return: 's3' if the request is an s3 presign request, None otherwise + """ + if signature_version.startswith('v4a'): + return + + if signature_version.startswith('v4-s3express'): + return signature_version + + for suffix in ['-query', '-presign-post']: + if signature_version.endswith(suffix): + return f's3{suffix}' + + def _get_client_args( + self, + service_model, + region_name, + is_secure, + endpoint_url, + verify, + credentials, + scoped_config, + client_config, + endpoint_bridge, + auth_token, + endpoints_ruleset_data, + partition_data, + ): + args_creator = ClientArgsCreator( + self._event_emitter, + self._user_agent, + self._response_parser_factory, + self._loader, + self._exceptions_factory, + config_store=self._config_store, + user_agent_creator=self._user_agent_creator, + ) + return args_creator.get_client_args( + service_model, + region_name, + is_secure, + endpoint_url, + verify, + credentials, + scoped_config, + client_config, + endpoint_bridge, + auth_token, + endpoints_ruleset_data, + partition_data, + ) + + def _create_methods(self, service_model): + op_dict = {} + for operation_name in service_model.operation_names: + py_operation_name = xform_name(operation_name) + op_dict[py_operation_name] = self._create_api_method( + py_operation_name, operation_name, service_model + ) + return op_dict + + def _create_name_mapping(self, service_model): + # py_name -> OperationName, for every operation available + # for a service. + mapping = {} + for operation_name in service_model.operation_names: + py_operation_name = xform_name(operation_name) + mapping[py_operation_name] = operation_name + return mapping + + def _create_api_method( + self, py_operation_name, operation_name, service_model + ): + def _api_call(self, *args, **kwargs): + # We're accepting *args so that we can give a more helpful + # error message than TypeError: _api_call takes exactly + # 1 argument. + if args: + raise TypeError( + f"{py_operation_name}() only accepts keyword arguments." + ) + # The "self" in this scope is referring to the BaseClient. + return self._make_api_call(operation_name, kwargs) + + _api_call.__name__ = str(py_operation_name) + + # Add the docstring to the client method + operation_model = service_model.operation_model(operation_name) + docstring = ClientMethodDocstring( + operation_model=operation_model, + method_name=operation_name, + event_emitter=self._event_emitter, + method_description=operation_model.documentation, + example_prefix=f'response = client.{py_operation_name}', + include_signature=False, + ) + _api_call.__doc__ = docstring + return _api_call + + +class ClientEndpointBridge: + """Bridges endpoint data and client creation + + This class handles taking out the relevant arguments from the endpoint + resolver and determining which values to use, taking into account any + client configuration options and scope configuration options. + + This class also handles determining what, if any, region to use if no + explicit region setting is provided. For example, Amazon S3 client will + utilize "us-east-1" by default if no region can be resolved.""" + + DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com' + _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control'] + + def __init__( + self, + endpoint_resolver, + scoped_config=None, + client_config=None, + default_endpoint=None, + service_signing_name=None, + config_store=None, + service_signature_version=None, + ): + self.service_signing_name = service_signing_name + self.endpoint_resolver = endpoint_resolver + self.scoped_config = scoped_config + self.client_config = client_config + self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT + self.config_store = config_store + self.service_signature_version = service_signature_version + + def resolve( + self, service_name, region_name=None, endpoint_url=None, is_secure=True + ): + region_name = self._check_default_region(service_name, region_name) + use_dualstack_endpoint = self._resolve_use_dualstack_endpoint( + service_name + ) + use_fips_endpoint = self._resolve_endpoint_variant_config_var( + 'use_fips_endpoint' + ) + resolved = self.endpoint_resolver.construct_endpoint( + service_name, + region_name, + use_dualstack_endpoint=use_dualstack_endpoint, + use_fips_endpoint=use_fips_endpoint, + ) + + # If we can't resolve the region, we'll attempt to get a global + # endpoint for non-regionalized services (iam, route53, etc) + if not resolved: + # TODO: fallback partition_name should be configurable in the + # future for users to define as needed. + resolved = self.endpoint_resolver.construct_endpoint( + service_name, + region_name, + partition_name='aws', + use_dualstack_endpoint=use_dualstack_endpoint, + use_fips_endpoint=use_fips_endpoint, + ) + + if resolved: + return self._create_endpoint( + resolved, service_name, region_name, endpoint_url, is_secure + ) + else: + return self._assume_endpoint( + service_name, region_name, endpoint_url, is_secure + ) + + def resolver_uses_builtin_data(self): + return self.endpoint_resolver.uses_builtin_data + + def _check_default_region(self, service_name, region_name): + if region_name is not None: + return region_name + # Use the client_config region if no explicit region was provided. + if self.client_config and self.client_config.region_name is not None: + return self.client_config.region_name + + def _create_endpoint( + self, resolved, service_name, region_name, endpoint_url, is_secure + ): + region_name, signing_region = self._pick_region_values( + resolved, region_name, endpoint_url + ) + if endpoint_url is None: + endpoint_url = self._make_url( + resolved.get('hostname'), + is_secure, + resolved.get('protocols', []), + ) + signature_version = self._resolve_signature_version( + service_name, resolved + ) + signing_name = self._resolve_signing_name(service_name, resolved) + return self._create_result( + service_name=service_name, + region_name=region_name, + signing_region=signing_region, + signing_name=signing_name, + endpoint_url=endpoint_url, + metadata=resolved, + signature_version=signature_version, + ) + + def _resolve_endpoint_variant_config_var(self, config_var): + client_config = self.client_config + config_val = False + + # Client configuration arg has precedence + if client_config and getattr(client_config, config_var) is not None: + return getattr(client_config, config_var) + elif self.config_store is not None: + # Check config store + config_val = self.config_store.get_config_variable(config_var) + return config_val + + def _resolve_use_dualstack_endpoint(self, service_name): + s3_dualstack_mode = self._is_s3_dualstack_mode(service_name) + if s3_dualstack_mode is not None: + return s3_dualstack_mode + return self._resolve_endpoint_variant_config_var( + 'use_dualstack_endpoint' + ) + + def _is_s3_dualstack_mode(self, service_name): + if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES: + return None + # TODO: This normalization logic is duplicated from the + # ClientArgsCreator class. Consolidate everything to + # ClientArgsCreator. _resolve_signature_version also has similarly + # duplicated logic. + client_config = self.client_config + if ( + client_config is not None + and client_config.s3 is not None + and 'use_dualstack_endpoint' in client_config.s3 + ): + # Client config trumps scoped config. + return client_config.s3['use_dualstack_endpoint'] + if self.scoped_config is not None: + enabled = self.scoped_config.get('s3', {}).get( + 'use_dualstack_endpoint' + ) + if enabled in [True, 'True', 'true']: + return True + + def _assume_endpoint( + self, service_name, region_name, endpoint_url, is_secure + ): + if endpoint_url is None: + # Expand the default hostname URI template. + hostname = self.default_endpoint.format( + service=service_name, region=region_name + ) + endpoint_url = self._make_url( + hostname, is_secure, ['http', 'https'] + ) + logger.debug( + f'Assuming an endpoint for {service_name}, {region_name}: {endpoint_url}' + ) + # We still want to allow the user to provide an explicit version. + signature_version = self._resolve_signature_version( + service_name, {'signatureVersions': ['v4']} + ) + signing_name = self._resolve_signing_name(service_name, resolved={}) + return self._create_result( + service_name=service_name, + region_name=region_name, + signing_region=region_name, + signing_name=signing_name, + signature_version=signature_version, + endpoint_url=endpoint_url, + metadata={}, + ) + + def _create_result( + self, + service_name, + region_name, + signing_region, + signing_name, + endpoint_url, + signature_version, + metadata, + ): + return { + 'service_name': service_name, + 'region_name': region_name, + 'signing_region': signing_region, + 'signing_name': signing_name, + 'endpoint_url': endpoint_url, + 'signature_version': signature_version, + 'metadata': metadata, + } + + def _make_url(self, hostname, is_secure, supported_protocols): + if is_secure and 'https' in supported_protocols: + scheme = 'https' + else: + scheme = 'http' + return f'{scheme}://{hostname}' + + def _resolve_signing_name(self, service_name, resolved): + # CredentialScope overrides everything else. + if ( + 'credentialScope' in resolved + and 'service' in resolved['credentialScope'] + ): + return resolved['credentialScope']['service'] + # Use the signingName from the model if present. + if self.service_signing_name: + return self.service_signing_name + # Just assume is the same as the service name. + return service_name + + def _pick_region_values(self, resolved, region_name, endpoint_url): + signing_region = region_name + if endpoint_url is None: + # Do not use the region name or signing name from the resolved + # endpoint if the user explicitly provides an endpoint_url. This + # would happen if we resolve to an endpoint where the service has + # a "defaults" section that overrides all endpoint with a single + # hostname and credentialScope. This has been the case historically + # for how STS has worked. The only way to resolve an STS endpoint + # was to provide a region_name and an endpoint_url. In that case, + # we would still resolve an endpoint, but we would not use the + # resolved endpointName or signingRegion because we want to allow + # custom endpoints. + region_name = resolved['endpointName'] + signing_region = region_name + if ( + 'credentialScope' in resolved + and 'region' in resolved['credentialScope'] + ): + signing_region = resolved['credentialScope']['region'] + return region_name, signing_region + + def _resolve_signature_version(self, service_name, resolved): + configured_version = _get_configured_signature_version( + service_name, self.client_config, self.scoped_config + ) + if configured_version is not None: + return configured_version + + potential_versions = resolved.get('signatureVersions', []) + if ( + self.service_signature_version is not None + and self.service_signature_version + not in _LEGACY_SIGNATURE_VERSIONS + ): + # Prefer the service model as most specific + # source of truth for new signature versions. + potential_versions = [self.service_signature_version] + + # Pick a signature version from the endpoint metadata if present. + if 'signatureVersions' in resolved: + if service_name == 's3': + return 's3v4' + if 'v4' in potential_versions: + return 'v4' + # Now just iterate over the signature versions in order until we + # find the first one that is known to Botocore. + for known in potential_versions: + if known in AUTH_TYPE_MAPS: + return known + raise UnknownSignatureVersionError( + signature_version=potential_versions + ) + + +class BaseClient: + # This is actually reassigned with the py->op_name mapping + # when the client creator creates the subclass. This value is used + # because calls such as client.get_paginator('list_objects') use the + # snake_case name, but we need to know the ListObjects form. + # xform_name() does the ListObjects->list_objects conversion, but + # we need the reverse mapping here. + _PY_TO_OP_NAME = {} + + def __init__( + self, + serializer, + endpoint, + response_parser, + event_emitter, + request_signer, + service_model, + loader, + client_config, + partition, + exceptions_factory, + endpoint_ruleset_resolver=None, + user_agent_creator=None, + ): + self._serializer = serializer + self._endpoint = endpoint + self._ruleset_resolver = endpoint_ruleset_resolver + self._response_parser = response_parser + self._request_signer = request_signer + self._cache = {} + self._loader = loader + self._client_config = client_config + self.meta = ClientMeta( + event_emitter, + self._client_config, + endpoint.host, + service_model, + self._PY_TO_OP_NAME, + partition, + ) + self._exceptions_factory = exceptions_factory + self._exceptions = None + self._user_agent_creator = user_agent_creator + if self._user_agent_creator is None: + self._user_agent_creator = ( + UserAgentString.from_environment().with_client_config( + self._client_config + ) + ) + self._register_handlers() + + def __getattr__(self, item): + service_id = self._service_model.service_id.hyphenize() + event_name = f'getattr.{service_id}.{item}' + + handler, event_response = self.meta.events.emit_until_response( + event_name, client=self + ) + + if event_response is not None: + return event_response + + raise AttributeError( + f"'{self.__class__.__name__}' object has no attribute '{item}'" + ) + + def close(self): + """Closes underlying endpoint connections.""" + self._endpoint.close() + + def _register_handlers(self): + # Register the handler required to sign requests. + service_id = self.meta.service_model.service_id.hyphenize() + self.meta.events.register( + f"request-created.{service_id}", self._request_signer.handler + ) + # Rebuild user agent string right before request is sent + # to ensure all registered features are included. + self.meta.events.register_last( + f"request-created.{service_id}", + self._user_agent_creator.rebuild_and_replace_user_agent_handler, + ) + + @property + def _service_model(self): + return self.meta.service_model + + @with_current_context() + def _make_api_call(self, operation_name, api_params): + operation_model = self._service_model.operation_model(operation_name) + service_name = self._service_model.service_name + history_recorder.record( + 'API_CALL', + { + 'service': service_name, + 'operation': operation_name, + 'params': api_params, + }, + ) + if operation_model.deprecated: + logger.debug( + 'Warning: %s.%s() is deprecated', service_name, operation_name + ) + request_context = { + 'client_region': self.meta.region_name, + 'client_config': self.meta.config, + 'has_streaming_input': operation_model.has_streaming_input, + 'auth_type': operation_model.resolved_auth_type, + 'unsigned_payload': operation_model.unsigned_payload, + } + + api_params = self._emit_api_params( + api_params=api_params, + operation_model=operation_model, + context=request_context, + ) + ( + endpoint_url, + additional_headers, + properties, + ) = self._resolve_endpoint_ruleset( + operation_model, api_params, request_context + ) + if properties: + # Pass arbitrary endpoint info with the Request + # for use during construction. + request_context['endpoint_properties'] = properties + request_dict = self._convert_to_request_dict( + api_params=api_params, + operation_model=operation_model, + endpoint_url=endpoint_url, + context=request_context, + headers=additional_headers, + ) + resolve_checksum_context(request_dict, operation_model, api_params) + + service_id = self._service_model.service_id.hyphenize() + handler, event_response = self.meta.events.emit_until_response( + f'before-call.{service_id}.{operation_name}', + model=operation_model, + params=request_dict, + request_signer=self._request_signer, + context=request_context, + ) + + if event_response is not None: + http, parsed_response = event_response + else: + maybe_compress_request( + self.meta.config, request_dict, operation_model + ) + apply_request_checksum(request_dict) + http, parsed_response = self._make_request( + operation_model, request_dict, request_context + ) + + self.meta.events.emit( + f'after-call.{service_id}.{operation_name}', + http_response=http, + parsed=parsed_response, + model=operation_model, + context=request_context, + ) + + if http.status_code >= 300: + error_info = parsed_response.get("Error", {}) + error_code = error_info.get("QueryErrorCode") or error_info.get( + "Code" + ) + error_class = self.exceptions.from_code(error_code) + raise error_class(parsed_response, operation_name) + else: + return parsed_response + + def _make_request(self, operation_model, request_dict, request_context): + try: + return self._endpoint.make_request(operation_model, request_dict) + except Exception as e: + self.meta.events.emit( + f'after-call-error.{self._service_model.service_id.hyphenize()}.{operation_model.name}', + exception=e, + context=request_context, + ) + raise + + def _convert_to_request_dict( + self, + api_params, + operation_model, + endpoint_url, + context=None, + headers=None, + set_user_agent_header=True, + ): + request_dict = self._serializer.serialize_to_request( + api_params, operation_model + ) + if not self._client_config.inject_host_prefix: + request_dict.pop('host_prefix', None) + if headers is not None: + request_dict['headers'].update(headers) + if set_user_agent_header: + user_agent = self._user_agent_creator.to_string() + else: + user_agent = None + prepare_request_dict( + request_dict, + endpoint_url=endpoint_url, + user_agent=user_agent, + context=context, + ) + return request_dict + + def _emit_api_params(self, api_params, operation_model, context): + # Given the API params provided by the user and the operation_model + # we can serialize the request to a request_dict. + operation_name = operation_model.name + + # Emit an event that allows users to modify the parameters at the + # beginning of the method. It allows handlers to modify existing + # parameters or return a new set of parameters to use. + service_id = self._service_model.service_id.hyphenize() + responses = self.meta.events.emit( + f'provide-client-params.{service_id}.{operation_name}', + params=api_params, + model=operation_model, + context=context, + ) + api_params = first_non_none_response(responses, default=api_params) + + self.meta.events.emit( + f'before-parameter-build.{service_id}.{operation_name}', + params=api_params, + model=operation_model, + context=context, + ) + return api_params + + def _resolve_endpoint_ruleset( + self, + operation_model, + params, + request_context, + ignore_signing_region=False, + ): + """Returns endpoint URL and list of additional headers returned from + EndpointRulesetResolver for the given operation and params. If the + ruleset resolver is not available, for example because the service has + no endpoints ruleset file, the legacy endpoint resolver's value is + returned. + + Use ignore_signing_region for generating presigned URLs or any other + situation where the signing region information from the ruleset + resolver should be ignored. + + Returns tuple of URL and headers dictionary. Additionally, the + request_context dict is modified in place with any signing information + returned from the ruleset resolver. + """ + if self._ruleset_resolver is None: + endpoint_url = self.meta.endpoint_url + additional_headers = {} + endpoint_properties = {} + else: + endpoint_info = self._ruleset_resolver.construct_endpoint( + operation_model=operation_model, + call_args=params, + request_context=request_context, + ) + endpoint_url = endpoint_info.url + additional_headers = endpoint_info.headers + endpoint_properties = endpoint_info.properties + # If authSchemes is present, overwrite default auth type and + # signing context derived from service model. + auth_schemes = endpoint_info.properties.get('authSchemes') + if auth_schemes is not None: + auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx( + auth_schemes + ) + auth_type, signing_context = auth_info + request_context['auth_type'] = auth_type + if 'region' in signing_context and ignore_signing_region: + del signing_context['region'] + if 'signing' in request_context: + request_context['signing'].update(signing_context) + else: + request_context['signing'] = signing_context + + return endpoint_url, additional_headers, endpoint_properties + + def get_paginator(self, operation_name): + """Create a paginator for an operation. + + :type operation_name: string + :param operation_name: The operation name. This is the same name + as the method name on the client. For example, if the + method name is ``create_foo``, and you'd normally invoke the + operation as ``client.create_foo(**kwargs)``, if the + ``create_foo`` operation can be paginated, you can use the + call ``client.get_paginator("create_foo")``. + + :raise OperationNotPageableError: Raised if the operation is not + pageable. You can use the ``client.can_paginate`` method to + check if an operation is pageable. + + :rtype: ``botocore.paginate.Paginator`` + :return: A paginator object. + + """ + if not self.can_paginate(operation_name): + raise OperationNotPageableError(operation_name=operation_name) + else: + actual_operation_name = self._PY_TO_OP_NAME[operation_name] + + # Create a new paginate method that will serve as a proxy to + # the underlying Paginator.paginate method. This is needed to + # attach a docstring to the method. + def paginate(self, **kwargs): + return Paginator.paginate(self, **kwargs) + + paginator_config = self._cache['page_config'][ + actual_operation_name + ] + # Add the docstring for the paginate method. + paginate.__doc__ = PaginatorDocstring( + paginator_name=actual_operation_name, + event_emitter=self.meta.events, + service_model=self.meta.service_model, + paginator_config=paginator_config, + include_signature=False, + ) + + # Rename the paginator class based on the type of paginator. + service_module_name = get_service_module_name( + self.meta.service_model + ) + paginator_class_name = ( + f"{service_module_name}.Paginator.{actual_operation_name}" + ) + + # Create the new paginator class + documented_paginator_cls = type( + paginator_class_name, (Paginator,), {'paginate': paginate} + ) + + operation_model = self._service_model.operation_model( + actual_operation_name + ) + paginator = documented_paginator_cls( + getattr(self, operation_name), + paginator_config, + operation_model, + ) + return paginator + + def can_paginate(self, operation_name): + """Check if an operation can be paginated. + + :type operation_name: string + :param operation_name: The operation name. This is the same name + as the method name on the client. For example, if the + method name is ``create_foo``, and you'd normally invoke the + operation as ``client.create_foo(**kwargs)``, if the + ``create_foo`` operation can be paginated, you can use the + call ``client.get_paginator("create_foo")``. + + :return: ``True`` if the operation can be paginated, + ``False`` otherwise. + + """ + if 'page_config' not in self._cache: + try: + page_config = self._loader.load_service_model( + self._service_model.service_name, + 'paginators-1', + self._service_model.api_version, + )['pagination'] + self._cache['page_config'] = page_config + except DataNotFoundError: + self._cache['page_config'] = {} + actual_operation_name = self._PY_TO_OP_NAME[operation_name] + return actual_operation_name in self._cache['page_config'] + + def _get_waiter_config(self): + if 'waiter_config' not in self._cache: + try: + waiter_config = self._loader.load_service_model( + self._service_model.service_name, + 'waiters-2', + self._service_model.api_version, + ) + self._cache['waiter_config'] = waiter_config + except DataNotFoundError: + self._cache['waiter_config'] = {} + return self._cache['waiter_config'] + + def get_waiter(self, waiter_name): + """Returns an object that can wait for some condition. + + :type waiter_name: str + :param waiter_name: The name of the waiter to get. See the waiters + section of the service docs for a list of available waiters. + + :returns: The specified waiter object. + :rtype: ``botocore.waiter.Waiter`` + """ + config = self._get_waiter_config() + if not config: + raise ValueError(f"Waiter does not exist: {waiter_name}") + model = waiter.WaiterModel(config) + mapping = {} + for name in model.waiter_names: + mapping[xform_name(name)] = name + if waiter_name not in mapping: + raise ValueError(f"Waiter does not exist: {waiter_name}") + + return waiter.create_waiter_with_client( + mapping[waiter_name], model, self + ) + + @CachedProperty + def waiter_names(self): + """Returns a list of all available waiters.""" + config = self._get_waiter_config() + if not config: + return [] + model = waiter.WaiterModel(config) + # Waiter configs is a dict, we just want the waiter names + # which are the keys in the dict. + return [xform_name(name) for name in model.waiter_names] + + @property + def exceptions(self): + if self._exceptions is None: + self._exceptions = self._load_exceptions() + return self._exceptions + + def _load_exceptions(self): + return self._exceptions_factory.create_client_exceptions( + self._service_model + ) + + def _get_credentials(self): + """ + This private interface is subject to abrupt breaking changes, including + removal, in any botocore release. + """ + return self._request_signer._credentials + + +class ClientMeta: + """Holds additional client methods. + + This class holds additional information for clients. It exists for + two reasons: + + * To give advanced functionality to clients + * To namespace additional client attributes from the operation + names which are mapped to methods at runtime. This avoids + ever running into collisions with operation names. + + """ + + def __init__( + self, + events, + client_config, + endpoint_url, + service_model, + method_to_api_mapping, + partition, + ): + self.events = events + self._client_config = client_config + self._endpoint_url = endpoint_url + self._service_model = service_model + self._method_to_api_mapping = method_to_api_mapping + self._partition = partition + + @property + def service_model(self): + return self._service_model + + @property + def region_name(self): + return self._client_config.region_name + + @property + def endpoint_url(self): + return self._endpoint_url + + @property + def config(self): + return self._client_config + + @property + def method_to_api_mapping(self): + return self._method_to_api_mapping + + @property + def partition(self): + return self._partition + + +def _get_configured_signature_version( + service_name, client_config, scoped_config +): + """ + Gets the manually configured signature version. + + :returns: the customer configured signature version, or None if no + signature version was configured. + """ + # Client config overrides everything. + if client_config and client_config.signature_version is not None: + return client_config.signature_version + + # Scoped config overrides picking from the endpoint metadata. + if scoped_config is not None: + # A given service may have service specific configuration in the + # config file, so we need to check there as well. + service_config = scoped_config.get(service_name) + if service_config is not None and isinstance(service_config, dict): + version = service_config.get('signature_version') + if version: + logger.debug( + "Switching signature version for service %s " + "to version %s based on config file override.", + service_name, + version, + ) + return version + return None |