From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../lib/python3.12/site-packages/botocore/model.py | 982 +++++++++++++++++++++ 1 file changed, 982 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/botocore/model.py (limited to '.venv/lib/python3.12/site-packages/botocore/model.py') diff --git a/.venv/lib/python3.12/site-packages/botocore/model.py b/.venv/lib/python3.12/site-packages/botocore/model.py new file mode 100644 index 00000000..263aa548 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/model.py @@ -0,0 +1,982 @@ +# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Abstractions to interact with service models.""" + +from collections import defaultdict +from typing import NamedTuple, Union + +from botocore.auth import resolve_auth_type +from botocore.compat import OrderedDict +from botocore.exceptions import ( + MissingServiceIdError, + UndefinedModelAttributeError, +) +from botocore.utils import CachedProperty, hyphenize_service_id, instance_cache + +NOT_SET = object() + + +class NoShapeFoundError(Exception): + pass + + +class InvalidShapeError(Exception): + pass + + +class OperationNotFoundError(Exception): + pass + + +class InvalidShapeReferenceError(Exception): + pass + + +class ServiceId(str): + def hyphenize(self): + return hyphenize_service_id(self) + + +class Shape: + """Object representing a shape from the service model.""" + + # To simplify serialization logic, all shape params that are + # related to serialization are moved from the top level hash into + # a 'serialization' hash. This list below contains the names of all + # the attributes that should be moved. + SERIALIZED_ATTRS = [ + 'locationName', + 'queryName', + 'flattened', + 'location', + 'payload', + 'streaming', + 'timestampFormat', + 'xmlNamespace', + 'resultWrapper', + 'xmlAttribute', + 'eventstream', + 'event', + 'eventheader', + 'eventpayload', + 'jsonvalue', + 'timestampFormat', + 'hostLabel', + ] + METADATA_ATTRS = [ + 'required', + 'min', + 'max', + 'pattern', + 'sensitive', + 'enum', + 'idempotencyToken', + 'error', + 'exception', + 'endpointdiscoveryid', + 'retryable', + 'document', + 'union', + 'contextParam', + 'clientContextParams', + 'requiresLength', + ] + MAP_TYPE = OrderedDict + + def __init__(self, shape_name, shape_model, shape_resolver=None): + """ + + :type shape_name: string + :param shape_name: The name of the shape. + + :type shape_model: dict + :param shape_model: The shape model. This would be the value + associated with the key in the "shapes" dict of the + service model (i.e ``model['shapes'][shape_name]``) + + :type shape_resolver: botocore.model.ShapeResolver + :param shape_resolver: A shape resolver object. This is used to + resolve references to other shapes. For scalar shape types + (string, integer, boolean, etc.), this argument is not + required. If a shape_resolver is not provided for a complex + type, then a ``ValueError`` will be raised when an attempt + to resolve a shape is made. + + """ + self.name = shape_name + self.type_name = shape_model['type'] + self.documentation = shape_model.get('documentation', '') + self._shape_model = shape_model + if shape_resolver is None: + # If a shape_resolver is not provided, we create an object + # that will throw errors if you attempt to resolve + # a shape. This is actually ok for scalar shapes + # because they don't need to resolve shapes and shouldn't + # be required to provide an object they won't use. + shape_resolver = UnresolvableShapeMap() + self._shape_resolver = shape_resolver + self._cache = {} + + @CachedProperty + def serialization(self): + """Serialization information about the shape. + + This contains information that may be needed for input serialization + or response parsing. This can include: + + * name + * queryName + * flattened + * location + * payload + * streaming + * xmlNamespace + * resultWrapper + * xmlAttribute + * jsonvalue + * timestampFormat + + :rtype: dict + :return: Serialization information about the shape. + + """ + model = self._shape_model + serialization = {} + for attr in self.SERIALIZED_ATTRS: + if attr in self._shape_model: + serialization[attr] = model[attr] + # For consistency, locationName is renamed to just 'name'. + if 'locationName' in serialization: + serialization['name'] = serialization.pop('locationName') + return serialization + + @CachedProperty + def metadata(self): + """Metadata about the shape. + + This requires optional information about the shape, including: + + * min + * max + * pattern + * enum + * sensitive + * required + * idempotencyToken + * document + * union + * contextParam + * clientContextParams + * requiresLength + + :rtype: dict + :return: Metadata about the shape. + + """ + model = self._shape_model + metadata = {} + for attr in self.METADATA_ATTRS: + if attr in self._shape_model: + metadata[attr] = model[attr] + return metadata + + @CachedProperty + def required_members(self): + """A list of members that are required. + + A structure shape can define members that are required. + This value will return a list of required members. If there + are no required members an empty list is returned. + + """ + return self.metadata.get('required', []) + + def _resolve_shape_ref(self, shape_ref): + return self._shape_resolver.resolve_shape_ref(shape_ref) + + def __repr__(self): + return f"<{self.__class__.__name__}({self.name})>" + + @property + def event_stream_name(self): + return None + + +class StructureShape(Shape): + @CachedProperty + def members(self): + members = self._shape_model.get('members', self.MAP_TYPE()) + # The members dict looks like: + # 'members': { + # 'MemberName': {'shape': 'shapeName'}, + # 'MemberName2': {'shape': 'shapeName'}, + # } + # We return a dict of member name to Shape object. + shape_members = self.MAP_TYPE() + for name, shape_ref in members.items(): + shape_members[name] = self._resolve_shape_ref(shape_ref) + return shape_members + + @CachedProperty + def event_stream_name(self): + for member_name, member in self.members.items(): + if member.serialization.get('eventstream'): + return member_name + return None + + @CachedProperty + def error_code(self): + if not self.metadata.get('exception', False): + return None + error_metadata = self.metadata.get("error", {}) + code = error_metadata.get("code") + if code: + return code + # Use the exception name if there is no explicit code modeled + return self.name + + @CachedProperty + def is_document_type(self): + return self.metadata.get('document', False) + + @CachedProperty + def is_tagged_union(self): + return self.metadata.get('union', False) + + +class ListShape(Shape): + @CachedProperty + def member(self): + return self._resolve_shape_ref(self._shape_model['member']) + + +class MapShape(Shape): + @CachedProperty + def key(self): + return self._resolve_shape_ref(self._shape_model['key']) + + @CachedProperty + def value(self): + return self._resolve_shape_ref(self._shape_model['value']) + + +class StringShape(Shape): + @CachedProperty + def enum(self): + return self.metadata.get('enum', []) + + +class StaticContextParameter(NamedTuple): + name: str + value: Union[bool, str] + + +class ContextParameter(NamedTuple): + name: str + member_name: str + + +class ClientContextParameter(NamedTuple): + name: str + type: str + documentation: str + + +class ServiceModel: + """ + + :ivar service_description: The parsed service description dictionary. + + """ + + def __init__(self, service_description, service_name=None): + """ + + :type service_description: dict + :param service_description: The service description model. This value + is obtained from a botocore.loader.Loader, or from directly loading + the file yourself:: + + service_description = json.load( + open('/path/to/service-description-model.json')) + model = ServiceModel(service_description) + + :type service_name: str + :param service_name: The name of the service. Normally this is + the endpoint prefix defined in the service_description. However, + you can override this value to provide a more convenient name. + This is done in a few places in botocore (ses instead of email, + emr instead of elasticmapreduce). If this value is not provided, + it will default to the endpointPrefix defined in the model. + + """ + self._service_description = service_description + # We want clients to be able to access metadata directly. + self.metadata = service_description.get('metadata', {}) + self._shape_resolver = ShapeResolver( + service_description.get('shapes', {}) + ) + self._signature_version = NOT_SET + self._service_name = service_name + self._instance_cache = {} + + def shape_for(self, shape_name, member_traits=None): + return self._shape_resolver.get_shape_by_name( + shape_name, member_traits + ) + + def shape_for_error_code(self, error_code): + return self._error_code_cache.get(error_code, None) + + @CachedProperty + def _error_code_cache(self): + error_code_cache = {} + for error_shape in self.error_shapes: + code = error_shape.error_code + error_code_cache[code] = error_shape + return error_code_cache + + def resolve_shape_ref(self, shape_ref): + return self._shape_resolver.resolve_shape_ref(shape_ref) + + @CachedProperty + def shape_names(self): + return list(self._service_description.get('shapes', {})) + + @CachedProperty + def error_shapes(self): + error_shapes = [] + for shape_name in self.shape_names: + error_shape = self.shape_for(shape_name) + if error_shape.metadata.get('exception', False): + error_shapes.append(error_shape) + return error_shapes + + @instance_cache + def operation_model(self, operation_name): + try: + model = self._service_description['operations'][operation_name] + except KeyError: + raise OperationNotFoundError(operation_name) + return OperationModel(model, self, operation_name) + + @CachedProperty + def documentation(self): + return self._service_description.get('documentation', '') + + @CachedProperty + def operation_names(self): + return list(self._service_description.get('operations', [])) + + @CachedProperty + def service_name(self): + """The name of the service. + + This defaults to the endpointPrefix defined in the service model. + However, this value can be overriden when a ``ServiceModel`` is + created. If a service_name was not provided when the ``ServiceModel`` + was created and if there is no endpointPrefix defined in the + service model, then an ``UndefinedModelAttributeError`` exception + will be raised. + + """ + if self._service_name is not None: + return self._service_name + else: + return self.endpoint_prefix + + @CachedProperty + def service_id(self): + try: + return ServiceId(self._get_metadata_property('serviceId')) + except UndefinedModelAttributeError: + raise MissingServiceIdError(service_name=self._service_name) + + @CachedProperty + def signing_name(self): + """The name to use when computing signatures. + + If the model does not define a signing name, this + value will be the endpoint prefix defined in the model. + """ + signing_name = self.metadata.get('signingName') + if signing_name is None: + signing_name = self.endpoint_prefix + return signing_name + + @CachedProperty + def api_version(self): + return self._get_metadata_property('apiVersion') + + @CachedProperty + def protocol(self): + return self._get_metadata_property('protocol') + + @CachedProperty + def protocols(self): + return self._get_metadata_property('protocols') + + @CachedProperty + def endpoint_prefix(self): + return self._get_metadata_property('endpointPrefix') + + @CachedProperty + def endpoint_discovery_operation(self): + for operation in self.operation_names: + model = self.operation_model(operation) + if model.is_endpoint_discovery_operation: + return model + + @CachedProperty + def endpoint_discovery_required(self): + for operation in self.operation_names: + model = self.operation_model(operation) + if ( + model.endpoint_discovery is not None + and model.endpoint_discovery.get('required') + ): + return True + return False + + @CachedProperty + def client_context_parameters(self): + params = self._service_description.get('clientContextParams', {}) + return [ + ClientContextParameter( + name=param_name, + type=param_val['type'], + documentation=param_val['documentation'], + ) + for param_name, param_val in params.items() + ] + + def _get_metadata_property(self, name): + try: + return self.metadata[name] + except KeyError: + raise UndefinedModelAttributeError( + f'"{name}" not defined in the metadata of the model: {self}' + ) + + # Signature version is one of the rare properties + # that can be modified so a CachedProperty is not used here. + + @property + def signature_version(self): + if self._signature_version is NOT_SET: + signature_version = self.metadata.get('signatureVersion') + self._signature_version = signature_version + return self._signature_version + + @signature_version.setter + def signature_version(self, value): + self._signature_version = value + + @CachedProperty + def is_query_compatible(self): + return 'awsQueryCompatible' in self.metadata + + def __repr__(self): + return f'{self.__class__.__name__}({self.service_name})' + + +class OperationModel: + def __init__(self, operation_model, service_model, name=None): + """ + + :type operation_model: dict + :param operation_model: The operation model. This comes from the + service model, and is the value associated with the operation + name in the service model (i.e ``model['operations'][op_name]``). + + :type service_model: botocore.model.ServiceModel + :param service_model: The service model associated with the operation. + + :type name: string + :param name: The operation name. This is the operation name exposed to + the users of this model. This can potentially be different from + the "wire_name", which is the operation name that *must* by + provided over the wire. For example, given:: + + "CreateCloudFrontOriginAccessIdentity":{ + "name":"CreateCloudFrontOriginAccessIdentity2014_11_06", + ... + } + + The ``name`` would be ``CreateCloudFrontOriginAccessIdentity``, + but the ``self.wire_name`` would be + ``CreateCloudFrontOriginAccessIdentity2014_11_06``, which is the + value we must send in the corresponding HTTP request. + + """ + self._operation_model = operation_model + self._service_model = service_model + self._api_name = name + # Clients can access '.name' to get the operation name + # and '.metadata' to get the top level metdata of the service. + self._wire_name = operation_model.get('name') + self.metadata = service_model.metadata + self.http = operation_model.get('http', {}) + + @CachedProperty + def name(self): + if self._api_name is not None: + return self._api_name + else: + return self.wire_name + + @property + def wire_name(self): + """The wire name of the operation. + + In many situations this is the same value as the + ``name``, value, but in some services, the operation name + exposed to the user is different from the operation name + we send across the wire (e.g cloudfront). + + Any serialization code should use ``wire_name``. + + """ + return self._operation_model.get('name') + + @property + def service_model(self): + return self._service_model + + @CachedProperty + def documentation(self): + return self._operation_model.get('documentation', '') + + @CachedProperty + def deprecated(self): + return self._operation_model.get('deprecated', False) + + @CachedProperty + def endpoint_discovery(self): + # Explicit None default. An empty dictionary for this trait means it is + # enabled but not required to be used. + return self._operation_model.get('endpointdiscovery', None) + + @CachedProperty + def is_endpoint_discovery_operation(self): + return self._operation_model.get('endpointoperation', False) + + @CachedProperty + def input_shape(self): + if 'input' not in self._operation_model: + # Some operations do not accept any input and do not define an + # input shape. + return None + return self._service_model.resolve_shape_ref( + self._operation_model['input'] + ) + + @CachedProperty + def output_shape(self): + if 'output' not in self._operation_model: + # Some operations do not define an output shape, + # in which case we return None to indicate the + # operation has no expected output. + return None + return self._service_model.resolve_shape_ref( + self._operation_model['output'] + ) + + @CachedProperty + def idempotent_members(self): + input_shape = self.input_shape + if not input_shape: + return [] + + return [ + name + for (name, shape) in input_shape.members.items() + if 'idempotencyToken' in shape.metadata + and shape.metadata['idempotencyToken'] + ] + + @CachedProperty + def static_context_parameters(self): + params = self._operation_model.get('staticContextParams', {}) + return [ + StaticContextParameter(name=name, value=props.get('value')) + for name, props in params.items() + ] + + @CachedProperty + def context_parameters(self): + if not self.input_shape: + return [] + + return [ + ContextParameter( + name=shape.metadata['contextParam']['name'], + member_name=name, + ) + for name, shape in self.input_shape.members.items() + if 'contextParam' in shape.metadata + and 'name' in shape.metadata['contextParam'] + ] + + @CachedProperty + def operation_context_parameters(self): + return self._operation_model.get('operationContextParams', []) + + @CachedProperty + def request_compression(self): + return self._operation_model.get('requestcompression') + + @CachedProperty + def auth(self): + return self._operation_model.get('auth') + + @CachedProperty + def auth_type(self): + return self._operation_model.get('authtype') + + @CachedProperty + def resolved_auth_type(self): + if self.auth: + return resolve_auth_type(self.auth) + return self.auth_type + + @CachedProperty + def unsigned_payload(self): + return self._operation_model.get('unsignedPayload') + + @CachedProperty + def error_shapes(self): + shapes = self._operation_model.get("errors", []) + return list(self._service_model.resolve_shape_ref(s) for s in shapes) + + @CachedProperty + def endpoint(self): + return self._operation_model.get('endpoint') + + @CachedProperty + def http_checksum_required(self): + return self._operation_model.get('httpChecksumRequired', False) + + @CachedProperty + def http_checksum(self): + return self._operation_model.get('httpChecksum', {}) + + @CachedProperty + def has_event_stream_input(self): + return self.get_event_stream_input() is not None + + @CachedProperty + def has_event_stream_output(self): + return self.get_event_stream_output() is not None + + def get_event_stream_input(self): + return self._get_event_stream(self.input_shape) + + def get_event_stream_output(self): + return self._get_event_stream(self.output_shape) + + def _get_event_stream(self, shape): + """Returns the event stream member's shape if any or None otherwise.""" + if shape is None: + return None + event_name = shape.event_stream_name + if event_name: + return shape.members[event_name] + return None + + @CachedProperty + def has_streaming_input(self): + return self.get_streaming_input() is not None + + @CachedProperty + def has_streaming_output(self): + return self.get_streaming_output() is not None + + def get_streaming_input(self): + return self._get_streaming_body(self.input_shape) + + def get_streaming_output(self): + return self._get_streaming_body(self.output_shape) + + def _get_streaming_body(self, shape): + """Returns the streaming member's shape if any; or None otherwise.""" + if shape is None: + return None + payload = shape.serialization.get('payload') + if payload is not None: + payload_shape = shape.members[payload] + if payload_shape.type_name == 'blob': + return payload_shape + return None + + def __repr__(self): + return f'{self.__class__.__name__}(name={self.name})' + + +class ShapeResolver: + """Resolves shape references.""" + + # Any type not in this mapping will default to the Shape class. + SHAPE_CLASSES = { + 'structure': StructureShape, + 'list': ListShape, + 'map': MapShape, + 'string': StringShape, + } + + def __init__(self, shape_map): + self._shape_map = shape_map + self._shape_cache = {} + + def get_shape_by_name(self, shape_name, member_traits=None): + try: + shape_model = self._shape_map[shape_name] + except KeyError: + raise NoShapeFoundError(shape_name) + try: + shape_cls = self.SHAPE_CLASSES.get(shape_model['type'], Shape) + except KeyError: + raise InvalidShapeError( + f"Shape is missing required key 'type': {shape_model}" + ) + if member_traits: + shape_model = shape_model.copy() + shape_model.update(member_traits) + result = shape_cls(shape_name, shape_model, self) + return result + + def resolve_shape_ref(self, shape_ref): + # A shape_ref is a dict that has a 'shape' key that + # refers to a shape name as well as any additional + # member traits that are then merged over the shape + # definition. For example: + # {"shape": "StringType", "locationName": "Foobar"} + if len(shape_ref) == 1 and 'shape' in shape_ref: + # It's just a shape ref with no member traits, we can avoid + # a .copy(). This is the common case so it's specifically + # called out here. + return self.get_shape_by_name(shape_ref['shape']) + else: + member_traits = shape_ref.copy() + try: + shape_name = member_traits.pop('shape') + except KeyError: + raise InvalidShapeReferenceError( + f"Invalid model, missing shape reference: {shape_ref}" + ) + return self.get_shape_by_name(shape_name, member_traits) + + +class UnresolvableShapeMap: + """A ShapeResolver that will throw ValueErrors when shapes are resolved.""" + + def get_shape_by_name(self, shape_name, member_traits=None): + raise ValueError( + f"Attempted to lookup shape '{shape_name}', but no shape map was provided." + ) + + def resolve_shape_ref(self, shape_ref): + raise ValueError( + f"Attempted to resolve shape '{shape_ref}', but no shape " + f"map was provided." + ) + + +class DenormalizedStructureBuilder: + """Build a StructureShape from a denormalized model. + + This is a convenience builder class that makes it easy to construct + ``StructureShape``s based on a denormalized model. + + It will handle the details of creating unique shape names and creating + the appropriate shape map needed by the ``StructureShape`` class. + + Example usage:: + + builder = DenormalizedStructureBuilder() + shape = builder.with_members({ + 'A': { + 'type': 'structure', + 'members': { + 'B': { + 'type': 'structure', + 'members': { + 'C': { + 'type': 'string', + } + } + } + } + } + }).build_model() + # ``shape`` is now an instance of botocore.model.StructureShape + + :type dict_type: class + :param dict_type: The dictionary type to use, allowing you to opt-in + to using OrderedDict or another dict type. This can + be particularly useful for testing when order + matters, such as for documentation. + + """ + + SCALAR_TYPES = ( + 'string', + 'integer', + 'boolean', + 'blob', + 'float', + 'timestamp', + 'long', + 'double', + 'char', + ) + + def __init__(self, name=None): + self.members = OrderedDict() + self._name_generator = ShapeNameGenerator() + if name is None: + self.name = self._name_generator.new_shape_name('structure') + + def with_members(self, members): + """ + + :type members: dict + :param members: The denormalized members. + + :return: self + + """ + self._members = members + return self + + def build_model(self): + """Build the model based on the provided members. + + :rtype: botocore.model.StructureShape + :return: The built StructureShape object. + + """ + shapes = OrderedDict() + denormalized = { + 'type': 'structure', + 'members': self._members, + } + self._build_model(denormalized, shapes, self.name) + resolver = ShapeResolver(shape_map=shapes) + return StructureShape( + shape_name=self.name, + shape_model=shapes[self.name], + shape_resolver=resolver, + ) + + def _build_model(self, model, shapes, shape_name): + if model['type'] == 'structure': + shapes[shape_name] = self._build_structure(model, shapes) + elif model['type'] == 'list': + shapes[shape_name] = self._build_list(model, shapes) + elif model['type'] == 'map': + shapes[shape_name] = self._build_map(model, shapes) + elif model['type'] in self.SCALAR_TYPES: + shapes[shape_name] = self._build_scalar(model) + else: + raise InvalidShapeError(f"Unknown shape type: {model['type']}") + + def _build_structure(self, model, shapes): + members = OrderedDict() + shape = self._build_initial_shape(model) + shape['members'] = members + + for name, member_model in model.get('members', OrderedDict()).items(): + member_shape_name = self._get_shape_name(member_model) + members[name] = {'shape': member_shape_name} + self._build_model(member_model, shapes, member_shape_name) + return shape + + def _build_list(self, model, shapes): + member_shape_name = self._get_shape_name(model) + shape = self._build_initial_shape(model) + shape['member'] = {'shape': member_shape_name} + self._build_model(model['member'], shapes, member_shape_name) + return shape + + def _build_map(self, model, shapes): + key_shape_name = self._get_shape_name(model['key']) + value_shape_name = self._get_shape_name(model['value']) + shape = self._build_initial_shape(model) + shape['key'] = {'shape': key_shape_name} + shape['value'] = {'shape': value_shape_name} + self._build_model(model['key'], shapes, key_shape_name) + self._build_model(model['value'], shapes, value_shape_name) + return shape + + def _build_initial_shape(self, model): + shape = { + 'type': model['type'], + } + if 'documentation' in model: + shape['documentation'] = model['documentation'] + for attr in Shape.METADATA_ATTRS: + if attr in model: + shape[attr] = model[attr] + return shape + + def _build_scalar(self, model): + return self._build_initial_shape(model) + + def _get_shape_name(self, model): + if 'shape_name' in model: + return model['shape_name'] + else: + return self._name_generator.new_shape_name(model['type']) + + +class ShapeNameGenerator: + """Generate unique shape names for a type. + + This class can be used in conjunction with the DenormalizedStructureBuilder + to generate unique shape names for a given type. + + """ + + def __init__(self): + self._name_cache = defaultdict(int) + + def new_shape_name(self, type_name): + """Generate a unique shape name. + + This method will guarantee a unique shape name each time it is + called with the same type. + + :: + + >>> s = ShapeNameGenerator() + >>> s.new_shape_name('structure') + 'StructureType1' + >>> s.new_shape_name('structure') + 'StructureType2' + >>> s.new_shape_name('list') + 'ListType1' + >>> s.new_shape_name('list') + 'ListType2' + + + :type type_name: string + :param type_name: The type name (structure, list, map, string, etc.) + + :rtype: string + :return: A unique shape name for the given type + + """ + self._name_cache[type_name] += 1 + current_index = self._name_cache[type_name] + return f'{type_name.capitalize()}Type{current_index}' -- cgit v1.2.3