Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/compress.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/botocore/compress.py | 126
1 file changed, 126 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/botocore/compress.py b/.venv/lib/python3.12/site-packages/botocore/compress.py
new file mode 100644
index 00000000..1f8577e8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/compress.py
@@ -0,0 +1,126 @@
+# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""
+NOTE: All functions in this module are considered private and are
+subject to abrupt breaking changes. Please do not use them directly.
+
+"""
+
+import io
+import logging
+from gzip import GzipFile
+from gzip import compress as gzip_compress
+
+from botocore.compat import urlencode
+from botocore.utils import determine_content_length
+
+logger = logging.getLogger(__name__)
+
+
+def maybe_compress_request(config, request_dict, operation_model):
+ """Attempt to compress the request body using the modeled encodings."""
+ if _should_compress_request(config, request_dict, operation_model):
+ for encoding in operation_model.request_compression['encodings']:
+ encoder = COMPRESSION_MAPPING.get(encoding)
+ if encoder is not None:
+ logger.debug('Compressing request with %s encoding.', encoding)
+ request_dict['body'] = encoder(request_dict['body'])
+ _set_compression_header(request_dict['headers'], encoding)
+ return
+ else:
+ logger.debug('Unsupported compression encoding: %s', encoding)
+
+
+def _should_compress_request(config, request_dict, operation_model):
+    if (
+        config.disable_request_compression is not True
+        and config.signature_version != 'v2'
+        and operation_model.request_compression is not None
+    ):
+        if not _is_compressible_type(request_dict):
+            body_type = type(request_dict['body'])
+            log_msg = 'Body type %s does not support compression.'
+            logger.debug(log_msg, body_type)
+            return False
+
+        if operation_model.has_streaming_input:
+            streaming_input = operation_model.get_streaming_input()
+            streaming_metadata = streaming_input.metadata
+            return 'requiresLength' not in streaming_metadata
+
+        body_size = _get_body_size(request_dict['body'])
+        min_size = config.request_min_compression_size_bytes
+        return min_size <= body_size
+
+    return False
+
+
+def _is_compressible_type(request_dict):
+    body = request_dict['body']
+    # Coerce dict to a format compatible with compression.
+    if isinstance(body, dict):
+        body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8')
+        request_dict['body'] = body
+    is_supported_type = isinstance(body, (str, bytes, bytearray))
+    return is_supported_type or hasattr(body, 'read')
+
+
+def _get_body_size(body):
+    size = determine_content_length(body)
+    if size is None:
+        logger.debug(
+            'Unable to get length of the request body: %s. '
+            'Skipping compression.',
+            body,
+        )
+        size = 0
+    return size
+
+
+def _gzip_compress_body(body):
+    if isinstance(body, str):
+        return gzip_compress(body.encode('utf-8'))
+    elif isinstance(body, (bytes, bytearray)):
+        return gzip_compress(body)
+    elif hasattr(body, 'read'):
+        if hasattr(body, 'seek') and hasattr(body, 'tell'):
+            current_position = body.tell()
+            compressed_obj = _gzip_compress_fileobj(body)
+            body.seek(current_position)
+            return compressed_obj
+        return _gzip_compress_fileobj(body)
+
+
+def _gzip_compress_fileobj(body):
+    compressed_obj = io.BytesIO()
+    with GzipFile(fileobj=compressed_obj, mode='wb') as gz:
+        while True:
+            chunk = body.read(8192)
+            if not chunk:
+                break
+            if isinstance(chunk, str):
+                chunk = chunk.encode('utf-8')
+            gz.write(chunk)
+    compressed_obj.seek(0)
+    return compressed_obj
+
+
+def _set_compression_header(headers, encoding):
+    ce_header = headers.get('Content-Encoding')
+    if ce_header is None:
+        headers['Content-Encoding'] = encoding
+    else:
+        headers['Content-Encoding'] = f'{ce_header},{encoding}'
+
+
+COMPRESSION_MAPPING = {'gzip': _gzip_compress_body}
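
A minimal round-trip sketch of the gzip encoder registered above. The sample payload and variable names are assumptions for illustration; as the module docstring notes, these helpers are private and may change without notice, and in normal use botocore invokes them internally based on the client Config settings referenced in the code (request_min_compression_size_bytes, disable_request_compression).

    import gzip

    from botocore.compress import COMPRESSION_MAPPING

    # Assumed sample body, repeated so it is large enough to be worth compressing.
    body = b'Action=PutMetricData&Namespace=Example&Value=1' * 500

    # _gzip_compress_body handles str/bytes/bytearray and file-like objects;
    # for bytes it returns gzip-compressed bytes.
    compressed = COMPRESSION_MAPPING['gzip'](body)

    # The standard library can decompress the result back to the original.
    assert gzip.decompress(compressed) == body
    print(f'{len(body)} bytes -> {len(compressed)} bytes')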