aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/eventstream.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/eventstream.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/eventstream.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/eventstream.py622
1 files changed, 622 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/eventstream.py b/.venv/lib/python3.12/site-packages/botocore/eventstream.py
new file mode 100644
index 00000000..583ceb03
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/eventstream.py
@@ -0,0 +1,622 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Binary Event Stream Decoding"""
+
+from binascii import crc32
+from struct import unpack
+
+from botocore.exceptions import EventStreamError
+
+# byte length of the prelude (total_length + headers_length + prelude_crc)
+_PRELUDE_LENGTH = 12
+_MAX_HEADERS_LENGTH = 128 * 1024 # 128 Kb
+
+
class ParserError(Exception):
    """Common base class for the event stream parsing errors in this module."""
+
+
class DuplicateHeader(ParserError):
    """Raised when an event contains the same header name more than once."""

    def __init__(self, header):
        # ``header`` is the repeated header name; it is quoted in the message.
        super().__init__(f'Duplicate header present: "{header}"')
+
+
class InvalidHeadersLength(ParserError):
    """Raised when the declared headers length is above the allowed maximum."""

    def __init__(self, length):
        # ``length`` is the rejected headers length; the message also reports
        # the module-level limit it was compared against.
        super().__init__(
            f'Header length of {length} exceeded the maximum of {_MAX_HEADERS_LENGTH}'
        )
+
+
class InvalidPayloadLength(ParserError):
    """Raised when a payload is longer than the maximum.

    DEPRECATED: This condition is no longer checked on the client side.
    AWS services now support payloads of varying lengths.
    """

    def __init__(self, length):
        # ``length`` is the offending payload length reported in the message.
        super().__init__(
            f'Payload length of {length} exceeded the maximum of 24MB.'
        )
+
+
class ChecksumMismatch(ParserError):
    """Raised when a computed checksum differs from the expected checksum."""

    def __init__(self, expected, calculated):
        # Both values are rendered as zero-padded, 8-digit lowercase hex.
        super().__init__(
            f'Checksum mismatch: expected 0x{expected:08x}, calculated 0x{calculated:08x}'
        )
+
+
class NoInitialResponseError(ParserError):
    """Raised when no initial-response event was received.

    This happens when the event stream yields no events at all, or when
    the first event it yields is not of the initial-response type.
    """

    def __init__(self):
        super().__init__('First event was not of the initial-response type')
+
+
class DecodeUtils:
    """Static helpers that decode primitive values of the event stream format.

    Every ``unpack_*`` method accepts raw bytes and returns a two-item tuple:
    the decoded value, followed by the number of bytes consumed to decode it.
    All multi-byte integers are read in network (big-endian) byte order.
    """

    UINT8_BYTE_FORMAT = '!B'
    UINT16_BYTE_FORMAT = '!H'
    UINT32_BYTE_FORMAT = '!I'
    INT8_BYTE_FORMAT = '!b'
    INT16_BYTE_FORMAT = '!h'
    INT32_BYTE_FORMAT = '!i'
    INT64_BYTE_FORMAT = '!q'
    PRELUDE_BYTE_FORMAT = '!III'

    # Maps the byte size of an unsigned length prefix to its struct format.
    UINT_BYTE_FORMAT = {
        1: UINT8_BYTE_FORMAT,
        2: UINT16_BYTE_FORMAT,
        4: UINT32_BYTE_FORMAT,
    }

    @staticmethod
    def unpack_true(data):
        """Return True without consuming any bytes.

        :type data: bytes
        :param data: The bytes to parse from. Ignored by this method.

        :rtype: (bool, int)
        :returns: The tuple (True, 0)
        """
        return True, 0

    @staticmethod
    def unpack_false(data):
        """Return False without consuming any bytes.

        :type data: bytes
        :param data: The bytes to parse from. Ignored by this method.

        :rtype: (bool, int)
        :returns: The tuple (False, 0)
        """
        return False, 0

    @staticmethod
    def unpack_uint8(data):
        """Decode an unsigned 8-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.UINT8_BYTE_FORMAT, data[:1])
        return number, 1

    @staticmethod
    def unpack_uint32(data):
        """Decode an unsigned 32-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.UINT32_BYTE_FORMAT, data[:4])
        return number, 4

    @staticmethod
    def unpack_int8(data):
        """Decode a signed 8-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.INT8_BYTE_FORMAT, data[:1])
        return number, 1

    @staticmethod
    def unpack_int16(data):
        """Decode a signed 16-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.INT16_BYTE_FORMAT, data[:2])
        return number, 2

    @staticmethod
    def unpack_int32(data):
        """Decode a signed 32-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.INT32_BYTE_FORMAT, data[:4])
        return number, 4

    @staticmethod
    def unpack_int64(data):
        """Decode a signed 64-bit integer from the start of the bytes.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (int, int)
        :returns: A tuple containing the (parsed integer value, bytes consumed)
        """
        (number,) = unpack(DecodeUtils.INT64_BYTE_FORMAT, data[:8])
        return number, 8

    @staticmethod
    def unpack_byte_array(data, length_byte_size=2):
        """Decode a length-prefixed byte array from the start of the bytes.

        The expected layout is:
        [ length ][0 ... length bytes]
        where length is an unsigned integer occupying ``length_byte_size``
        bytes.

        If fewer than ``length`` bytes follow the prefix, the returned array
        is shorter than declared, while the consumed count still reflects
        the declared length.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
        represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (bytes, int)
        :returns: A tuple containing the (parsed byte array, bytes consumed).
        """
        prefix_format = DecodeUtils.UINT_BYTE_FORMAT[length_byte_size]
        (length,) = unpack(prefix_format, data[:length_byte_size])
        # The consumed count covers the length prefix plus the declared body.
        consumed = length + length_byte_size
        return data[length_byte_size:consumed], consumed

    @staticmethod
    def unpack_utf8_string(data, length_byte_size=2):
        """Decode a length-prefixed utf-8 string from the start of the bytes.

        The expected layout is:
        [ length ][0 ... length bytes]
        where length is an unsigned integer occupying ``length_byte_size``
        bytes and the bytes that follow are a valid utf-8 string.

        :type data: bytes
        :param data: The bytes to parse from.

        :type length_byte_size: int
        :param length_byte_size: The byte size of the preceding integer that
        represents the length of the array. Supported values are 1, 2, and 4.

        :rtype: (str, int)
        :returns: A tuple containing the (utf-8 string, bytes consumed).
        """
        raw, consumed = DecodeUtils.unpack_byte_array(data, length_byte_size)
        text = raw.decode('utf-8')
        return text, consumed

    @staticmethod
    def unpack_uuid(data):
        """Take a 16-byte uuid from the start of the bytes.

        The uuid is returned as raw bytes; no further decoding is performed.

        :type data: bytes
        :param data: The bytes to parse from.

        :rtype: (bytes, int)
        :returns: A tuple containing the (uuid bytes, bytes consumed).
        """
        uuid_size = 16
        return data[:uuid_size], uuid_size

    @staticmethod
    def unpack_prelude(data):
        """Decode the prelude of an event stream message.

        The prelude has the following layout:
        [total_length][headers_length][prelude_crc]
        where each field is an unsigned 32-bit integer.

        :type data: bytes
        :param data: The prelude bytes. They are handed to ``unpack`` whole,
        without slicing.

        :rtype: ((int, int, int), int)
        :returns: A tuple of ((total_length, headers_length, prelude_crc),
        consumed)
        """
        fields = unpack(DecodeUtils.PRELUDE_BYTE_FORMAT, data)
        return fields, _PRELUDE_LENGTH
+
+
def _validate_checksum(data, checksum, crc=0):
    """Check that the CRC32 of ``data`` equals ``checksum``.

    :type data: bytes
    :param data: The bytes to compute the checksum over.

    :type checksum: int
    :param checksum: The expected unsigned 32-bit checksum.

    :type crc: int
    :param crc: Starting value passed to ``crc32``, which allows continuing
    a checksum computed over preceding bytes.

    :raises ChecksumMismatch: If the computed value differs from ``checksum``.
    """
    # Masking with 0xFFFFFFFF yields the same unsigned number on every
    # Python version and platform.
    actual = crc32(data, crc) & 0xFFFFFFFF
    if checksum != actual:
        raise ChecksumMismatch(checksum, actual)
+
+
class MessagePrelude:
    """Holds the three prelude fields of an event stream message.

    The derived properties are byte offsets/lengths measured from the
    beginning of the message.
    """

    def __init__(self, total_length, headers_length, crc):
        # total_length: length of the whole message, in bytes.
        # headers_length: length of the headers section, in bytes.
        # crc: checksum value carried in the prelude.
        self.total_length = total_length
        self.headers_length = headers_length
        self.crc = crc

    @property
    def payload_length(self):
        """Compute the length of the payload section.

        Besides the headers and the prelude, 4 more bytes are subtracted
        for the trailing message CRC.

        :rtype: int
        :returns: The total payload length.
        """
        non_payload = self.headers_length + _PRELUDE_LENGTH + 4
        return self.total_length - non_payload

    @property
    def payload_end(self):
        """Compute the byte offset at which the payload ends.

        The final 4 bytes of the message hold the message CRC, so the
        payload stops 4 bytes before the end of the message.

        :rtype: int
        :returns: The byte offset from the beginning of the event stream
        message to the end of the payload.
        """
        message_crc_size = 4
        return self.total_length - message_crc_size

    @property
    def headers_end(self):
        """Compute the byte offset at which the headers end.

        :rtype: int
        :returns: The byte offset from the beginning of the event stream
        message to the end of the headers.
        """
        return self.headers_length + _PRELUDE_LENGTH
+
+
class EventStreamMessage:
    """A single, fully parsed event stream message."""

    # Values of the ':message-type' header that mark a failed response.
    _FAILURE_MESSAGE_TYPES = ('error', 'exception')

    def __init__(self, prelude, headers, payload, crc):
        self.prelude = prelude
        self.headers = headers
        self.payload = payload
        self.crc = crc

    def to_response_dict(self, status_code=200):
        """Convert the message into a response-style dictionary.

        :type status_code: int
        :param status_code: Status code to report, unless the message's
        ':message-type' header is 'error' or 'exception', in which case
        400 is reported instead.

        :rtype: dict
        :returns: A dictionary with the keys 'status_code', 'headers'
        and 'body'.
        """
        kind = self.headers.get(':message-type')
        if kind in self._FAILURE_MESSAGE_TYPES:
            status_code = 400
        return {
            'status_code': status_code,
            'headers': self.headers,
            'body': self.payload,
        }
+
+
class EventStreamHeaderParser:
    """Parser for the headers section of an event stream message.

    The complete header bytes must be supplied up front; the result is a
    dictionary of headers. A single instance may be reused to parse the
    headers of any number of messages.
    """

    # Header type code -> unpacking function. Every unpacking function
    # returns the decoded value and the number of bytes it consumed.
    _HEADER_TYPE_MAP = {
        # boolean_true
        0: DecodeUtils.unpack_true,
        # boolean_false
        1: DecodeUtils.unpack_false,
        # byte
        2: DecodeUtils.unpack_int8,
        # short
        3: DecodeUtils.unpack_int16,
        # integer
        4: DecodeUtils.unpack_int32,
        # long
        5: DecodeUtils.unpack_int64,
        # byte_array
        6: DecodeUtils.unpack_byte_array,
        # string
        7: DecodeUtils.unpack_utf8_string,
        # timestamp
        8: DecodeUtils.unpack_int64,
        # uuid
        9: DecodeUtils.unpack_uuid,
    }

    def __init__(self):
        # Header bytes that still remain to be parsed.
        self._data = None

    def parse(self, data):
        """Parse the headers of one event stream message.

        :type data: bytes
        :param data: The bytes that correspond to the headers section of an
        event stream message.

        :rtype: dict
        :returns: A dictionary of header key, value pairs.
        """
        self._data = data
        return self._parse_headers()

    def _parse_headers(self):
        # Consume headers until the buffer is empty, rejecting repeated names.
        parsed = {}
        while self._data:
            key, value = self._parse_header()
            if key in parsed:
                raise DuplicateHeader(key)
            parsed[key] = value
        return parsed

    def _parse_header(self):
        # The name precedes the value in the buffer, so it is read first.
        key = self._parse_name()
        return key, self._parse_value()

    def _parse_name(self):
        # Header names are utf-8 strings prefixed by a 1-byte length.
        header_name, used = DecodeUtils.unpack_utf8_string(self._data, 1)
        self._advance_data(used)
        return header_name

    def _parse_type(self):
        # A single unsigned byte selects the value's unpacking function.
        type_code, used = DecodeUtils.unpack_uint8(self._data)
        self._advance_data(used)
        return type_code

    def _parse_value(self):
        # Read the type code, then decode the value with the mapped unpacker.
        type_code = self._parse_type()
        unpacker = self._HEADER_TYPE_MAP[type_code]
        header_value, used = unpacker(self._data)
        self._advance_data(used)
        return header_value

    def _advance_data(self, consumed):
        # Drop the bytes that have just been parsed from the front.
        remaining = self._data[consumed:]
        self._data = remaining
+
+
class EventStreamBuffer:
    """Incremental buffer that turns event stream bytes into messages.

    Bytes are appended with ``add_data``; complete messages are handed out
    through the iterator interface as soon as enough bytes are buffered.
    """

    def __init__(self):
        # Bytes received so far that have not been turned into a message.
        self._data = b''
        # Prelude of the message currently being assembled, if already parsed.
        self._prelude = None
        self._header_parser = EventStreamHeaderParser()

    def add_data(self, data):
        """Append bytes to the buffer.

        :type data: bytes
        :param data: The bytes to add to the buffer to be used when parsing
        """
        self._data += data

    def _validate_prelude(self, prelude):
        # Reject messages whose declared headers section is too large.
        declared = prelude.headers_length
        if declared > _MAX_HEADERS_LENGTH:
            raise InvalidHeadersLength(declared)

    def _parse_prelude(self):
        # Decode the prelude at the front of the buffer and verify it.
        raw = self._data[:_PRELUDE_LENGTH]
        fields, _ = DecodeUtils.unpack_prelude(raw)
        parsed = MessagePrelude(*fields)
        # The last 4 prelude bytes are the prelude crc itself, so they are
        # left out of the bytes being checked.
        checked = raw[: _PRELUDE_LENGTH - 4]
        _validate_checksum(checked, parsed.crc)
        self._validate_prelude(parsed)
        return parsed

    def _parse_headers(self):
        # The headers section starts right after the prelude.
        stop = self._prelude.headers_end
        return self._header_parser.parse(self._data[_PRELUDE_LENGTH:stop])

    def _parse_payload(self):
        # The payload lies between the end of the headers and the message crc.
        start = self._prelude.headers_end
        stop = self._prelude.payload_end
        return self._data[start:stop]

    def _parse_message_crc(self):
        # The message crc occupies the bytes from the payload's end to the
        # end of the message.
        start = self._prelude.payload_end
        stop = self._prelude.total_length
        value, _ = DecodeUtils.unpack_uint32(self._data[start:stop])
        return value

    def _parse_message_bytes(self):
        # Starting 4 bytes before the end of the prelude means the prelude
        # crc is included in the bytes to be checked.
        start = _PRELUDE_LENGTH - 4
        return self._data[start : self._prelude.payload_end]

    def _validate_message_crc(self):
        # The checksum is seeded with the prelude crc, i.e. it continues the
        # checksum already computed over the leading prelude bytes.
        expected = self._parse_message_crc()
        covered = self._parse_message_bytes()
        _validate_checksum(covered, expected, crc=self._prelude.crc)
        return expected

    def _parse_message(self):
        # Verify the message crc before decoding headers and payload, then
        # drop the message's bytes from the buffer.
        message_crc = self._validate_message_crc()
        headers = self._parse_headers()
        payload = self._parse_payload()
        parsed = EventStreamMessage(
            self._prelude, headers, payload, message_crc
        )
        self._prepare_for_next_message()
        return parsed

    def _prepare_for_next_message(self):
        # Discard the message just parsed and forget its prelude.
        consumed = self._prelude.total_length
        self._data = self._data[consumed:]
        self._prelude = None

    def next(self):
        """Return the next complete message available in the buffer.

        ``StopIteration`` is raised while fewer bytes than a prelude, or
        fewer bytes than the full message, are buffered. A prelude that was
        already parsed is kept for later calls.

        :rtype: EventStreamMessage
        :returns: The next event stream message
        """
        buffered = len(self._data)
        if buffered < _PRELUDE_LENGTH:
            raise StopIteration

        if self._prelude is None:
            self._prelude = self._parse_prelude()

        if buffered < self._prelude.total_length:
            raise StopIteration

        return self._parse_message()

    def __next__(self):
        return self.next()

    def __iter__(self):
        return self
+
+
class EventStream:
    """Iterable wrapper around an event stream response body.

    The underlying streaming body is read chunk by chunk, split into
    individual events, and each event is parsed and yielded through the
    iterator interface as soon as it is available.

    The following example uses the S3 select API to get structured data out of
    an object stored in S3 using an event stream.

    **Example:**
    ::
        from botocore.session import Session

        s3 = Session().create_client('s3')
        response = s3.select_object_content(
            Bucket='bucketname',
            Key='keyname',
            ExpressionType='SQL',
            RequestProgress={'Enabled': True},
            Expression="SELECT * FROM S3Object s",
            InputSerialization={'CSV': {}},
            OutputSerialization={'CSV': {}},
        )
        # This is the event stream in the response
        event_stream = response['Payload']
        end_event_received = False
        with open('output', 'wb') as f:
            # Iterate over events in the event stream as they come
            for event in event_stream:
                # If we received a records event, write the data to a file
                if 'Records' in event:
                    data = event['Records']['Payload']
                    f.write(data)
                # If we received a progress event, print the details
                elif 'Progress' in event:
                    print(event['Progress']['Details'])
                # End event indicates that the request finished successfully
                elif 'End' in event:
                    print('Result is complete')
                    end_event_received = True
        if not end_event_received:
            raise Exception("End event not received, request incomplete.")
    """

    def __init__(self, raw_stream, output_shape, parser, operation_name):
        self._raw_stream = raw_stream
        self._output_shape = output_shape
        self._operation_name = operation_name
        self._parser = parser
        # One shared generator of raw messages; it is consumed both by
        # iteration and by get_initial_response().
        self._event_generator = self._create_raw_event_generator()

    def __iter__(self):
        # Events whose parsed form is falsy are skipped.
        for raw_event in self._event_generator:
            parsed = self._parse_event(raw_event)
            if parsed:
                yield parsed

    def _create_raw_event_generator(self):
        # Feed each chunk into the buffer, then drain every message that
        # has become complete before reading the next chunk.
        buffer = EventStreamBuffer()
        for chunk in self._raw_stream.stream():
            buffer.add_data(chunk)
            yield from buffer

    def _parse_event(self, event):
        # Parse the raw message with the configured parser; any status
        # other than 200 is surfaced as an EventStreamError.
        as_response = event.to_response_dict()
        parsed = self._parser.parse(as_response, self._output_shape)
        if as_response['status_code'] != 200:
            raise EventStreamError(parsed, self._operation_name)
        return parsed

    def get_initial_response(self):
        """Return the first raw event if it is of the initial-response type.

        The first event is consumed from the stream either way.

        :rtype: EventStreamMessage
        :returns: The first event, unparsed.

        :raises NoInitialResponseError: If the stream yields no event, or
        its first event has a different ':event-type' header.
        """
        try:
            first_event = next(self._event_generator)
            if first_event.headers.get(':event-type') == 'initial-response':
                return first_event
        except StopIteration:
            pass
        raise NoInitialResponseError()

    def close(self):
        """Closes the underlying streaming body."""
        self._raw_stream.close()