aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/serialize.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/serialize.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/serialize.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/serialize.py1119
1 files changed, 1119 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/serialize.py b/.venv/lib/python3.12/site-packages/botocore/serialize.py
new file mode 100644
index 00000000..7d2a61d0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/serialize.py
@@ -0,0 +1,1119 @@
+# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Protocol input serializes.
+
+This module contains classes that implement input serialization
+for the various AWS protocol types.
+
+These classes essentially take user input, a model object that
+represents what the expected input should look like, and it returns
+a dictionary that contains the various parts of a request. A few
+high level design decisions:
+
+
+* Each protocol type maps to a separate class, all inherit from
+ ``Serializer``.
+* The return value for ``serialize_to_request`` (the main entry
+ point) returns a dictionary that represents a request. This
+ will have keys like ``url_path``, ``query_string``, etc. This
+ is done so that it's a) easy to test and b) not tied to a
+ particular HTTP library. See the ``serialize_to_request`` docstring
+ for more details.
+
+Unicode
+-------
+
+The input to the serializers should be text (str/unicode), not bytes,
+with the exception of blob types. Those are assumed to be binary,
+and if a str/unicode type is passed in, it will be encoded as utf-8.
+"""
+
+import base64
+import calendar
+import datetime
+import json
+import math
+import re
+import struct
+from xml.etree import ElementTree
+
+from botocore import validate
+from botocore.compat import formatdate
+from botocore.exceptions import ParamValidationError
+from botocore.utils import (
+ has_header,
+ is_json_value_header,
+ parse_to_aware_datetime,
+ percent_encode,
+)
+
+# From the spec, the default timestamp format if not specified is iso8601.
+DEFAULT_TIMESTAMP_FORMAT = 'iso8601'
+ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
+# Same as ISO8601, but with microsecond precision.
+ISO8601_MICRO = '%Y-%m-%dT%H:%M:%S.%fZ'
+HOST_PREFIX_RE = re.compile(r"^[A-Za-z0-9\.\-]+$")
+
+
+def create_serializer(protocol_name, include_validation=True):
+ # TODO: Unknown protocols.
+ serializer = SERIALIZERS[protocol_name]()
+ if include_validation:
+ validator = validate.ParamValidator()
+ serializer = validate.ParamValidationDecorator(validator, serializer)
+ return serializer
+
+
+class Serializer:
+ DEFAULT_METHOD = 'POST'
+ # Clients can change this to a different MutableMapping
+ # (i.e OrderedDict) if they want. This is used in the
+ # compliance test to match the hash ordering used in the
+ # tests.
+ MAP_TYPE = dict
+ DEFAULT_ENCODING = 'utf-8'
+
+ def serialize_to_request(self, parameters, operation_model):
+ """Serialize parameters into an HTTP request.
+
+ This method takes user provided parameters and a shape
+ model and serializes the parameters to an HTTP request.
+ More specifically, this method returns information about
+ parts of the HTTP request, it does not enforce a particular
+ interface or standard for an HTTP request. It instead returns
+ a dictionary of:
+
+ * 'url_path'
+ * 'host_prefix'
+ * 'query_string'
+ * 'headers'
+ * 'body'
+ * 'method'
+
+ It is then up to consumers to decide how to map this to a Request
+ object of their HTTP library of choice. Below is an example
+ return value::
+
+ {'body': {'Action': 'OperationName',
+ 'Bar': 'val2',
+ 'Foo': 'val1',
+ 'Version': '2014-01-01'},
+ 'headers': {},
+ 'method': 'POST',
+ 'query_string': '',
+ 'host_prefix': 'value.',
+ 'url_path': '/'}
+
+ :param parameters: The dictionary input parameters for the
+ operation (i.e the user input).
+ :param operation_model: The OperationModel object that describes
+ the operation.
+ """
+ raise NotImplementedError("serialize_to_request")
+
+ def _create_default_request(self):
+ # Creates a boilerplate default request dict that subclasses
+ # can use as a starting point.
+ serialized = {
+ 'url_path': '/',
+ 'query_string': '',
+ 'method': self.DEFAULT_METHOD,
+ 'headers': {},
+ # An empty body is represented as an empty byte string.
+ 'body': b'',
+ }
+ return serialized
+
+ # Some extra utility methods subclasses can use.
+
+ def _timestamp_iso8601(self, value):
+ if value.microsecond > 0:
+ timestamp_format = ISO8601_MICRO
+ else:
+ timestamp_format = ISO8601
+ return value.strftime(timestamp_format)
+
+ def _timestamp_unixtimestamp(self, value):
+ return int(calendar.timegm(value.timetuple()))
+
+ def _timestamp_rfc822(self, value):
+ if isinstance(value, datetime.datetime):
+ value = self._timestamp_unixtimestamp(value)
+ return formatdate(value, usegmt=True)
+
+ def _convert_timestamp_to_str(self, value, timestamp_format=None):
+ if timestamp_format is None:
+ timestamp_format = self.TIMESTAMP_FORMAT
+ timestamp_format = timestamp_format.lower()
+ datetime_obj = parse_to_aware_datetime(value)
+ converter = getattr(self, f'_timestamp_{timestamp_format}')
+ final_value = converter(datetime_obj)
+ return final_value
+
+ def _get_serialized_name(self, shape, default_name):
+ # Returns the serialized name for the shape if it exists.
+ # Otherwise it will return the passed in default_name.
+ return shape.serialization.get('name', default_name)
+
+ def _get_base64(self, value):
+ # Returns the base64-encoded version of value, handling
+ # both strings and bytes. The returned value is a string
+ # via the default encoding.
+ if isinstance(value, str):
+ value = value.encode(self.DEFAULT_ENCODING)
+ return base64.b64encode(value).strip().decode(self.DEFAULT_ENCODING)
+
+ def _expand_host_prefix(self, parameters, operation_model):
+ operation_endpoint = operation_model.endpoint
+ if (
+ operation_endpoint is None
+ or 'hostPrefix' not in operation_endpoint
+ ):
+ return None
+
+ host_prefix_expression = operation_endpoint['hostPrefix']
+ input_members = operation_model.input_shape.members
+ host_labels = [
+ member
+ for member, shape in input_members.items()
+ if shape.serialization.get('hostLabel')
+ ]
+ format_kwargs = {}
+ bad_labels = []
+ for name in host_labels:
+ param = parameters[name]
+ if not HOST_PREFIX_RE.match(param):
+ bad_labels.append(name)
+ format_kwargs[name] = param
+ if bad_labels:
+ raise ParamValidationError(
+ report=(
+ f"Invalid value for parameter(s): {', '.join(bad_labels)}. "
+ "Must contain only alphanumeric characters, hyphen, "
+ "or period."
+ )
+ )
+ return host_prefix_expression.format(**format_kwargs)
+
+
+class QuerySerializer(Serializer):
+ TIMESTAMP_FORMAT = 'iso8601'
+
+ def serialize_to_request(self, parameters, operation_model):
+ shape = operation_model.input_shape
+ serialized = self._create_default_request()
+ serialized['method'] = operation_model.http.get(
+ 'method', self.DEFAULT_METHOD
+ )
+ serialized['headers'] = {
+ 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8'
+ }
+ # The query serializer only deals with body params so
+ # that's what we hand off the _serialize_* methods.
+ body_params = self.MAP_TYPE()
+ body_params['Action'] = operation_model.name
+ body_params['Version'] = operation_model.metadata['apiVersion']
+ if shape is not None:
+ self._serialize(body_params, parameters, shape)
+ serialized['body'] = body_params
+
+ host_prefix = self._expand_host_prefix(parameters, operation_model)
+ if host_prefix is not None:
+ serialized['host_prefix'] = host_prefix
+
+ return serialized
+
+ def _serialize(self, serialized, value, shape, prefix=''):
+ # serialized: The dict that is incrementally added to with the
+ # final serialized parameters.
+ # value: The current user input value.
+ # shape: The shape object that describes the structure of the
+ # input.
+ # prefix: The incrementally built up prefix for the serialized
+ # key (i.e Foo.bar.members.1).
+ method = getattr(
+ self,
+ f'_serialize_type_{shape.type_name}',
+ self._default_serialize,
+ )
+ method(serialized, value, shape, prefix=prefix)
+
+ def _serialize_type_structure(self, serialized, value, shape, prefix=''):
+ members = shape.members
+ for key, value in value.items():
+ member_shape = members[key]
+ member_prefix = self._get_serialized_name(member_shape, key)
+ if prefix:
+ member_prefix = f'{prefix}.{member_prefix}'
+ self._serialize(serialized, value, member_shape, member_prefix)
+
+ def _serialize_type_list(self, serialized, value, shape, prefix=''):
+ if not value:
+ # The query protocol serializes empty lists.
+ serialized[prefix] = ''
+ return
+ if self._is_shape_flattened(shape):
+ list_prefix = prefix
+ if shape.member.serialization.get('name'):
+ name = self._get_serialized_name(shape.member, default_name='')
+ # Replace '.Original' with '.{name}'.
+ list_prefix = '.'.join(prefix.split('.')[:-1] + [name])
+ else:
+ list_name = shape.member.serialization.get('name', 'member')
+ list_prefix = f'{prefix}.{list_name}'
+ for i, element in enumerate(value, 1):
+ element_prefix = f'{list_prefix}.{i}'
+ element_shape = shape.member
+ self._serialize(serialized, element, element_shape, element_prefix)
+
+ def _serialize_type_map(self, serialized, value, shape, prefix=''):
+ if self._is_shape_flattened(shape):
+ full_prefix = prefix
+ else:
+ full_prefix = f'{prefix}.entry'
+ template = full_prefix + '.{i}.{suffix}'
+ key_shape = shape.key
+ value_shape = shape.value
+ key_suffix = self._get_serialized_name(key_shape, default_name='key')
+ value_suffix = self._get_serialized_name(value_shape, 'value')
+ for i, key in enumerate(value, 1):
+ key_prefix = template.format(i=i, suffix=key_suffix)
+ value_prefix = template.format(i=i, suffix=value_suffix)
+ self._serialize(serialized, key, key_shape, key_prefix)
+ self._serialize(serialized, value[key], value_shape, value_prefix)
+
+ def _serialize_type_blob(self, serialized, value, shape, prefix=''):
+ # Blob args must be base64 encoded.
+ serialized[prefix] = self._get_base64(value)
+
+ def _serialize_type_timestamp(self, serialized, value, shape, prefix=''):
+ serialized[prefix] = self._convert_timestamp_to_str(
+ value, shape.serialization.get('timestampFormat')
+ )
+
+ def _serialize_type_boolean(self, serialized, value, shape, prefix=''):
+ if value:
+ serialized[prefix] = 'true'
+ else:
+ serialized[prefix] = 'false'
+
+ def _default_serialize(self, serialized, value, shape, prefix=''):
+ serialized[prefix] = value
+
+ def _is_shape_flattened(self, shape):
+ return shape.serialization.get('flattened')
+
+
+class EC2Serializer(QuerySerializer):
+ """EC2 specific customizations to the query protocol serializers.
+
+ The EC2 model is almost, but not exactly, similar to the query protocol
+ serializer. This class encapsulates those differences. The model
+ will have be marked with a ``protocol`` of ``ec2``, so you don't need
+ to worry about wiring this class up correctly.
+
+ """
+
+ def _get_serialized_name(self, shape, default_name):
+ # Returns the serialized name for the shape if it exists.
+ # Otherwise it will return the passed in default_name.
+ if 'queryName' in shape.serialization:
+ return shape.serialization['queryName']
+ elif 'name' in shape.serialization:
+ # A locationName is always capitalized
+ # on input for the ec2 protocol.
+ name = shape.serialization['name']
+ return name[0].upper() + name[1:]
+ else:
+ return default_name
+
+ def _serialize_type_list(self, serialized, value, shape, prefix=''):
+ for i, element in enumerate(value, 1):
+ element_prefix = f'{prefix}.{i}'
+ element_shape = shape.member
+ self._serialize(serialized, element, element_shape, element_prefix)
+
+
+class JSONSerializer(Serializer):
+ TIMESTAMP_FORMAT = 'unixtimestamp'
+
+ def serialize_to_request(self, parameters, operation_model):
+ target = '{}.{}'.format(
+ operation_model.metadata['targetPrefix'],
+ operation_model.name,
+ )
+ json_version = operation_model.metadata['jsonVersion']
+ serialized = self._create_default_request()
+ serialized['method'] = operation_model.http.get(
+ 'method', self.DEFAULT_METHOD
+ )
+ serialized['headers'] = {
+ 'X-Amz-Target': target,
+ 'Content-Type': f'application/x-amz-json-{json_version}',
+ }
+ body = self.MAP_TYPE()
+ input_shape = operation_model.input_shape
+ if input_shape is not None:
+ self._serialize(body, parameters, input_shape)
+ serialized['body'] = json.dumps(body).encode(self.DEFAULT_ENCODING)
+
+ host_prefix = self._expand_host_prefix(parameters, operation_model)
+ if host_prefix is not None:
+ serialized['host_prefix'] = host_prefix
+
+ return serialized
+
+ def _serialize(self, serialized, value, shape, key=None):
+ method = getattr(
+ self,
+ f'_serialize_type_{shape.type_name}',
+ self._default_serialize,
+ )
+ method(serialized, value, shape, key)
+
+ def _serialize_type_structure(self, serialized, value, shape, key):
+ if shape.is_document_type:
+ serialized[key] = value
+ else:
+ if key is not None:
+ # If a key is provided, this is a result of a recursive
+ # call so we need to add a new child dict as the value
+ # of the passed in serialized dict. We'll then add
+ # all the structure members as key/vals in the new serialized
+ # dictionary we just created.
+ new_serialized = self.MAP_TYPE()
+ serialized[key] = new_serialized
+ serialized = new_serialized
+ members = shape.members
+ for member_key, member_value in value.items():
+ member_shape = members[member_key]
+ if 'name' in member_shape.serialization:
+ member_key = member_shape.serialization['name']
+ self._serialize(
+ serialized, member_value, member_shape, member_key
+ )
+
+ def _serialize_type_map(self, serialized, value, shape, key):
+ map_obj = self.MAP_TYPE()
+ serialized[key] = map_obj
+ for sub_key, sub_value in value.items():
+ self._serialize(map_obj, sub_value, shape.value, sub_key)
+
+ def _serialize_type_list(self, serialized, value, shape, key):
+ list_obj = []
+ serialized[key] = list_obj
+ for list_item in value:
+ wrapper = {}
+ # The JSON list serialization is the only case where we aren't
+ # setting a key on a dict. We handle this by using
+ # a __current__ key on a wrapper dict to serialize each
+ # list item before appending it to the serialized list.
+ self._serialize(wrapper, list_item, shape.member, "__current__")
+ list_obj.append(wrapper["__current__"])
+
+ def _default_serialize(self, serialized, value, shape, key):
+ serialized[key] = value
+
+ def _serialize_type_timestamp(self, serialized, value, shape, key):
+ serialized[key] = self._convert_timestamp_to_str(
+ value, shape.serialization.get('timestampFormat')
+ )
+
+ def _serialize_type_blob(self, serialized, value, shape, key):
+ serialized[key] = self._get_base64(value)
+
+
+class CBORSerializer(Serializer):
+ UNSIGNED_INT_MAJOR_TYPE = 0
+ NEGATIVE_INT_MAJOR_TYPE = 1
+ BLOB_MAJOR_TYPE = 2
+ STRING_MAJOR_TYPE = 3
+ LIST_MAJOR_TYPE = 4
+ MAP_MAJOR_TYPE = 5
+ TAG_MAJOR_TYPE = 6
+ FLOAT_AND_SIMPLE_MAJOR_TYPE = 7
+
+ def _serialize_data_item(self, serialized, value, shape, key=None):
+ method = getattr(self, f'_serialize_type_{shape.type_name}')
+ if method is None:
+ raise ValueError(
+ f"Unrecognized C2J type: {shape.type_name}, unable to "
+ f"serialize request"
+ )
+ method(serialized, value, shape, key)
+
+ def _serialize_type_integer(self, serialized, value, shape, key):
+ if value >= 0:
+ major_type = self.UNSIGNED_INT_MAJOR_TYPE
+ else:
+ major_type = self.NEGATIVE_INT_MAJOR_TYPE
+ # The only differences in serializing negative and positive integers is
+ # that for negative, we set the major type to 1 and set the value to -1
+ # minus the value
+ value = -1 - value
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ value
+ )
+ initial_byte = self._get_initial_byte(major_type, additional_info)
+ if num_bytes == 0:
+ serialized.extend(initial_byte)
+ else:
+ serialized.extend(initial_byte + value.to_bytes(num_bytes, "big"))
+
+ def _serialize_type_long(self, serialized, value, shape, key):
+ self._serialize_type_integer(serialized, value, shape, key)
+
+ def _serialize_type_blob(self, serialized, value, shape, key):
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ elif not isinstance(value, (bytes, bytearray)):
+ # We support file-like objects for blobs; these already have been
+ # validated to ensure they have a read method
+ value = value.read()
+ length = len(value)
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ length
+ )
+ initial_byte = self._get_initial_byte(
+ self.BLOB_MAJOR_TYPE, additional_info
+ )
+ if num_bytes == 0:
+ serialized.extend(initial_byte)
+ else:
+ serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
+ serialized.extend(value)
+
+ def _serialize_type_string(self, serialized, value, shape, key):
+ encoded = value.encode('utf-8')
+ length = len(encoded)
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ length
+ )
+ initial_byte = self._get_initial_byte(
+ self.STRING_MAJOR_TYPE, additional_info
+ )
+ if num_bytes == 0:
+ serialized.extend(initial_byte + encoded)
+ else:
+ serialized.extend(
+ initial_byte + length.to_bytes(num_bytes, "big") + encoded
+ )
+
+ def _serialize_type_list(self, serialized, value, shape, key):
+ length = len(value)
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ length
+ )
+ initial_byte = self._get_initial_byte(
+ self.LIST_MAJOR_TYPE, additional_info
+ )
+ if num_bytes == 0:
+ serialized.extend(initial_byte)
+ else:
+ serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
+ for item in value:
+ self._serialize_data_item(serialized, item, shape.member)
+
+ def _serialize_type_map(self, serialized, value, shape, key):
+ length = len(value)
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ length
+ )
+ initial_byte = self._get_initial_byte(
+ self.MAP_MAJOR_TYPE, additional_info
+ )
+ if num_bytes == 0:
+ serialized.extend(initial_byte)
+ else:
+ serialized.extend(initial_byte + length.to_bytes(num_bytes, "big"))
+ for key_item, item in value.items():
+ self._serialize_data_item(serialized, key_item, shape.key)
+ self._serialize_data_item(serialized, item, shape.value)
+
+ def _serialize_type_structure(self, serialized, value, shape, key):
+ if key is not None:
+ # For nested structures, we need to serialize the key first
+ self._serialize_data_item(serialized, key, shape.key_shape)
+
+ # Remove `None` values from the dictionary
+ value = {k: v for k, v in value.items() if v is not None}
+
+ map_length = len(value)
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ map_length
+ )
+ initial_byte = self._get_initial_byte(
+ self.MAP_MAJOR_TYPE, additional_info
+ )
+ if num_bytes == 0:
+ serialized.extend(initial_byte)
+ else:
+ serialized.extend(
+ initial_byte + map_length.to_bytes(num_bytes, "big")
+ )
+
+ members = shape.members
+ for member_key, member_value in value.items():
+ member_shape = members[member_key]
+ if 'name' in member_shape.serialization:
+ member_key = member_shape.serialization['name']
+ if member_value is not None:
+ self._serialize_type_string(serialized, member_key, None, None)
+ self._serialize_data_item(
+ serialized, member_value, member_shape
+ )
+
+ def _serialize_type_timestamp(self, serialized, value, shape, key):
+ timestamp = self._convert_timestamp_to_str(value)
+ tag = 1 # Use tag 1 for unix timestamp
+ initial_byte = self._get_initial_byte(self.TAG_MAJOR_TYPE, tag)
+ serialized.extend(initial_byte) # Tagging the timestamp
+ additional_info, num_bytes = self._get_additional_info_and_num_bytes(
+ timestamp
+ )
+
+ if num_bytes == 0:
+ initial_byte = self._get_initial_byte(
+ self.UNSIGNED_INT_MAJOR_TYPE, timestamp
+ )
+ serialized.extend(initial_byte)
+ else:
+ initial_byte = self._get_initial_byte(
+ self.UNSIGNED_INT_MAJOR_TYPE, additional_info
+ )
+ serialized.extend(
+ initial_byte + timestamp.to_bytes(num_bytes, "big")
+ )
+
+ def _serialize_type_float(self, serialized, value, shape, key):
+ if self._is_special_number(value):
+ serialized.extend(
+ self._get_bytes_for_special_numbers(value)
+ ) # Handle special values like NaN or Infinity
+ else:
+ initial_byte = self._get_initial_byte(
+ self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 26
+ )
+ serialized.extend(initial_byte + struct.pack(">f", value))
+
+ def _serialize_type_double(self, serialized, value, shape, key):
+ if self._is_special_number(value):
+ serialized.extend(
+ self._get_bytes_for_special_numbers(value)
+ ) # Handle special values like NaN or Infinity
+ else:
+ initial_byte = self._get_initial_byte(
+ self.FLOAT_AND_SIMPLE_MAJOR_TYPE, 27
+ )
+ serialized.extend(initial_byte + struct.pack(">d", value))
+
+ def _serialize_type_boolean(self, serialized, value, shape, key):
+ additional_info = 21 if value else 20
+ serialized.extend(
+ self._get_initial_byte(
+ self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info
+ )
+ )
+
+ def _get_additional_info_and_num_bytes(self, value):
+ # Values under 24 can be stored in the initial byte and don't need further
+ # encoding
+ if value < 24:
+ return value, 0
+ # Values between 24 and 255 (inclusive) can be stored in 1 byte and
+ # correspond to additional info 24
+ elif value < 256:
+ return 24, 1
+ # Values up to 65535 can be stored in two bytes and correspond to additional
+ # info 25
+ elif value < 65536:
+ return 25, 2
+ # Values up to 4294967296 can be stored in four bytes and correspond to
+ # additional info 26
+ elif value < 4294967296:
+ return 26, 4
+ # The maximum number of bytes in a definite length data items is 8 which
+ # to additional info 27
+ else:
+ return 27, 8
+
+ def _get_initial_byte(self, major_type, additional_info):
+ # The highest order three bits are the major type, so we need to bitshift the
+ # major type by 5
+ major_type_bytes = major_type << 5
+ return (major_type_bytes | additional_info).to_bytes(1, "big")
+
+ def _is_special_number(self, value):
+ return any(
+ [
+ value == float('inf'),
+ value == float('-inf'),
+ math.isnan(value),
+ ]
+ )
+
+ def _get_bytes_for_special_numbers(self, value):
+ additional_info = 25
+ initial_byte = self._get_initial_byte(
+ self.FLOAT_AND_SIMPLE_MAJOR_TYPE, additional_info
+ )
+ if value == float('inf'):
+ return initial_byte + struct.pack(">H", 0x7C00)
+ elif value == float('-inf'):
+ return initial_byte + struct.pack(">H", 0xFC00)
+ elif math.isnan(value):
+ return initial_byte + struct.pack(">H", 0x7E00)
+
+
+class BaseRestSerializer(Serializer):
+ """Base class for rest protocols.
+
+ The only variance between the various rest protocols is the
+ way that the body is serialized. All other aspects (headers, uri, etc.)
+ are the same and logic for serializing those aspects lives here.
+
+ Subclasses must implement the ``_serialize_body_params`` method.
+
+ """
+
+ QUERY_STRING_TIMESTAMP_FORMAT = 'iso8601'
+ HEADER_TIMESTAMP_FORMAT = 'rfc822'
+ # This is a list of known values for the "location" key in the
+ # serialization dict. The location key tells us where on the request
+ # to put the serialized value.
+ KNOWN_LOCATIONS = ['uri', 'querystring', 'header', 'headers']
+
+ def serialize_to_request(self, parameters, operation_model):
+ serialized = self._create_default_request()
+ serialized['method'] = operation_model.http.get(
+ 'method', self.DEFAULT_METHOD
+ )
+ shape = operation_model.input_shape
+ if shape is None:
+ serialized['url_path'] = operation_model.http['requestUri']
+ return serialized
+ shape_members = shape.members
+ # While the ``serialized`` key holds the final serialized request
+ # data, we need interim dicts for the various locations of the
+ # request. We need this for the uri_path_kwargs and the
+ # query_string_kwargs because they are templated, so we need
+ # to gather all the needed data for the string template,
+ # then we render the template. The body_kwargs is needed
+ # because once we've collected them all, we run them through
+ # _serialize_body_params, which for rest-json, creates JSON,
+ # and for rest-xml, will create XML. This is what the
+ # ``partitioned`` dict below is for.
+ partitioned = {
+ 'uri_path_kwargs': self.MAP_TYPE(),
+ 'query_string_kwargs': self.MAP_TYPE(),
+ 'body_kwargs': self.MAP_TYPE(),
+ 'headers': self.MAP_TYPE(),
+ }
+ for param_name, param_value in parameters.items():
+ if param_value is None:
+ # Don't serialize any parameter with a None value.
+ continue
+ self._partition_parameters(
+ partitioned, param_name, param_value, shape_members
+ )
+ serialized['url_path'] = self._render_uri_template(
+ operation_model.http['requestUri'], partitioned['uri_path_kwargs']
+ )
+
+ if 'authPath' in operation_model.http:
+ serialized['auth_path'] = self._render_uri_template(
+ operation_model.http['authPath'],
+ partitioned['uri_path_kwargs'],
+ )
+ # Note that we lean on the http implementation to handle the case
+ # where the requestUri path already has query parameters.
+ # The bundled http client, requests, already supports this.
+ serialized['query_string'] = partitioned['query_string_kwargs']
+ if partitioned['headers']:
+ serialized['headers'] = partitioned['headers']
+ self._serialize_payload(
+ partitioned, parameters, serialized, shape, shape_members
+ )
+ self._serialize_content_type(serialized, shape, shape_members)
+
+ host_prefix = self._expand_host_prefix(parameters, operation_model)
+ if host_prefix is not None:
+ serialized['host_prefix'] = host_prefix
+
+ return serialized
+
+ def _render_uri_template(self, uri_template, params):
+ # We need to handle two cases::
+ #
+ # /{Bucket}/foo
+ # /{Key+}/bar
+ # A label ending with '+' is greedy. There can only
+ # be one greedy key.
+ encoded_params = {}
+ for template_param in re.findall(r'{(.*?)}', uri_template):
+ if template_param.endswith('+'):
+ encoded_params[template_param] = percent_encode(
+ params[template_param[:-1]], safe='/~'
+ )
+ else:
+ encoded_params[template_param] = percent_encode(
+ params[template_param]
+ )
+ return uri_template.format(**encoded_params)
+
+ def _serialize_payload(
+ self, partitioned, parameters, serialized, shape, shape_members
+ ):
+ # partitioned - The user input params partitioned by location.
+ # parameters - The user input params.
+ # serialized - The final serialized request dict.
+ # shape - Describes the expected input shape
+ # shape_members - The members of the input struct shape
+ payload_member = shape.serialization.get('payload')
+ if self._has_streaming_payload(payload_member, shape_members):
+ # If it's streaming, then the body is just the
+ # value of the payload.
+ body_payload = parameters.get(payload_member, b'')
+ body_payload = self._encode_payload(body_payload)
+ serialized['body'] = body_payload
+ elif payload_member is not None:
+ # If there's a payload member, we serialized that
+ # member to they body.
+ body_params = parameters.get(payload_member)
+ if body_params is not None:
+ serialized['body'] = self._serialize_body_params(
+ body_params, shape_members[payload_member]
+ )
+ else:
+ serialized['body'] = self._serialize_empty_body()
+ elif partitioned['body_kwargs']:
+ serialized['body'] = self._serialize_body_params(
+ partitioned['body_kwargs'], shape
+ )
+ elif self._requires_empty_body(shape):
+ serialized['body'] = self._serialize_empty_body()
+
+ def _serialize_empty_body(self):
+ return b''
+
+ def _serialize_content_type(self, serialized, shape, shape_members):
+ """
+ Some protocols require varied Content-Type headers
+ depending on user input. This allows subclasses to apply
+ this conditionally.
+ """
+ pass
+
+ def _requires_empty_body(self, shape):
+ """
+ Some protocols require a specific body to represent an empty
+ payload. This allows subclasses to apply this conditionally.
+ """
+ return False
+
+ def _has_streaming_payload(self, payload, shape_members):
+ """Determine if payload is streaming (a blob or string)."""
+ return payload is not None and shape_members[payload].type_name in (
+ 'blob',
+ 'string',
+ )
+
+ def _encode_payload(self, body):
+ if isinstance(body, str):
+ return body.encode(self.DEFAULT_ENCODING)
+ return body
+
+ def _partition_parameters(
+ self, partitioned, param_name, param_value, shape_members
+ ):
+ # This takes the user provided input parameter (``param``)
+ # and figures out where they go in the request dict.
+ # Some params are HTTP headers, some are used in the URI, some
+ # are in the request body. This method deals with this.
+ member = shape_members[param_name]
+ location = member.serialization.get('location')
+ key_name = member.serialization.get('name', param_name)
+ if location == 'uri':
+ partitioned['uri_path_kwargs'][key_name] = param_value
+ elif location == 'querystring':
+ if isinstance(param_value, dict):
+ partitioned['query_string_kwargs'].update(param_value)
+ elif isinstance(param_value, bool):
+ bool_str = str(param_value).lower()
+ partitioned['query_string_kwargs'][key_name] = bool_str
+ elif member.type_name == 'timestamp':
+ timestamp_format = member.serialization.get(
+ 'timestampFormat', self.QUERY_STRING_TIMESTAMP_FORMAT
+ )
+ timestamp = self._convert_timestamp_to_str(
+ param_value, timestamp_format
+ )
+ partitioned['query_string_kwargs'][key_name] = timestamp
+ else:
+ partitioned['query_string_kwargs'][key_name] = param_value
+ elif location == 'header':
+ shape = shape_members[param_name]
+ if not param_value and shape.type_name == 'list':
+ # Empty lists should not be set on the headers
+ return
+ value = self._convert_header_value(shape, param_value)
+ partitioned['headers'][key_name] = str(value)
+ elif location == 'headers':
+ # 'headers' is a bit of an oddball. The ``key_name``
+ # is actually really a prefix for the header names:
+ header_prefix = key_name
+ # The value provided by the user is a dict so we'll be
+ # creating multiple header key/val pairs. The key
+ # name to use for each header is the header_prefix (``key_name``)
+ # plus the key provided by the user.
+ self._do_serialize_header_map(
+ header_prefix, partitioned['headers'], param_value
+ )
+ else:
+ partitioned['body_kwargs'][param_name] = param_value
+
+ def _do_serialize_header_map(self, header_prefix, headers, user_input):
+ for key, val in user_input.items():
+ full_key = header_prefix + key
+ headers[full_key] = val
+
+ def _serialize_body_params(self, params, shape):
+ raise NotImplementedError('_serialize_body_params')
+
+ def _convert_header_value(self, shape, value):
+ if shape.type_name == 'timestamp':
+ datetime_obj = parse_to_aware_datetime(value)
+ timestamp = calendar.timegm(datetime_obj.utctimetuple())
+ timestamp_format = shape.serialization.get(
+ 'timestampFormat', self.HEADER_TIMESTAMP_FORMAT
+ )
+ return self._convert_timestamp_to_str(timestamp, timestamp_format)
+ elif shape.type_name == 'list':
+ converted_value = [
+ self._convert_header_value(shape.member, v)
+ for v in value
+ if v is not None
+ ]
+ return ",".join(converted_value)
+ elif is_json_value_header(shape):
+ # Serialize with no spaces after separators to save space in
+ # the header.
+ return self._get_base64(json.dumps(value, separators=(',', ':')))
+ else:
+ return value
+
+
+class BaseRpcV2Serializer(Serializer):
+ """Base class for RPCv2 protocols.
+
+ The only variance between the various RPCv2 protocols is the
+ way that the body is serialized. All other aspects (headers, uri, etc.)
+ are the same and logic for serializing those aspects lives here.
+
+ Subclasses must implement the ``_serialize_body_params`` and
+ ``_serialize_headers`` methods.
+
+ """
+
+ def serialize_to_request(self, parameters, operation_model):
+ serialized = self._create_default_request()
+ service_name = operation_model.service_model.metadata['targetPrefix']
+ operation_name = operation_model.name
+ serialized['url_path'] = (
+ f'/service/{service_name}/operation/{operation_name}'
+ )
+
+ input_shape = operation_model.input_shape
+ if input_shape is not None:
+ self._serialize_payload(parameters, serialized, input_shape)
+
+ self._serialize_headers(serialized, operation_model)
+
+ return serialized
+
+ def _serialize_payload(self, parameters, serialized, shape):
+ body_payload = self._serialize_body_params(parameters, shape)
+ serialized['body'] = body_payload
+
+ def _serialize_headers(self, serialized, operation_model):
+ raise NotImplementedError("_serialize_headers")
+
+ def _serialize_body_params(self, parameters, shape):
+ raise NotImplementedError("_serialize_body_params")
+
+
+class RestJSONSerializer(BaseRestSerializer, JSONSerializer):
+ def _serialize_empty_body(self):
+ return b'{}'
+
+ def _requires_empty_body(self, shape):
+ """
+ Serialize an empty JSON object whenever the shape has
+ members not targeting a location.
+ """
+ for member, val in shape.members.items():
+ if 'location' not in val.serialization:
+ return True
+ return False
+
+ def _serialize_content_type(self, serialized, shape, shape_members):
+ """Set Content-Type to application/json for all structured bodies."""
+ payload = shape.serialization.get('payload')
+ if self._has_streaming_payload(payload, shape_members):
+ # Don't apply content-type to streaming bodies
+ return
+
+ has_body = serialized['body'] != b''
+ has_content_type = has_header('Content-Type', serialized['headers'])
+ if has_body and not has_content_type:
+ serialized['headers']['Content-Type'] = 'application/json'
+
+ def _serialize_body_params(self, params, shape):
+ serialized_body = self.MAP_TYPE()
+ self._serialize(serialized_body, params, shape)
+ return json.dumps(serialized_body).encode(self.DEFAULT_ENCODING)
+
+
+class RestXMLSerializer(BaseRestSerializer):
+ TIMESTAMP_FORMAT = 'iso8601'
+
+ def _serialize_body_params(self, params, shape):
+ root_name = shape.serialization['name']
+ pseudo_root = ElementTree.Element('')
+ self._serialize(shape, params, pseudo_root, root_name)
+ real_root = list(pseudo_root)[0]
+ return ElementTree.tostring(real_root, encoding=self.DEFAULT_ENCODING)
+
+ def _serialize(self, shape, params, xmlnode, name):
+ method = getattr(
+ self,
+ f'_serialize_type_{shape.type_name}',
+ self._default_serialize,
+ )
+ method(xmlnode, params, shape, name)
+
+ def _serialize_type_structure(self, xmlnode, params, shape, name):
+ structure_node = ElementTree.SubElement(xmlnode, name)
+
+ if 'xmlNamespace' in shape.serialization:
+ namespace_metadata = shape.serialization['xmlNamespace']
+ attribute_name = 'xmlns'
+ if namespace_metadata.get('prefix'):
+ attribute_name += f":{namespace_metadata['prefix']}"
+ structure_node.attrib[attribute_name] = namespace_metadata['uri']
+ for key, value in params.items():
+ member_shape = shape.members[key]
+ member_name = member_shape.serialization.get('name', key)
+ # We need to special case member shapes that are marked as an
+ # xmlAttribute. Rather than serializing into an XML child node,
+ # we instead serialize the shape to an XML attribute of the
+ # *current* node.
+ if value is None:
+ # Don't serialize any param whose value is None.
+ return
+ if member_shape.serialization.get('xmlAttribute'):
+ # xmlAttributes must have a serialization name.
+ xml_attribute_name = member_shape.serialization['name']
+ structure_node.attrib[xml_attribute_name] = value
+ continue
+ self._serialize(member_shape, value, structure_node, member_name)
+
+ def _serialize_type_list(self, xmlnode, params, shape, name):
+ member_shape = shape.member
+ if shape.serialization.get('flattened'):
+ element_name = name
+ list_node = xmlnode
+ else:
+ element_name = member_shape.serialization.get('name', 'member')
+ list_node = ElementTree.SubElement(xmlnode, name)
+ for item in params:
+ self._serialize(member_shape, item, list_node, element_name)
+
+ def _serialize_type_map(self, xmlnode, params, shape, name):
+ # Given the ``name`` of MyMap, and input of {"key1": "val1"}
+ # we serialize this as:
+ # <MyMap>
+ # <entry>
+ # <key>key1</key>
+ # <value>val1</value>
+ # </entry>
+ # </MyMap>
+ node = ElementTree.SubElement(xmlnode, name)
+ # TODO: handle flattened maps.
+ for key, value in params.items():
+ entry_node = ElementTree.SubElement(node, 'entry')
+ key_name = self._get_serialized_name(shape.key, default_name='key')
+ val_name = self._get_serialized_name(
+ shape.value, default_name='value'
+ )
+ self._serialize(shape.key, key, entry_node, key_name)
+ self._serialize(shape.value, value, entry_node, val_name)
+
+ def _serialize_type_boolean(self, xmlnode, params, shape, name):
+ # For scalar types, the 'params' attr is actually just a scalar
+ # value representing the data we need to serialize as a boolean.
+ # It will either be 'true' or 'false'
+ node = ElementTree.SubElement(xmlnode, name)
+ if params:
+ str_value = 'true'
+ else:
+ str_value = 'false'
+ node.text = str_value
+
+ def _serialize_type_blob(self, xmlnode, params, shape, name):
+ node = ElementTree.SubElement(xmlnode, name)
+ node.text = self._get_base64(params)
+
+ def _serialize_type_timestamp(self, xmlnode, params, shape, name):
+ node = ElementTree.SubElement(xmlnode, name)
+ node.text = self._convert_timestamp_to_str(
+ params, shape.serialization.get('timestampFormat')
+ )
+
+ def _default_serialize(self, xmlnode, params, shape, name):
+ node = ElementTree.SubElement(xmlnode, name)
+ node.text = str(params)
+
+
+class RpcV2CBORSerializer(BaseRpcV2Serializer, CBORSerializer):
+ TIMESTAMP_FORMAT = 'unixtimestamp'
+
+ def _serialize_body_params(self, parameters, input_shape):
+ body = bytearray()
+ self._serialize_data_item(body, parameters, input_shape)
+ return bytes(body)
+
+ def _serialize_headers(self, serialized, operation_model):
+ serialized['headers']['smithy-protocol'] = 'rpc-v2-cbor'
+
+ if operation_model.has_event_stream_output:
+ header_val = 'application/vnd.amazon.eventstream'
+ else:
+ header_val = 'application/cbor'
+
+ has_body = serialized['body'] != b''
+ has_content_type = has_header('Content-Type', serialized['headers'])
+
+ serialized['headers']['Accept'] = header_val
+ if not has_content_type and has_body:
+ serialized['headers']['Content-Type'] = header_val
+
+
+SERIALIZERS = {
+ 'ec2': EC2Serializer,
+ 'query': QuerySerializer,
+ 'json': JSONSerializer,
+ 'rest-json': RestJSONSerializer,
+ 'rest-xml': RestXMLSerializer,
+ 'smithy-rpc-v2-cbor': RpcV2CBORSerializer,
+}