Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer')
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/__init__.py      887
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/bandwidth.py     437
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/compat.py         94
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/constants.py      38
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/copies.py        388
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/crt.py           991
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/delete.py         71
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/download.py      788
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/exceptions.py     37
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/futures.py       613
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/manager.py       754
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/processpool.py  1009
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/subscribers.py    94
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/tasks.py         390
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/upload.py        840
-rw-r--r--  .venv/lib/python3.12/site-packages/s3transfer/utils.py         833
16 files changed, 8264 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/__init__.py b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py
new file mode 100644
index 00000000..b8abbf47
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py
@@ -0,0 +1,887 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Abstractions over S3's upload/download operations.
+
+This module provides high level abstractions for efficient
+uploads/downloads. It handles several things for the user:
+
+* Automatically switching to multipart transfers when
+ a file is over a specific size threshold
+* Uploading/downloading a file in parallel
+* Throttling based on max bandwidth
+* Progress callbacks to monitor transfers
+* Retries. While botocore handles retries for streaming uploads,
+ it is not possible for it to handle retries for streaming
+ downloads. This module handles retries for both cases so
+ you don't need to implement any retry logic yourself.
+
+This module has a reasonable set of defaults. It also allows you
+to configure many aspects of the transfer process including:
+
+* Multipart threshold size
+* Max parallel downloads
+* Max bandwidth
+* Socket timeouts
+* Retry amounts
+
+There is no support for s3->s3 multipart copies at this
+time.
+
+
+.. _ref_s3transfer_usage:
+
+Usage
+=====
+
+The simplest way to use this module is:
+
+.. code-block:: python
+
+ client = boto3.client('s3', 'us-west-2')
+ transfer = S3Transfer(client)
+ # Upload /tmp/myfile to s3://bucket/key
+ transfer.upload_file('/tmp/myfile', 'bucket', 'key')
+
+ # Download s3://bucket/key to /tmp/myfile
+ transfer.download_file('bucket', 'key', '/tmp/myfile')
+
+The ``upload_file`` and ``download_file`` methods also accept an optional
+``extra_args`` dictionary, whose contents are forwarded through to the
+corresponding client operation. Here are a few examples using ``upload_file``::
+
+ # Making the object public
+ transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+ extra_args={'ACL': 'public-read'})
+
+ # Setting metadata
+ transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+ extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
+
+ # Setting content type
+ transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
+ extra_args={'ContentType': "application/json"})
+
+
+The ``S3Transfer`` class also supports progress callbacks so you can
+provide transfer progress to users. Both the ``upload_file`` and
+``download_file`` methods take an optional ``callback`` parameter.
+Here's an example of how to print a simple progress percentage
+to the user:
+
+.. code-block:: python
+
+ class ProgressPercentage(object):
+ def __init__(self, filename):
+ self._filename = filename
+ self._size = float(os.path.getsize(filename))
+ self._seen_so_far = 0
+ self._lock = threading.Lock()
+
+ def __call__(self, bytes_amount):
+ # To simplify we'll assume this is hooked up
+ # to a single filename.
+ with self._lock:
+ self._seen_so_far += bytes_amount
+ percentage = (self._seen_so_far / self._size) * 100
+ sys.stdout.write(
+ "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
+ self._size, percentage))
+ sys.stdout.flush()
+
+
+ transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
+ # Upload /tmp/myfile to s3://bucket/key and print upload progress.
+ transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+ callback=ProgressPercentage('/tmp/myfile'))
+
+
+
+You can also provide a TransferConfig object to the S3Transfer
+object that gives you more fine-grained control over the
+transfer. For example:
+
+.. code-block:: python
+
+ client = boto3.client('s3', 'us-west-2')
+ config = TransferConfig(
+ multipart_threshold=8 * 1024 * 1024,
+ max_concurrency=10,
+ num_download_attempts=10,
+ )
+ transfer = S3Transfer(client, config)
+ transfer.upload_file('/tmp/foo', 'bucket', 'key')
+
+
+"""
+
+import concurrent.futures
+import functools
+import logging
+import math
+import os
+import queue
+import random
+import socket
+import string
+import threading
+
+from botocore.compat import six # noqa: F401
+from botocore.exceptions import IncompleteReadError, ResponseStreamingError
+from botocore.vendored.requests.packages.urllib3.exceptions import (
+ ReadTimeoutError,
+)
+
+import s3transfer.compat
+from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
+
+__author__ = 'Amazon Web Services'
+__version__ = '0.11.4'
+
+
+class NullHandler(logging.Handler):
+ def emit(self, record):
+ pass
+
+
+logger = logging.getLogger(__name__)
+logger.addHandler(NullHandler())
+
+MB = 1024 * 1024
+SHUTDOWN_SENTINEL = object()
+
+
+def random_file_extension(num_digits=8):
+ return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
+
+
+def disable_upload_callbacks(request, operation_name, **kwargs):
+ if operation_name in ['PutObject', 'UploadPart'] and hasattr(
+ request.body, 'disable_callback'
+ ):
+ request.body.disable_callback()
+
+
+def enable_upload_callbacks(request, operation_name, **kwargs):
+ if operation_name in ['PutObject', 'UploadPart'] and hasattr(
+ request.body, 'enable_callback'
+ ):
+ request.body.enable_callback()
+
+
+class QueueShutdownError(Exception):
+ pass
+
+
+class ReadFileChunk:
+ def __init__(
+ self,
+ fileobj,
+ start_byte,
+ chunk_size,
+ full_file_size,
+ callback=None,
+ enable_callback=True,
+ ):
+ """
+
+ Given a file object shown below:
+
+ |___________________________________________________|
+ 0 | | full_file_size
+ |----chunk_size---|
+ start_byte
+
+ :type fileobj: file
+ :param fileobj: File like object
+
+ :type start_byte: int
+ :param start_byte: The first byte from which to start reading.
+
+ :type chunk_size: int
+ :param chunk_size: The max chunk size to read. Trying to read
+ past the end of the chunk size will behave as if you've
+ reached the end of the file.
+
+ :type full_file_size: int
+ :param full_file_size: The entire content length associated
+ with ``fileobj``.
+
+ :type callback: function(amount_read)
+ :param callback: Called whenever data is read from this object.
+
+ """
+ self._fileobj = fileobj
+ self._start_byte = start_byte
+ self._size = self._calculate_file_size(
+ self._fileobj,
+ requested_size=chunk_size,
+ start_byte=start_byte,
+ actual_file_size=full_file_size,
+ )
+ self._fileobj.seek(self._start_byte)
+ self._amount_read = 0
+ self._callback = callback
+ self._callback_enabled = enable_callback
+
+ @classmethod
+ def from_filename(
+ cls,
+ filename,
+ start_byte,
+ chunk_size,
+ callback=None,
+ enable_callback=True,
+ ):
+ """Convenience factory function to create from a filename.
+
+ :type start_byte: int
+ :param start_byte: The first byte from which to start reading.
+
+ :type chunk_size: int
+ :param chunk_size: The max chunk size to read. Trying to read
+ past the end of the chunk size will behave as if you've
+ reached the end of the file.
+
+ :type callback: function(amount_read)
+ :param callback: Called whenever data is read from this object.
+
+ :type enable_callback: bool
+ :param enable_callback: Indicate whether to invoke callback
+ during read() calls.
+
+ :rtype: ``ReadFileChunk``
+ :return: A new instance of ``ReadFileChunk``
+
+ """
+ f = open(filename, 'rb')
+ file_size = os.fstat(f.fileno()).st_size
+ return cls(
+ f, start_byte, chunk_size, file_size, callback, enable_callback
+ )
+
+ def _calculate_file_size(
+ self, fileobj, requested_size, start_byte, actual_file_size
+ ):
+ max_chunk_size = actual_file_size - start_byte
+ return min(max_chunk_size, requested_size)
+
+ def read(self, amount=None):
+ if amount is None:
+ amount_to_read = self._size - self._amount_read
+ else:
+ amount_to_read = min(self._size - self._amount_read, amount)
+ data = self._fileobj.read(amount_to_read)
+ self._amount_read += len(data)
+ if self._callback is not None and self._callback_enabled:
+ self._callback(len(data))
+ return data
+
+ def enable_callback(self):
+ self._callback_enabled = True
+
+ def disable_callback(self):
+ self._callback_enabled = False
+
+ def seek(self, where):
+ self._fileobj.seek(self._start_byte + where)
+ if self._callback is not None and self._callback_enabled:
+ # To also rewind the callback() for an accurate progress report
+ self._callback(where - self._amount_read)
+ self._amount_read = where
+
+ def close(self):
+ self._fileobj.close()
+
+ def tell(self):
+ return self._amount_read
+
+ def __len__(self):
+ # __len__ is defined because requests will try to determine the length
+ # of the stream to set a content length. For a plain file it
+ # would just stat the file, but we need to change that
+ # behavior. By providing a __len__, requests will use that instead
+ # of stat'ing the file.
+ return self._size
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+ def __iter__(self):
+ # This is a workaround for http://bugs.python.org/issue17575
+ # Basically httplib will try to iterate over the contents, even
+ # if it's a file-like object. This wasn't noticed because we've
+ # already exhausted the stream so iterating over the file immediately
+ # stops, which is what we're simulating here.
+ return iter([])
+
+
+class StreamReaderProgress:
+ """Wrapper for a read only stream that adds progress callbacks."""
+
+ def __init__(self, stream, callback=None):
+ self._stream = stream
+ self._callback = callback
+
+ def read(self, *args, **kwargs):
+ value = self._stream.read(*args, **kwargs)
+ if self._callback is not None:
+ self._callback(len(value))
+ return value
+
+
+class OSUtils:
+ def get_file_size(self, filename):
+ return os.path.getsize(filename)
+
+ def open_file_chunk_reader(self, filename, start_byte, size, callback):
+ return ReadFileChunk.from_filename(
+ filename, start_byte, size, callback, enable_callback=False
+ )
+
+ def open(self, filename, mode):
+ return open(filename, mode)
+
+ def remove_file(self, filename):
+ """Remove a file, noop if file does not exist."""
+ # Unlike os.remove, if the file does not exist,
+ # then this method does nothing.
+ try:
+ os.remove(filename)
+ except OSError:
+ pass
+
+ def rename_file(self, current_filename, new_filename):
+ s3transfer.compat.rename_file(current_filename, new_filename)
+
+
+class MultipartUploader:
+ # These are the extra_args that need to be forwarded onto
+ # subsequent upload_parts.
+ UPLOAD_PART_ARGS = [
+ 'SSECustomerKey',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ ]
+
+ def __init__(
+ self,
+ client,
+ config,
+ osutil,
+ executor_cls=concurrent.futures.ThreadPoolExecutor,
+ ):
+ self._client = client
+ self._config = config
+ self._os = osutil
+ self._executor_cls = executor_cls
+
+ def _extra_upload_part_args(self, extra_args):
+ # Only the args in UPLOAD_PART_ARGS actually need to be passed
+ # onto the upload_part calls.
+ upload_parts_args = {}
+ for key, value in extra_args.items():
+ if key in self.UPLOAD_PART_ARGS:
+ upload_parts_args[key] = value
+ return upload_parts_args
+
+ def upload_file(self, filename, bucket, key, callback, extra_args):
+ response = self._client.create_multipart_upload(
+ Bucket=bucket, Key=key, **extra_args
+ )
+ upload_id = response['UploadId']
+ try:
+ parts = self._upload_parts(
+ upload_id, filename, bucket, key, callback, extra_args
+ )
+ except Exception as e:
+ logger.debug(
+ "Exception raised while uploading parts, "
+ "aborting multipart upload.",
+ exc_info=True,
+ )
+ self._client.abort_multipart_upload(
+ Bucket=bucket, Key=key, UploadId=upload_id
+ )
+ raise S3UploadFailedError(
+ "Failed to upload {} to {}: {}".format(
+ filename, '/'.join([bucket, key]), e
+ )
+ )
+ self._client.complete_multipart_upload(
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts},
+ )
+
+ def _upload_parts(
+ self, upload_id, filename, bucket, key, callback, extra_args
+ ):
+ upload_parts_extra_args = self._extra_upload_part_args(extra_args)
+ parts = []
+ part_size = self._config.multipart_chunksize
+ num_parts = int(
+ math.ceil(self._os.get_file_size(filename) / float(part_size))
+ )
+ max_workers = self._config.max_concurrency
+ with self._executor_cls(max_workers=max_workers) as executor:
+ upload_partial = functools.partial(
+ self._upload_one_part,
+ filename,
+ bucket,
+ key,
+ upload_id,
+ part_size,
+ upload_parts_extra_args,
+ callback,
+ )
+ for part in executor.map(upload_partial, range(1, num_parts + 1)):
+ parts.append(part)
+ return parts
+
+ def _upload_one_part(
+ self,
+ filename,
+ bucket,
+ key,
+ upload_id,
+ part_size,
+ extra_args,
+ callback,
+ part_number,
+ ):
+ open_chunk_reader = self._os.open_file_chunk_reader
+ with open_chunk_reader(
+ filename, part_size * (part_number - 1), part_size, callback
+ ) as body:
+ response = self._client.upload_part(
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ PartNumber=part_number,
+ Body=body,
+ **extra_args,
+ )
+ etag = response['ETag']
+ return {'ETag': etag, 'PartNumber': part_number}
+
+
+class ShutdownQueue(queue.Queue):
+ """A queue implementation that can be shutdown.
+
+ Shutting down a queue means that this class adds a
+ trigger_shutdown method that will trigger all subsequent
+ calls to put() to fail with a ``QueueShutdownError``.
+
+ It purposefully deviates from queue.Queue, and is *not* meant
+ to be a drop in replacement for ``queue.Queue``.
+
+ """
+
+ def _init(self, maxsize):
+ self._shutdown = False
+ self._shutdown_lock = threading.Lock()
+ # queue.Queue was an old-style class in Python 2, which is why
+ # super() is not used here.
+ return queue.Queue._init(self, maxsize)
+
+ def trigger_shutdown(self):
+ with self._shutdown_lock:
+ self._shutdown = True
+ logger.debug("The IO queue is now shutdown.")
+
+ def put(self, item):
+ # Note: this is not sufficient, it's still possible to deadlock!
+ # Need to hook into the condition vars used by this class.
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise QueueShutdownError(
+ "Cannot put item to queue when " "queue has been shutdown."
+ )
+ return queue.Queue.put(self, item)
+
+
+class MultipartDownloader:
+ def __init__(
+ self,
+ client,
+ config,
+ osutil,
+ executor_cls=concurrent.futures.ThreadPoolExecutor,
+ ):
+ self._client = client
+ self._config = config
+ self._os = osutil
+ self._executor_cls = executor_cls
+ self._ioqueue = ShutdownQueue(self._config.max_io_queue)
+
+ def download_file(
+ self, bucket, key, filename, object_size, extra_args, callback=None
+ ):
+ with self._executor_cls(max_workers=2) as controller:
+ # 1 thread for the future that manages the downloading of parts
+ # 1 thread for the future that manages IO writes.
+ download_parts_handler = functools.partial(
+ self._download_file_as_future,
+ bucket,
+ key,
+ filename,
+ object_size,
+ callback,
+ )
+ parts_future = controller.submit(download_parts_handler)
+
+ io_writes_handler = functools.partial(
+ self._perform_io_writes, filename
+ )
+ io_future = controller.submit(io_writes_handler)
+ results = concurrent.futures.wait(
+ [parts_future, io_future],
+ return_when=concurrent.futures.FIRST_EXCEPTION,
+ )
+ self._process_future_results(results)
+
+ def _process_future_results(self, futures):
+ finished, unfinished = futures
+ for future in finished:
+ future.result()
+
+ def _download_file_as_future(
+ self, bucket, key, filename, object_size, callback
+ ):
+ part_size = self._config.multipart_chunksize
+ num_parts = int(math.ceil(object_size / float(part_size)))
+ max_workers = self._config.max_concurrency
+ download_partial = functools.partial(
+ self._download_range,
+ bucket,
+ key,
+ filename,
+ part_size,
+ num_parts,
+ callback,
+ )
+ try:
+ with self._executor_cls(max_workers=max_workers) as executor:
+ list(executor.map(download_partial, range(num_parts)))
+ finally:
+ self._ioqueue.put(SHUTDOWN_SENTINEL)
+
+ def _calculate_range_param(self, part_size, part_index, num_parts):
+ start_range = part_index * part_size
+ if part_index == num_parts - 1:
+ end_range = ''
+ else:
+ end_range = start_range + part_size - 1
+ range_param = f'bytes={start_range}-{end_range}'
+ return range_param
+
+ def _download_range(
+ self, bucket, key, filename, part_size, num_parts, callback, part_index
+ ):
+ try:
+ range_param = self._calculate_range_param(
+ part_size, part_index, num_parts
+ )
+
+ max_attempts = self._config.num_download_attempts
+ last_exception = None
+ for i in range(max_attempts):
+ try:
+ logger.debug("Making get_object call.")
+ response = self._client.get_object(
+ Bucket=bucket, Key=key, Range=range_param
+ )
+ streaming_body = StreamReaderProgress(
+ response['Body'], callback
+ )
+ buffer_size = 1024 * 16
+ current_index = part_size * part_index
+ for chunk in iter(
+ lambda: streaming_body.read(buffer_size), b''
+ ):
+ self._ioqueue.put((current_index, chunk))
+ current_index += len(chunk)
+ return
+ except (
+ socket.timeout,
+ OSError,
+ ReadTimeoutError,
+ IncompleteReadError,
+ ResponseStreamingError,
+ ) as e:
+ logger.debug(
+ "Retrying exception caught (%s), "
+ "retrying request, (attempt %s / %s)",
+ e,
+ i,
+ max_attempts,
+ exc_info=True,
+ )
+ last_exception = e
+ continue
+ raise RetriesExceededError(last_exception)
+ finally:
+ logger.debug("EXITING _download_range for part: %s", part_index)
+
+ def _perform_io_writes(self, filename):
+ with self._os.open(filename, 'wb') as f:
+ while True:
+ task = self._ioqueue.get()
+ if task is SHUTDOWN_SENTINEL:
+ logger.debug(
+ "Shutdown sentinel received in IO handler, "
+ "shutting down IO handler."
+ )
+ return
+ else:
+ try:
+ offset, data = task
+ f.seek(offset)
+ f.write(data)
+ except Exception as e:
+ logger.debug(
+ "Caught exception in IO thread: %s",
+ e,
+ exc_info=True,
+ )
+ self._ioqueue.trigger_shutdown()
+ raise
+
+
+class TransferConfig:
+ def __init__(
+ self,
+ multipart_threshold=8 * MB,
+ max_concurrency=10,
+ multipart_chunksize=8 * MB,
+ num_download_attempts=5,
+ max_io_queue=100,
+ ):
+ self.multipart_threshold = multipart_threshold
+ self.max_concurrency = max_concurrency
+ self.multipart_chunksize = multipart_chunksize
+ self.num_download_attempts = num_download_attempts
+ self.max_io_queue = max_io_queue
+
+
+class S3Transfer:
+ ALLOWED_DOWNLOAD_ARGS = [
+ 'VersionId',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKey',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ ]
+
+ ALLOWED_UPLOAD_ARGS = [
+ 'ACL',
+ 'CacheControl',
+ 'ContentDisposition',
+ 'ContentEncoding',
+ 'ContentLanguage',
+ 'ContentType',
+ 'Expires',
+ 'GrantFullControl',
+ 'GrantRead',
+ 'GrantReadACP',
+ 'GrantWriteACL',
+ 'Metadata',
+ 'RequestPayer',
+ 'ServerSideEncryption',
+ 'StorageClass',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKey',
+ 'SSECustomerKeyMD5',
+ 'SSEKMSKeyId',
+ 'SSEKMSEncryptionContext',
+ 'Tagging',
+ ]
+
+ def __init__(self, client, config=None, osutil=None):
+ self._client = client
+ self._client.meta.events.register(
+ 'before-call.s3.*', self._update_checksum_context
+ )
+ if config is None:
+ config = TransferConfig()
+ self._config = config
+ if osutil is None:
+ osutil = OSUtils()
+ self._osutil = osutil
+
+ def _update_checksum_context(self, params, **kwargs):
+ request_context = params.get("context", {})
+ checksum_context = request_context.get("checksum", {})
+ if "request_algorithm" in checksum_context:
+ # Force request checksum algorithm in the header if specified.
+ checksum_context["request_algorithm"]["in"] = "header"
+
+ def upload_file(
+ self, filename, bucket, key, callback=None, extra_args=None
+ ):
+ """Upload a file to an S3 object.
+
+ Variants have also been injected into S3 client, Bucket and Object.
+ You don't have to use S3Transfer.upload_file() directly.
+ """
+ if extra_args is None:
+ extra_args = {}
+ self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
+ events = self._client.meta.events
+ events.register_first(
+ 'request-created.s3',
+ disable_upload_callbacks,
+ unique_id='s3upload-callback-disable',
+ )
+ events.register_last(
+ 'request-created.s3',
+ enable_upload_callbacks,
+ unique_id='s3upload-callback-enable',
+ )
+ if (
+ self._osutil.get_file_size(filename)
+ >= self._config.multipart_threshold
+ ):
+ self._multipart_upload(filename, bucket, key, callback, extra_args)
+ else:
+ self._put_object(filename, bucket, key, callback, extra_args)
+
+ def _put_object(self, filename, bucket, key, callback, extra_args):
+ # We're using open_file_chunk_reader so we can take advantage of the
+ # progress callback functionality.
+ open_chunk_reader = self._osutil.open_file_chunk_reader
+ with open_chunk_reader(
+ filename,
+ 0,
+ self._osutil.get_file_size(filename),
+ callback=callback,
+ ) as body:
+ self._client.put_object(
+ Bucket=bucket, Key=key, Body=body, **extra_args
+ )
+
+ def download_file(
+ self, bucket, key, filename, extra_args=None, callback=None
+ ):
+ """Download an S3 object to a file.
+
+ Variants have also been injected into S3 client, Bucket and Object.
+ You don't have to use S3Transfer.download_file() directly.
+ """
+ # This method will issue a ``head_object`` request to determine
+ # the size of the S3 object. This is used to determine if the
+ # object is downloaded in parallel.
+ if extra_args is None:
+ extra_args = {}
+ self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
+ object_size = self._object_size(bucket, key, extra_args)
+ temp_filename = filename + os.extsep + random_file_extension()
+ try:
+ self._download_file(
+ bucket, key, temp_filename, object_size, extra_args, callback
+ )
+ except Exception:
+ logger.debug(
+ "Exception caught in download_file, removing partial "
+ "file: %s",
+ temp_filename,
+ exc_info=True,
+ )
+ self._osutil.remove_file(temp_filename)
+ raise
+ else:
+ self._osutil.rename_file(temp_filename, filename)
+
+ def _download_file(
+ self, bucket, key, filename, object_size, extra_args, callback
+ ):
+ if object_size >= self._config.multipart_threshold:
+ self._ranged_download(
+ bucket, key, filename, object_size, extra_args, callback
+ )
+ else:
+ self._get_object(bucket, key, filename, extra_args, callback)
+
+ def _validate_all_known_args(self, actual, allowed):
+ for kwarg in actual:
+ if kwarg not in allowed:
+ raise ValueError(
+ f"Invalid extra_args key '{kwarg}', "
+ f"must be one of: {', '.join(allowed)}"
+ )
+
+ def _ranged_download(
+ self, bucket, key, filename, object_size, extra_args, callback
+ ):
+ downloader = MultipartDownloader(
+ self._client, self._config, self._osutil
+ )
+ downloader.download_file(
+ bucket, key, filename, object_size, extra_args, callback
+ )
+
+ def _get_object(self, bucket, key, filename, extra_args, callback):
+ # precondition: num_download_attempts > 0
+ max_attempts = self._config.num_download_attempts
+ last_exception = None
+ for i in range(max_attempts):
+ try:
+ return self._do_get_object(
+ bucket, key, filename, extra_args, callback
+ )
+ except (
+ socket.timeout,
+ OSError,
+ ReadTimeoutError,
+ IncompleteReadError,
+ ResponseStreamingError,
+ ) as e:
+ # TODO: we need a way to reset the callback if the
+ # download failed.
+ logger.debug(
+ "Retrying exception caught (%s), "
+ "retrying request, (attempt %s / %s)",
+ e,
+ i,
+ max_attempts,
+ exc_info=True,
+ )
+ last_exception = e
+ continue
+ raise RetriesExceededError(last_exception)
+
+ def _do_get_object(self, bucket, key, filename, extra_args, callback):
+ response = self._client.get_object(
+ Bucket=bucket, Key=key, **extra_args
+ )
+ streaming_body = StreamReaderProgress(response['Body'], callback)
+ with self._osutil.open(filename, 'wb') as f:
+ for chunk in iter(lambda: streaming_body.read(8192), b''):
+ f.write(chunk)
+
+ def _object_size(self, bucket, key, extra_args):
+ return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
+ 'ContentLength'
+ ]
+
+ def _multipart_upload(self, filename, bucket, key, callback, extra_args):
+ uploader = MultipartUploader(self._client, self._config, self._osutil)
+ uploader.upload_file(filename, bucket, key, callback, extra_args)
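ReadFileChunk is the building block that both the progress callbacks and the multipart part boundaries above rely on. Below is a minimal sketch of using it directly against a local scratch file; the file path, sizes, and callback are illustrative assumptions, not anything this file prescribes::

    import os
    import tempfile

    from s3transfer import ReadFileChunk

    # Create a 1 MiB scratch file to read from.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.write(b'x' * (1024 * 1024))
    tmp.close()

    progress = []  # the callback receives the number of bytes read

    # Expose the second 256 KiB of the file as if it were a standalone stream.
    with ReadFileChunk.from_filename(
        tmp.name,
        start_byte=256 * 1024,
        chunk_size=256 * 1024,
        callback=progress.append,
    ) as chunk:
        assert len(chunk) == 256 * 1024   # __len__ reports the chunk size
        chunk.read(64 * 1024)             # reads never cross the chunk boundary
        assert chunk.tell() == 64 * 1024

    print(sum(progress))                  # 65536 bytes reported so far
    os.remove(tmp.name)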
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py
new file mode 100644
index 00000000..9301c8e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py
@@ -0,0 +1,437 @@
+# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import threading
+import time
+
+
+class RequestExceededException(Exception):
+ def __init__(self, requested_amt, retry_time):
+ """Error when requested amount exceeds what is allowed
+
+ The request that raised this error should be retried after waiting
+ the time specified by ``retry_time``.
+
+ :type requested_amt: int
+ :param requested_amt: The originally requested byte amount
+
+ :type retry_time: float
+ :param retry_time: The length in time to wait to retry for the
+ requested amount
+ """
+ self.requested_amt = requested_amt
+ self.retry_time = retry_time
+ msg = f'Request amount {requested_amt} exceeded the amount available. Retry in {retry_time}'
+ super().__init__(msg)
+
+
+class RequestToken:
+ """A token to pass as an identifier when consuming from the LeakyBucket"""
+
+ pass
+
+
+class TimeUtils:
+ def time(self):
+ """Get the current time back
+
+ :rtype: float
+ :returns: The current time in seconds
+ """
+ return time.time()
+
+ def sleep(self, value):
+ """Sleep for a designated time
+
+ :type value: float
+ :param value: The time to sleep for in seconds
+ """
+ return time.sleep(value)
+
+
+class BandwidthLimiter:
+ def __init__(self, leaky_bucket, time_utils=None):
+ """Limits bandwidth for shared S3 transfers
+
+ :type leaky_bucket: LeakyBucket
+ :param leaky_bucket: The leaky bucket to use to limit bandwidth
+
+ :type time_utils: TimeUtils
+ :param time_utils: Time utility to use for interacting with time.
+ """
+ self._leaky_bucket = leaky_bucket
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+
+ def get_bandwith_limited_stream(
+ self, fileobj, transfer_coordinator, enabled=True
+ ):
+ """Wraps a fileobj in a bandwidth limited stream wrapper
+
+ :type fileobj: file-like obj
+ :param fileobj: The file-like obj to wrap
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The coordinator for the general transfer
+ that the wrapped stream is a part of
+
+ :type enabled: boolean
+ :param enabled: Whether bandwidth limiting should be enabled to start
+ """
+ stream = BandwidthLimitedStream(
+ fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
+ )
+ if not enabled:
+ stream.disable_bandwidth_limiting()
+ return stream
+
+
+class BandwidthLimitedStream:
+ def __init__(
+ self,
+ fileobj,
+ leaky_bucket,
+ transfer_coordinator,
+ time_utils=None,
+ bytes_threshold=256 * 1024,
+ ):
+ """Limits bandwidth for reads on a wrapped stream
+
+ :type fileobj: file-like object
+ :param fileobj: The file like object to wrap
+
+ :type leaky_bucket: LeakyBucket
+ :param leaky_bucket: The leaky bucket to use to throttle reads on
+ the stream
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The coordinator for the general transfer
+ that the wrapped stream is a part of
+
+ :type time_utils: TimeUtils
+ :param time_utils: The time utility to use for interacting with time
+ """
+ self._fileobj = fileobj
+ self._leaky_bucket = leaky_bucket
+ self._transfer_coordinator = transfer_coordinator
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+ self._bandwidth_limiting_enabled = True
+ self._request_token = RequestToken()
+ self._bytes_seen = 0
+ self._bytes_threshold = bytes_threshold
+
+ def enable_bandwidth_limiting(self):
+ """Enable bandwidth limiting on reads to the stream"""
+ self._bandwidth_limiting_enabled = True
+
+ def disable_bandwidth_limiting(self):
+ """Disable bandwidth limiting on reads to the stream"""
+ self._bandwidth_limiting_enabled = False
+
+ def read(self, amount):
+ """Read a specified amount
+
+ Reads will only be throttled if bandwidth limiting is enabled.
+ """
+ if not self._bandwidth_limiting_enabled:
+ return self._fileobj.read(amount)
+
+ # We do not want to be calling consume on every read as the read
+ # amounts can be small causing the lock of the leaky bucket to
+ # introduce noticeable overhead. So instead we keep track of
+ # how many bytes we have seen and only call consume once we pass a
+ # certain threshold.
+ self._bytes_seen += amount
+ if self._bytes_seen < self._bytes_threshold:
+ return self._fileobj.read(amount)
+
+ self._consume_through_leaky_bucket()
+ return self._fileobj.read(amount)
+
+ def _consume_through_leaky_bucket(self):
+ # NOTE: If the read amounts on the stream are high, it will result
+ # in large bursty behavior as there is no interface for partial
+ # reads. However, given the reads on this abstraction are at most 256KB
+ # (via downloads), it reduces the burstiness to small KB bursts at
+ # worst.
+ while not self._transfer_coordinator.exception:
+ try:
+ self._leaky_bucket.consume(
+ self._bytes_seen, self._request_token
+ )
+ self._bytes_seen = 0
+ return
+ except RequestExceededException as e:
+ self._time_utils.sleep(e.retry_time)
+ else:
+ raise self._transfer_coordinator.exception
+
+ def signal_transferring(self):
+ """Signal that data being read is being transferred to S3"""
+ self.enable_bandwidth_limiting()
+
+ def signal_not_transferring(self):
+ """Signal that data being read is not being transferred to S3"""
+ self.disable_bandwidth_limiting()
+
+ def seek(self, where, whence=0):
+ self._fileobj.seek(where, whence)
+
+ def tell(self):
+ return self._fileobj.tell()
+
+ def close(self):
+ if self._bandwidth_limiting_enabled and self._bytes_seen:
+ # This handles the case where the file is small enough to never
+ # trigger the threshold and thus is never subjected to the
+ # leaky bucket on read(). This specifically happens for small
+ # uploads. So to account for those bytes, send them
+ # through the leaky bucket when the file gets closed.
+ self._consume_through_leaky_bucket()
+ self._fileobj.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+
+class LeakyBucket:
+ def __init__(
+ self,
+ max_rate,
+ time_utils=None,
+ rate_tracker=None,
+ consumption_scheduler=None,
+ ):
+ """A leaky bucket abstraction to limit bandwidth consumption
+
+ :type max_rate: int
+ :param max_rate: The maximum rate to allow. This rate is in terms of
+ bytes per second.
+
+ :type time_utils: TimeUtils
+ :param time_utils: The time utility to use for interacting with time
+
+ :type rate_tracker: BandwidthRateTracker
+ :param rate_tracker: Tracks bandwidth consumption
+
+ :type consumption_scheduler: ConsumptionScheduler
+ :param consumption_scheduler: Schedules consumption retries when
+ necessary
+ """
+ self._max_rate = float(max_rate)
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+ self._lock = threading.Lock()
+ self._rate_tracker = rate_tracker
+ if rate_tracker is None:
+ self._rate_tracker = BandwidthRateTracker()
+ self._consumption_scheduler = consumption_scheduler
+ if consumption_scheduler is None:
+ self._consumption_scheduler = ConsumptionScheduler()
+
+ def consume(self, amt, request_token):
+ """Consume an a requested amount
+
+ :type amt: int
+ :param amt: The amount of bytes to request to consume
+
+ :type request_token: RequestToken
+ :param request_token: The token associated to the consumption
+ request that is used to identify the request. So if a
+ RequestExceededException is raised the token should be used
+ in subsequent retry consume() request.
+
+ :raises RequestExceededException: If the consumption amount would
+ exceed the maximum allocated bandwidth
+
+ :rtype: int
+ :returns: The amount consumed
+ """
+ with self._lock:
+ time_now = self._time_utils.time()
+ if self._consumption_scheduler.is_scheduled(request_token):
+ return self._release_requested_amt_for_scheduled_request(
+ amt, request_token, time_now
+ )
+ elif self._projected_to_exceed_max_rate(amt, time_now):
+ self._raise_request_exceeded_exception(
+ amt, request_token, time_now
+ )
+ else:
+ return self._release_requested_amt(amt, time_now)
+
+ def _projected_to_exceed_max_rate(self, amt, time_now):
+ projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
+ return projected_rate > self._max_rate
+
+ def _release_requested_amt_for_scheduled_request(
+ self, amt, request_token, time_now
+ ):
+ self._consumption_scheduler.process_scheduled_consumption(
+ request_token
+ )
+ return self._release_requested_amt(amt, time_now)
+
+ def _raise_request_exceeded_exception(self, amt, request_token, time_now):
+ allocated_time = amt / float(self._max_rate)
+ retry_time = self._consumption_scheduler.schedule_consumption(
+ amt, request_token, allocated_time
+ )
+ raise RequestExceededException(
+ requested_amt=amt, retry_time=retry_time
+ )
+
+ def _release_requested_amt(self, amt, time_now):
+ self._rate_tracker.record_consumption_rate(amt, time_now)
+ return amt
+
+
+class ConsumptionScheduler:
+ def __init__(self):
+ """Schedules when to consume a desired amount"""
+ self._tokens_to_scheduled_consumption = {}
+ self._total_wait = 0
+
+ def is_scheduled(self, token):
+ """Indicates if a consumption request has been scheduled
+
+ :type token: RequestToken
+ :param token: The token associated to the consumption
+ request that is used to identify the request.
+ """
+ return token in self._tokens_to_scheduled_consumption
+
+ def schedule_consumption(self, amt, token, time_to_consume):
+ """Schedules a wait time to be able to consume an amount
+
+ :type amt: int
+ :param amt: The amount of bytes scheduled to be consumed
+
+ :type token: RequestToken
+ :param token: The token associated to the consumption
+ request that is used to identify the request.
+
+ :type time_to_consume: float
+ :param time_to_consume: The desired time it should take for that
+ specific request amount to be consumed, regardless of previously
+ scheduled consumption requests
+
+ :rtype: float
+ :returns: The amount of time to wait for the specific request before
+ actually consuming the specified amount.
+ """
+ self._total_wait += time_to_consume
+ self._tokens_to_scheduled_consumption[token] = {
+ 'wait_duration': self._total_wait,
+ 'time_to_consume': time_to_consume,
+ }
+ return self._total_wait
+
+ def process_scheduled_consumption(self, token):
+ """Processes a scheduled consumption request that has completed
+
+ :type token: RequestToken
+ :param token: The token associated to the consumption
+ request that is used to identify the request.
+ """
+ scheduled_retry = self._tokens_to_scheduled_consumption.pop(token)
+ self._total_wait = max(
+ self._total_wait - scheduled_retry['time_to_consume'], 0
+ )
+
+
+class BandwidthRateTracker:
+ def __init__(self, alpha=0.8):
+ """Tracks the rate of bandwidth consumption
+
+ :type alpha: float
+ :param alpha: The constant to use in calculating the exponential moving
+ average of the bandwidth rate. Specifically it is used in the
+ following calculation:
+
+ current_rate = alpha * new_rate + (1 - alpha) * current_rate
+
+ The value of this constant should be between 0 and 1.
+ """
+ self._alpha = alpha
+ self._last_time = None
+ self._current_rate = None
+
+ @property
+ def current_rate(self):
+ """The current transfer rate
+
+ :rtype: float
+ :returns: The current tracked transfer rate
+ """
+ if self._last_time is None:
+ return 0.0
+ return self._current_rate
+
+ def get_projected_rate(self, amt, time_at_consumption):
+ """Get the projected rate using a provided amount and time
+
+ :type amt: int
+ :param amt: The proposed amount to consume
+
+ :type time_at_consumption: float
+ :param time_at_consumption: The proposed time to consume at
+
+ :rtype: float
+ :returns: The consumption rate if that amt and time were consumed
+ """
+ if self._last_time is None:
+ return 0.0
+ return self._calculate_exponential_moving_average_rate(
+ amt, time_at_consumption
+ )
+
+ def record_consumption_rate(self, amt, time_at_consumption):
+ """Record the consumption rate based off amount and time point
+
+ :type amt: int
+ :param amt: The amount that got consumed
+
+ :type time_at_consumption: float
+ :param time_at_consumption: The time at which the amount was consumed
+ """
+ if self._last_time is None:
+ self._last_time = time_at_consumption
+ self._current_rate = 0.0
+ return
+ self._current_rate = self._calculate_exponential_moving_average_rate(
+ amt, time_at_consumption
+ )
+ self._last_time = time_at_consumption
+
+ def _calculate_rate(self, amt, time_at_consumption):
+ time_delta = time_at_consumption - self._last_time
+ if time_delta <= 0:
+ # While it is really unlikely to see this in an actual transfer,
+ # we do not want to return a negative rate or try to
+ # divide the amount by zero. So instead return an infinite
+ # rate, as the time delta is infinitesimally small.
+ return float('inf')
+ return amt / (time_delta)
+
+ def _calculate_exponential_moving_average_rate(
+ self, amt, time_at_consumption
+ ):
+ new_rate = self._calculate_rate(amt, time_at_consumption)
+ return self._alpha * new_rate + (1 - self._alpha) * self._current_rate
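The exponential moving average above is what drives the throttling decision, so here is a small sketch of BandwidthRateTracker used on its own; the timestamps and byte counts are made-up values chosen only to make the arithmetic easy to follow::

    from s3transfer.bandwidth import BandwidthRateTracker

    tracker = BandwidthRateTracker(alpha=0.8)

    # The first sample only establishes a baseline; the tracked rate starts at 0.0.
    tracker.record_consumption_rate(256 * 1024, time_at_consumption=100.0)
    print(tracker.current_rate)    # 0.0

    # 256 KiB consumed one second later -> instantaneous rate of 262144 B/s,
    # blended as 0.8 * new_rate + (1 - 0.8) * current_rate.
    tracker.record_consumption_rate(256 * 1024, time_at_consumption=101.0)
    print(tracker.current_rate)    # 209715.2

    # get_projected_rate() asks what the rate would become if this amount were
    # consumed at this time, without recording the sample.
    print(tracker.get_projected_rate(1024 * 1024, time_at_consumption=101.5))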
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/compat.py b/.venv/lib/python3.12/site-packages/s3transfer/compat.py
new file mode 100644
index 00000000..68267ad0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/compat.py
@@ -0,0 +1,94 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import errno
+import inspect
+import os
+import socket
+import sys
+
+from botocore.compat import six
+
+if sys.platform.startswith('win'):
+ def rename_file(current_filename, new_filename):
+ try:
+ os.remove(new_filename)
+ except OSError as e:
+ if not e.errno == errno.ENOENT:
+ # We only want to ignore trying to remove
+ # a file that does not exist. If it fails
+ # for any other reason we should be propagating
+ # that exception.
+ raise
+ os.rename(current_filename, new_filename)
+else:
+ rename_file = os.rename
+
+
+def accepts_kwargs(func):
+ return inspect.getfullargspec(func)[2]
+
+
+# In Python 3, socket.error is OSError, which is too general
+# for what we want (i.e. FileNotFoundError is a subclass of OSError).
+# In Python 3, all the socket-related errors derive from the newly
+# introduced ConnectionError.
+SOCKET_ERROR = ConnectionError
+MAXINT = None
+
+
+def seekable(fileobj):
+ """Backwards compat function to determine if a fileobj is seekable
+
+ :param fileobj: The file-like object to determine if seekable
+
+ :returns: True, if seekable. False, otherwise.
+ """
+ # If the fileobj has a seekable attr, try calling the seekable()
+ # method on it.
+ if hasattr(fileobj, 'seekable'):
+ return fileobj.seekable()
+ # If there is no seekable attr, check if the object has seek()
+ # and tell() methods. If it does, try seeking to the current position.
+ elif hasattr(fileobj, 'seek') and hasattr(fileobj, 'tell'):
+ try:
+ fileobj.seek(0, 1)
+ return True
+ except OSError:
+ # If an io related error was thrown then it is not seekable.
+ return False
+ # Else, the fileobj is not seekable
+ return False
+
+
+def readable(fileobj):
+ """Determines whether or not a file-like object is readable.
+
+ :param fileobj: The file-like object to determine if readable
+
+ :returns: True, if readable. False otherwise.
+ """
+ if hasattr(fileobj, 'readable'):
+ return fileobj.readable()
+
+ return hasattr(fileobj, 'read')
+
+
+def fallocate(fileobj, size):
+ if hasattr(os, 'posix_fallocate'):
+ os.posix_fallocate(fileobj.fileno(), 0, size)
+ else:
+ fileobj.truncate(size)
+
+
+# Import at end of file to avoid circular dependencies
+from multiprocessing.managers import BaseManager # noqa: F401,E402
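As a quick sketch of the seekable()/readable() helpers above applied to two common stream types; the BytesIO buffer and socket pair are stand-ins chosen purely for illustration::

    import socket
    from io import BytesIO

    from s3transfer.compat import readable, seekable

    # An in-memory buffer supports both seeking and reading.
    buf = BytesIO(b'hello')
    print(seekable(buf), readable(buf))               # True True

    # A socket's file wrapper is readable but reports itself as non-seekable.
    a, b = socket.socketpair()
    sock_file = a.makefile('rb')
    print(seekable(sock_file), readable(sock_file))   # False True

    sock_file.close()
    a.close()
    b.close()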
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/constants.py b/.venv/lib/python3.12/site-packages/s3transfer/constants.py
new file mode 100644
index 00000000..9a2ec9bb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/constants.py
@@ -0,0 +1,38 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import s3transfer
+
+KB = 1024
+MB = KB * KB
+GB = MB * KB
+
+ALLOWED_DOWNLOAD_ARGS = [
+ 'ChecksumMode',
+ 'VersionId',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKey',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+]
+
+FULL_OBJECT_CHECKSUM_ARGS = [
+ 'ChecksumCRC32',
+ 'ChecksumCRC32C',
+ 'ChecksumCRC64NVME',
+ 'ChecksumSHA1',
+ 'ChecksumSHA256',
+]
+
+USER_AGENT = f's3transfer/{s3transfer.__version__}'
+PROCESS_USER_AGENT = f'{USER_AGENT} processpool'
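One way an allow-list like ALLOWED_DOWNLOAD_ARGS tends to get used is to validate caller-supplied extra_args before a request is built, in the same spirit as the checks in __init__.py above; the validate_extra_args helper below is a hypothetical sketch, not part of the library's API::

    from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS


    def validate_extra_args(extra_args, allowed=ALLOWED_DOWNLOAD_ARGS):
        # Reject any key that is not in the allow-list.
        for key in extra_args:
            if key not in allowed:
                raise ValueError(
                    f"Invalid extra_args key '{key}', "
                    f"must be one of: {', '.join(allowed)}"
                )


    validate_extra_args({'VersionId': 'abc123', 'RequestPayer': 'requester'})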
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/copies.py b/.venv/lib/python3.12/site-packages/s3transfer/copies.py
new file mode 100644
index 00000000..c2ae9ce0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/copies.py
@@ -0,0 +1,388 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import math
+
+from s3transfer.tasks import (
+ CompleteMultipartUploadTask,
+ CreateMultipartUploadTask,
+ SubmissionTask,
+ Task,
+)
+from s3transfer.utils import (
+ ChunksizeAdjuster,
+ calculate_range_parameter,
+ get_callbacks,
+ get_filtered_dict,
+)
+
+
+class CopySubmissionTask(SubmissionTask):
+ """Task for submitting tasks to execute a copy"""
+
+ EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
+ 'CopySourceIfMatch': 'IfMatch',
+ 'CopySourceIfModifiedSince': 'IfModifiedSince',
+ 'CopySourceIfNoneMatch': 'IfNoneMatch',
+ 'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
+ 'CopySourceSSECustomerKey': 'SSECustomerKey',
+ 'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
+ 'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
+ 'RequestPayer': 'RequestPayer',
+ 'ExpectedBucketOwner': 'ExpectedBucketOwner',
+ }
+
+ UPLOAD_PART_COPY_ARGS = [
+ 'CopySourceIfMatch',
+ 'CopySourceIfModifiedSince',
+ 'CopySourceIfNoneMatch',
+ 'CopySourceIfUnmodifiedSince',
+ 'CopySourceSSECustomerKey',
+ 'CopySourceSSECustomerAlgorithm',
+ 'CopySourceSSECustomerKeyMD5',
+ 'SSECustomerKey',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+ ]
+
+ CREATE_MULTIPART_ARGS_BLACKLIST = [
+ 'CopySourceIfMatch',
+ 'CopySourceIfModifiedSince',
+ 'CopySourceIfNoneMatch',
+ 'CopySourceIfUnmodifiedSince',
+ 'CopySourceSSECustomerKey',
+ 'CopySourceSSECustomerAlgorithm',
+ 'CopySourceSSECustomerKeyMD5',
+ 'MetadataDirective',
+ 'TaggingDirective',
+ ]
+
+ COMPLETE_MULTIPART_ARGS = [
+ 'SSECustomerKey',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+ ]
+
+ def _submit(
+ self, client, config, osutil, request_executor, transfer_future
+ ):
+ """
+ :param client: The client associated with the transfer manager
+
+ :type config: s3transfer.manager.TransferConfig
+ :param config: The transfer config associated with the transfer
+ manager
+
+ :type osutil: s3transfer.utils.OSUtil
+ :param osutil: The os utility associated to the transfer manager
+
+ :type request_executor: s3transfer.futures.BoundedExecutor
+ :param request_executor: The request executor associated with the
+ transfer manager
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+ transfer request that tasks are being submitted for
+ """
+ # Determine the size if it was not provided
+ if transfer_future.meta.size is None:
+ # If a size was not provided figure out the size for the
+ # user. Note that we will only use the client provided to
+ # the TransferManager. If the object is outside of the region
+ # of the client, they may have to provide the file size themselves
+ # with a completely new client.
+ call_args = transfer_future.meta.call_args
+ head_object_request = (
+ self._get_head_object_request_from_copy_source(
+ call_args.copy_source
+ )
+ )
+ extra_args = call_args.extra_args
+
+ # Map any values that may be used in the head object that is
+ # used in the copy object
+ for param, value in extra_args.items():
+ if param in self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING:
+ head_object_request[
+ self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING[param]
+ ] = value
+
+ response = call_args.source_client.head_object(
+ **head_object_request
+ )
+ transfer_future.meta.provide_transfer_size(
+ response['ContentLength']
+ )
+
+ # If it is greater than threshold do a multipart copy, otherwise
+ # do a regular copy object.
+ if transfer_future.meta.size < config.multipart_threshold:
+ self._submit_copy_request(
+ client, config, osutil, request_executor, transfer_future
+ )
+ else:
+ self._submit_multipart_request(
+ client, config, osutil, request_executor, transfer_future
+ )
+
+ def _submit_copy_request(
+ self, client, config, osutil, request_executor, transfer_future
+ ):
+ call_args = transfer_future.meta.call_args
+
+ # Get the needed progress callbacks for the task
+ progress_callbacks = get_callbacks(transfer_future, 'progress')
+
+ # Submit the request of a single copy.
+ self._transfer_coordinator.submit(
+ request_executor,
+ CopyObjectTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'copy_source': call_args.copy_source,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': call_args.extra_args,
+ 'callbacks': progress_callbacks,
+ 'size': transfer_future.meta.size,
+ },
+ is_final=True,
+ ),
+ )
+
+ def _submit_multipart_request(
+ self, client, config, osutil, request_executor, transfer_future
+ ):
+ call_args = transfer_future.meta.call_args
+
+ # Submit the request to create a multipart upload and make sure it
+ # does not include any of the arguments used for copy part.
+ create_multipart_extra_args = {}
+ for param, val in call_args.extra_args.items():
+ if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST:
+ create_multipart_extra_args[param] = val
+
+ create_multipart_future = self._transfer_coordinator.submit(
+ request_executor,
+ CreateMultipartUploadTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': create_multipart_extra_args,
+ },
+ ),
+ )
+
+ # Determine how many parts are needed based on filesize and
+ # desired chunksize.
+ part_size = config.multipart_chunksize
+ adjuster = ChunksizeAdjuster()
+ part_size = adjuster.adjust_chunksize(
+ part_size, transfer_future.meta.size
+ )
+ num_parts = int(
+ math.ceil(transfer_future.meta.size / float(part_size))
+ )
+
+ # Submit requests to upload the parts of the file.
+ part_futures = []
+ progress_callbacks = get_callbacks(transfer_future, 'progress')
+
+ for part_number in range(1, num_parts + 1):
+ extra_part_args = self._extra_upload_part_args(
+ call_args.extra_args
+ )
+ # The part number for upload part starts at 1 while the
+ # range parameter starts at zero, so just subtract 1 from
+ # the part number.
+ extra_part_args['CopySourceRange'] = calculate_range_parameter(
+ part_size,
+ part_number - 1,
+ num_parts,
+ transfer_future.meta.size,
+ )
+ # Get the size of the part copy as well for the progress
+ # callbacks.
+ size = self._get_transfer_size(
+ part_size,
+ part_number - 1,
+ num_parts,
+ transfer_future.meta.size,
+ )
+ # Get the checksum algorithm of the multipart request.
+ checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm")
+ part_futures.append(
+ self._transfer_coordinator.submit(
+ request_executor,
+ CopyPartTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'copy_source': call_args.copy_source,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'part_number': part_number,
+ 'extra_args': extra_part_args,
+ 'callbacks': progress_callbacks,
+ 'size': size,
+ 'checksum_algorithm': checksum_algorithm,
+ },
+ pending_main_kwargs={
+ 'upload_id': create_multipart_future
+ },
+ ),
+ )
+ )
+
+ complete_multipart_extra_args = self._extra_complete_multipart_args(
+ call_args.extra_args
+ )
+ # Submit the request to complete the multipart upload.
+ self._transfer_coordinator.submit(
+ request_executor,
+ CompleteMultipartUploadTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': complete_multipart_extra_args,
+ },
+ pending_main_kwargs={
+ 'upload_id': create_multipart_future,
+ 'parts': part_futures,
+ },
+ is_final=True,
+ ),
+ )
+
+ def _get_head_object_request_from_copy_source(self, copy_source):
+ if isinstance(copy_source, dict):
+ return copy.copy(copy_source)
+ else:
+ raise TypeError(
+ 'Expecting dictionary formatted: '
+ '{"Bucket": bucket_name, "Key": key} '
+ f'but got {copy_source} or type {type(copy_source)}.'
+ )
+
+ def _extra_upload_part_args(self, extra_args):
+ # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
+ # onto the upload_part_copy calls.
+ return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)
+
+ def _extra_complete_multipart_args(self, extra_args):
+ return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)
+
+ def _get_transfer_size(
+ self, part_size, part_index, num_parts, total_transfer_size
+ ):
+ if part_index == num_parts - 1:
+ # The last part may be different in size than the rest of the
+ # parts.
+ return total_transfer_size - (part_index * part_size)
+ return part_size
+
+
+class CopyObjectTask(Task):
+ """Task to do a nonmultipart copy"""
+
+ def _main(
+ self, client, copy_source, bucket, key, extra_args, callbacks, size
+ ):
+ """
+ :param client: The client to use when calling CopyObject
+ :param copy_source: The CopySource parameter to use
+ :param bucket: The name of the bucket to copy to
+ :param key: The name of the key to copy to
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in the upload.
+ :param callbacks: List of callbacks to call after copy
+ :param size: The size of the transfer. This value is passed into
+ the callbacks
+
+ """
+ client.copy_object(
+ CopySource=copy_source, Bucket=bucket, Key=key, **extra_args
+ )
+ for callback in callbacks:
+ callback(bytes_transferred=size)
+
+
+class CopyPartTask(Task):
+ """Task to upload a part in a multipart copy"""
+
+ def _main(
+ self,
+ client,
+ copy_source,
+ bucket,
+ key,
+ upload_id,
+ part_number,
+ extra_args,
+ callbacks,
+ size,
+ checksum_algorithm=None,
+ ):
+ """
+ :param client: The client to use when calling UploadPartCopy
+ :param copy_source: The CopySource parameter to use
+ :param bucket: The name of the bucket to upload to
+ :param key: The name of the key to upload to
+ :param upload_id: The id of the upload
+ :param part_number: The number representing the part of the multipart
+ upload
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in the upload.
+ :param callbacks: List of callbacks to call after copy part
+ :param size: The size of the transfer. This value is passed into
+ the callbacks
+ :param checksum_algorithm: The algorithm that was used to create the multipart
+ upload
+
+ :rtype: dict
+ :returns: A dictionary representing a part::
+
+ {'ETag': etag_value, 'PartNumber': part_number}
+
+ This value can be appended to a list to be used to complete
+ the multipart upload. If a checksum is in the response,
+ it will also be included.
+ """
+ response = client.upload_part_copy(
+ CopySource=copy_source,
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ PartNumber=part_number,
+ **extra_args,
+ )
+ for callback in callbacks:
+ callback(bytes_transferred=size)
+ etag = response['CopyPartResult']['ETag']
+ part_metadata = {'ETag': etag, 'PartNumber': part_number}
+ if checksum_algorithm:
+ checksum_member = f'Checksum{checksum_algorithm.upper()}'
+ if checksum_member in response['CopyPartResult']:
+ part_metadata[checksum_member] = response['CopyPartResult'][
+ checksum_member
+ ]
+ return part_metadata
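The part-count and CopySourceRange arithmetic that CopySubmissionTask performs can be exercised standalone. In the sketch below the 20 MB object size and 8 MB chunksize are illustrative assumptions; ChunksizeAdjuster and calculate_range_parameter come from s3transfer.utils, as imported at the top of this file::

    import math

    from s3transfer.utils import ChunksizeAdjuster, calculate_range_parameter

    object_size = 20 * 1024 * 1024
    part_size = ChunksizeAdjuster().adjust_chunksize(8 * 1024 * 1024, object_size)
    num_parts = int(math.ceil(object_size / float(part_size)))

    for part_number in range(1, num_parts + 1):
        # Part numbers start at 1, range indices at 0, hence the "- 1".
        range_param = calculate_range_parameter(
            part_size, part_number - 1, num_parts, object_size
        )
        print(part_number, range_param)

    # 1 bytes=0-8388607
    # 2 bytes=8388608-16777215
    # 3 bytes=16777216-20971519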
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/crt.py b/.venv/lib/python3.12/site-packages/s3transfer/crt.py
new file mode 100644
index 00000000..b11c3682
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/crt.py
@@ -0,0 +1,991 @@
+# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import logging
+import re
+import threading
+from io import BytesIO
+
+import awscrt.http
+import awscrt.s3
+import botocore.awsrequest
+import botocore.session
+from awscrt.auth import (
+ AwsCredentials,
+ AwsCredentialsProvider,
+ AwsSigningAlgorithm,
+ AwsSigningConfig,
+)
+from awscrt.io import (
+ ClientBootstrap,
+ ClientTlsContext,
+ DefaultHostResolver,
+ EventLoopGroup,
+ TlsContextOptions,
+)
+from awscrt.s3 import S3Client, S3RequestTlsMode, S3RequestType
+from botocore import UNSIGNED
+from botocore.compat import urlsplit
+from botocore.config import Config
+from botocore.exceptions import NoCredentialsError
+from botocore.utils import ArnParser, InvalidArnException
+
+from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS, MB
+from s3transfer.exceptions import TransferNotDoneError
+from s3transfer.futures import BaseTransferFuture, BaseTransferMeta
+from s3transfer.manager import TransferManager
+from s3transfer.utils import (
+ CallArgs,
+ OSUtils,
+ get_callbacks,
+ is_s3express_bucket,
+)
+
+logger = logging.getLogger(__name__)
+
+CRT_S3_PROCESS_LOCK = None
+
+
+def acquire_crt_s3_process_lock(name):
+ # Currently, the CRT S3 client performs best when there is only one
+ # instance of it running on a host. This lock allows an application to
+ # signal across processes whether there is another process of the same
+ # application using the CRT S3 client and prevent spawning more than
+ # one CRT S3 client on the system for that application.
+ #
+ # NOTE: The CRT process lock is automatically released when the lock
+ # object is garbage collected. So, the lock is stored as a global to
+ # ensure it is not unintentionally garbage collected/released if the
+ # reference to the lock is lost.
+ global CRT_S3_PROCESS_LOCK
+ if CRT_S3_PROCESS_LOCK is None:
+ crt_lock = awscrt.s3.CrossProcessLock(name)
+ try:
+ crt_lock.acquire()
+ except RuntimeError:
+ # If there is another process that is holding the lock, the CRT
+ # returns a RuntimeError. We return None here to signal that our
+ # current process was not able to acquire the lock.
+ return None
+ CRT_S3_PROCESS_LOCK = crt_lock
+ return CRT_S3_PROCESS_LOCK
+
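+
+# A minimal sketch of how an application might use the lock above before
+# creating a CRT client (the application name 'my-app' and the region are
+# illustrative values, not something this module prescribes):
+def _example_acquire_lock_then_create_client():
+    lock = acquire_crt_s3_process_lock('my-app')
+    if lock is None:
+        # Another process of the same application already holds the lock,
+        # so the caller should fall back to a non-CRT transfer path.
+        return None
+    return create_s3_crt_client(region='us-west-2')
+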
+
+def create_s3_crt_client(
+ region,
+ crt_credentials_provider=None,
+ num_threads=None,
+ target_throughput=None,
+ part_size=8 * MB,
+ use_ssl=True,
+ verify=None,
+):
+ """
+ :type region: str
+ :param region: The region used for signing
+
+ :type crt_credentials_provider:
+ Optional[awscrt.auth.AwsCredentialsProvider]
+ :param crt_credentials_provider: CRT AWS credentials provider
+ to use to sign requests. If not set, requests will not be signed.
+
+ :type num_threads: Optional[int]
+ :param num_threads: Number of worker threads generated. Default
+ is the number of processors in the machine.
+
+ :type target_throughput: Optional[int]
+ :param target_throughput: Throughput target in bytes per second.
+ By default, CRT will automatically attempt to choose a target
+ throughput that matches the system's maximum network throughput.
+ Currently, if CRT is unable to determine the maximum network
+ throughput, a fallback target throughput of ``1_250_000_000`` bytes
+ per second (which translates to 10 gigabits per second, or 1.16
+ gibibytes per second) is used. To set a specific target
+ throughput, set a value for this parameter.
+
+ :type part_size: Optional[int]
+ :param part_size: Size, in Bytes, of parts that files will be downloaded
+ or uploaded in.
+
+ :type use_ssl: boolean
+ :param use_ssl: Whether or not to use SSL. By default, SSL is used.
+ Note that not all services support non-ssl connections.
+
+ :type verify: Optional[boolean/string]
+ :param verify: Whether or not to verify SSL certificates.
+ By default SSL certificates are verified. You can provide the
+ following values:
+
+ * False - do not validate SSL certificates. SSL will still be
+ used (unless use_ssl is False), but SSL certificates
+ will not be verified.
+ * path/to/cert/bundle.pem - A filename of the CA cert bundle to
+ use. Specify this argument if you want to use a custom CA cert
+ bundle instead of the default one on your system.
+ """
+ event_loop_group = EventLoopGroup(num_threads)
+ host_resolver = DefaultHostResolver(event_loop_group)
+ bootstrap = ClientBootstrap(event_loop_group, host_resolver)
+ tls_connection_options = None
+
+ tls_mode = (
+ S3RequestTlsMode.ENABLED if use_ssl else S3RequestTlsMode.DISABLED
+ )
+ if verify is not None:
+ tls_ctx_options = TlsContextOptions()
+ if verify:
+ tls_ctx_options.override_default_trust_store_from_path(
+ ca_filepath=verify
+ )
+ else:
+ tls_ctx_options.verify_peer = False
+ client_tls_option = ClientTlsContext(tls_ctx_options)
+ tls_connection_options = client_tls_option.new_connection_options()
+ target_gbps = _get_crt_throughput_target_gbps(
+ provided_throughput_target_bytes=target_throughput
+ )
+ return S3Client(
+ bootstrap=bootstrap,
+ region=region,
+ credential_provider=crt_credentials_provider,
+ part_size=part_size,
+ tls_mode=tls_mode,
+ tls_connection_options=tls_connection_options,
+ throughput_target_gbps=target_gbps,
+ enable_s3express=True,
+ )
+
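+
+# A minimal sketch of calling the factory above with an explicit
+# throughput target (the region, throughput, and part size are
+# illustrative values, not defaults of this module):
+def _example_create_client_with_explicit_target():
+    return create_s3_crt_client(
+        region='us-west-2',
+        # create_s3_crt_client expects bytes per second; 625_000_000
+        # bytes/s corresponds to roughly 5 gigabits per second.
+        target_throughput=625_000_000,
+        part_size=16 * MB,
+    )
+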
+
+def _get_crt_throughput_target_gbps(provided_throughput_target_bytes=None):
+ if provided_throughput_target_bytes is None:
+ target_gbps = awscrt.s3.get_recommended_throughput_target_gbps()
+ logger.debug(
+ 'Recommended CRT throughput target in gbps: %s', target_gbps
+ )
+ if target_gbps is None:
+ target_gbps = 10.0
+ else:
+ # NOTE: The GB constant in s3transfer is technically a gibibyte. The
+ # GB constant is not used here because the CRT interprets gigabits
+ # for networking as a base power of 10
+ # (i.e. 1000 ** 3 instead of 1024 ** 3).
+ target_gbps = provided_throughput_target_bytes * 8 / 1_000_000_000
+ logger.debug('Using CRT throughput target in gbps: %s', target_gbps)
+ return target_gbps
+
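+# Worked example of the conversion above: a provided target of
+# 1_250_000_000 bytes per second becomes
+# 1_250_000_000 * 8 / 1_000_000_000 == 10.0 gigabits per second, which is
+# the same value used as the fallback when CRT cannot recommend a target.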
+
+class CRTTransferManager:
+ ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
+ ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
+ ALLOWED_DELETE_ARGS = TransferManager.ALLOWED_DELETE_ARGS
+
+ VALIDATE_SUPPORTED_BUCKET_VALUES = True
+
+ _UNSUPPORTED_BUCKET_PATTERNS = TransferManager._UNSUPPORTED_BUCKET_PATTERNS
+
+ def __init__(self, crt_s3_client, crt_request_serializer, osutil=None):
+ """A transfer manager interface for Amazon S3 on CRT s3 client.
+
+ :type crt_s3_client: awscrt.s3.S3Client
+ :param crt_s3_client: The CRT s3 client, which handles all
+ HTTP requests and transfer functionality under the hood
+
+ :type crt_request_serializer: s3transfer.crt.BaseCRTRequestSerializer
+ :param crt_request_serializer: Serializer, generates unsigned crt HTTP
+ request.
+
+ :type osutil: s3transfer.utils.OSUtils
+ :param osutil: OSUtils object to use for os-related behavior when
+ using with transfer manager.
+ """
+ self._osutil = osutil if osutil is not None else OSUtils()
+ self._crt_s3_client = crt_s3_client
+ self._s3_args_creator = S3ClientArgsCreator(
+ crt_request_serializer, self._osutil
+ )
+ self._crt_exception_translator = (
+ crt_request_serializer.translate_crt_exception
+ )
+ self._future_coordinators = []
+ self._semaphore = threading.Semaphore(128) # not configurable
+ # A counter to create unique id's for each transfer submitted.
+ self._id_counter = 0
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, *args):
+ cancel = False
+ if exc_type:
+ cancel = True
+ self._shutdown(cancel)
+
+ def download(
+ self, bucket, key, fileobj, extra_args=None, subscribers=None
+ ):
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = {}
+ self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ callargs = CallArgs(
+ bucket=bucket,
+ key=key,
+ fileobj=fileobj,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ return self._submit_transfer("get_object", callargs)
+
+ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = {}
+ self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ self._validate_checksum_algorithm_supported(extra_args)
+ callargs = CallArgs(
+ bucket=bucket,
+ key=key,
+ fileobj=fileobj,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ return self._submit_transfer("put_object", callargs)
+
+ def delete(self, bucket, key, extra_args=None, subscribers=None):
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = {}
+ self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ callargs = CallArgs(
+ bucket=bucket,
+ key=key,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ return self._submit_transfer("delete_object", callargs)
+
+ def shutdown(self, cancel=False):
+ self._shutdown(cancel)
+
+ def _validate_if_bucket_supported(self, bucket):
+ # S3 high-level operations don't support some resources
+ # (e.g. S3 Object Lambda); only direct API calls are available
+ # for such resources.
+ if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
+ for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
+ match = pattern.match(bucket)
+ if match:
+ raise ValueError(
+ f'TransferManager methods do not support {resource} '
+ 'resource. Use direct client calls instead.'
+ )
+
+ def _validate_all_known_args(self, actual, allowed):
+ for kwarg in actual:
+ if kwarg not in allowed:
+ raise ValueError(
+ f"Invalid extra_args key '{kwarg}', "
+ f"must be one of: {', '.join(allowed)}"
+ )
+
+ def _validate_checksum_algorithm_supported(self, extra_args):
+ checksum_algorithm = extra_args.get('ChecksumAlgorithm')
+ if checksum_algorithm is None:
+ return
+ supported_algorithms = list(awscrt.s3.S3ChecksumAlgorithm.__members__)
+ if checksum_algorithm.upper() not in supported_algorithms:
+ raise ValueError(
+ f'ChecksumAlgorithm: {checksum_algorithm} not supported. '
+ f'Supported algorithms are: {supported_algorithms}'
+ )
+
+ def _cancel_transfers(self):
+ for coordinator in self._future_coordinators:
+ if not coordinator.done():
+ coordinator.cancel()
+
+ def _finish_transfers(self):
+ for coordinator in self._future_coordinators:
+ coordinator.result()
+
+ def _wait_transfers_done(self):
+ for coordinator in self._future_coordinators:
+ coordinator.wait_until_on_done_callbacks_complete()
+
+ def _shutdown(self, cancel=False):
+ if cancel:
+ self._cancel_transfers()
+ try:
+ self._finish_transfers()
+
+ except KeyboardInterrupt:
+ self._cancel_transfers()
+ except Exception:
+ pass
+ finally:
+ self._wait_transfers_done()
+
+ def _release_semaphore(self, **kwargs):
+ self._semaphore.release()
+
+ def _submit_transfer(self, request_type, call_args):
+ on_done_after_calls = [self._release_semaphore]
+ coordinator = CRTTransferCoordinator(
+ transfer_id=self._id_counter,
+ exception_translator=self._crt_exception_translator,
+ )
+ components = {
+ 'meta': CRTTransferMeta(self._id_counter, call_args),
+ 'coordinator': coordinator,
+ }
+ future = CRTTransferFuture(**components)
+ afterdone = AfterDoneHandler(coordinator)
+ on_done_after_calls.append(afterdone)
+
+ try:
+ self._semaphore.acquire()
+ on_queued = self._s3_args_creator.get_crt_callback(
+ future, 'queued'
+ )
+ on_queued()
+ crt_callargs = self._s3_args_creator.get_make_request_args(
+ request_type,
+ call_args,
+ coordinator,
+ future,
+ on_done_after_calls,
+ )
+ crt_s3_request = self._crt_s3_client.make_request(**crt_callargs)
+ except Exception as e:
+ coordinator.set_exception(e, True)
+ on_done = self._s3_args_creator.get_crt_callback(
+ future, 'done', after_subscribers=on_done_after_calls
+ )
+ on_done(error=e)
+ else:
+ coordinator.set_s3_request(crt_s3_request)
+ self._future_coordinators.append(coordinator)
+
+ self._id_counter += 1
+ return future
+
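+
+# A minimal usage sketch for the manager above, assuming a CRT client and
+# request serializer have already been built (e.g. via
+# create_s3_crt_client and BotocoreCRTRequestSerializer); the bucket, key,
+# and filename are illustrative:
+def _example_upload_with_crt_manager(crt_s3_client, request_serializer):
+    with CRTTransferManager(crt_s3_client, request_serializer) as manager:
+        future = manager.upload('/tmp/myfile', 'mybucket', 'mykey')
+        # result() blocks until the transfer finishes and re-raises any
+        # translated transfer error.
+        future.result()
+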
+
+class CRTTransferMeta(BaseTransferMeta):
+ """Holds metadata about the CRTTransferFuture"""
+
+ def __init__(self, transfer_id=None, call_args=None):
+ self._transfer_id = transfer_id
+ self._call_args = call_args
+ self._user_context = {}
+
+ @property
+ def call_args(self):
+ return self._call_args
+
+ @property
+ def transfer_id(self):
+ return self._transfer_id
+
+ @property
+ def user_context(self):
+ return self._user_context
+
+
+class CRTTransferFuture(BaseTransferFuture):
+ def __init__(self, meta=None, coordinator=None):
+ """The future associated to a submitted transfer request via CRT S3 client
+
+ :type meta: s3transfer.crt.CRTTransferMeta
+ :param meta: The metadata associated to the transfer future.
+
+ :type coordinator: s3transfer.crt.CRTTransferCoordinator
+ :param coordinator: The coordinator associated to the transfer future.
+ """
+ self._meta = meta
+ if meta is None:
+ self._meta = CRTTransferMeta()
+ self._coordinator = coordinator
+
+ @property
+ def meta(self):
+ return self._meta
+
+ def done(self):
+ return self._coordinator.done()
+
+ def result(self, timeout=None):
+ self._coordinator.result(timeout)
+
+ def cancel(self):
+ self._coordinator.cancel()
+
+ def set_exception(self, exception):
+ """Sets the exception on the future."""
+ if not self.done():
+ raise TransferNotDoneError(
+ 'set_exception can only be called once the transfer is '
+ 'complete.'
+ )
+ self._coordinator.set_exception(exception, override=True)
+
+
+class BaseCRTRequestSerializer:
+ def serialize_http_request(self, transfer_type, future):
+ """Serialize CRT HTTP requests.
+
+ :type transfer_type: string
+ :param transfer_type: The type of transfer made,
+ e.g. 'put_object', 'get_object', 'delete_object'
+
+ :type future: s3transfer.crt.CRTTransferFuture
+
+ :rtype: awscrt.http.HttpRequest
+ :returns: An unsigned HTTP request to be used for the CRT S3 client
+ """
+ raise NotImplementedError('serialize_http_request()')
+
+ def translate_crt_exception(self, exception):
+ raise NotImplementedError('translate_crt_exception()')
+
+
+class BotocoreCRTRequestSerializer(BaseCRTRequestSerializer):
+ def __init__(self, session, client_kwargs=None):
+ """Serialize CRT HTTP request using botocore logic
+ It also takes into account configuration from both the session
+ and any keyword arguments that could be passed to
+ `Session.create_client()` when serializing the request.
+
+ :type session: botocore.session.Session
+
+ :type client_kwargs: Optional[Dict[str, str]]
+ :param client_kwargs: The kwargs for the botocore
+ s3 client initialization.
+ """
+ self._session = session
+ if client_kwargs is None:
+ client_kwargs = {}
+ self._resolve_client_config(session, client_kwargs)
+ self._client = session.create_client(**client_kwargs)
+ self._client.meta.events.register(
+ 'request-created.s3.*', self._capture_http_request
+ )
+ self._client.meta.events.register(
+ 'after-call.s3.*', self._change_response_to_serialized_http_request
+ )
+ self._client.meta.events.register(
+ 'before-send.s3.*', self._make_fake_http_response
+ )
+ self._client.meta.events.register(
+ 'before-call.s3.*', self._remove_checksum_context
+ )
+
+ def _resolve_client_config(self, session, client_kwargs):
+ user_provided_config = None
+ if session.get_default_client_config():
+ user_provided_config = session.get_default_client_config()
+ if 'config' in client_kwargs:
+ user_provided_config = client_kwargs['config']
+
+ client_config = Config(signature_version=UNSIGNED)
+ if user_provided_config:
+ client_config = user_provided_config.merge(client_config)
+ client_kwargs['config'] = client_config
+ client_kwargs["service_name"] = "s3"
+
+ def _crt_request_from_aws_request(self, aws_request):
+ url_parts = urlsplit(aws_request.url)
+ crt_path = url_parts.path
+ if url_parts.query:
+ crt_path = f'{crt_path}?{url_parts.query}'
+ headers_list = []
+ for name, value in aws_request.headers.items():
+ if isinstance(value, str):
+ headers_list.append((name, value))
+ else:
+ headers_list.append((name, str(value, 'utf-8')))
+
+ crt_headers = awscrt.http.HttpHeaders(headers_list)
+
+ crt_request = awscrt.http.HttpRequest(
+ method=aws_request.method,
+ path=crt_path,
+ headers=crt_headers,
+ body_stream=aws_request.body,
+ )
+ return crt_request
+
+ def _convert_to_crt_http_request(self, botocore_http_request):
+ # Logic that does CRTUtils.crt_request_from_aws_request
+ crt_request = self._crt_request_from_aws_request(botocore_http_request)
+ if crt_request.headers.get("host") is None:
+ # If host is not set, set it for the request before using CRT s3
+ url_parts = urlsplit(botocore_http_request.url)
+ crt_request.headers.set("host", url_parts.netloc)
+ if crt_request.headers.get('Content-MD5') is not None:
+ crt_request.headers.remove("Content-MD5")
+
+ # In general, the CRT S3 client expects a content length header. It
+ # only expects a missing content length header if the body is not
+ # seekable. However, botocore does not set the content length header
+ # for GetObject API requests and so we set the content length to zero
+ # to meet the CRT S3 client's expectation that the content length
+ # header is set even if there is no body.
+ if crt_request.headers.get('Content-Length') is None:
+ if botocore_http_request.body is None:
+ crt_request.headers.add('Content-Length', "0")
+
+ # Botocore sets the Transfer-Encoding header when it cannot determine
+ # the content length of the request body (e.g. it's not seekable).
+ # However, CRT does not support this header, but it supports
+ # non-seekable bodies. So we remove this header to not cause issues
+ # in the downstream CRT S3 request.
+ if crt_request.headers.get('Transfer-Encoding') is not None:
+ crt_request.headers.remove('Transfer-Encoding')
+
+ return crt_request
+
+ def _capture_http_request(self, request, **kwargs):
+ request.context['http_request'] = request
+
+ def _change_response_to_serialized_http_request(
+ self, context, parsed, **kwargs
+ ):
+ request = context['http_request']
+ parsed['HTTPRequest'] = request.prepare()
+
+ def _make_fake_http_response(self, request, **kwargs):
+ return botocore.awsrequest.AWSResponse(
+ None,
+ 200,
+ {},
+ FakeRawResponse(b""),
+ )
+
+ def _get_botocore_http_request(self, client_method, call_args):
+ return getattr(self._client, client_method)(
+ Bucket=call_args.bucket, Key=call_args.key, **call_args.extra_args
+ )['HTTPRequest']
+
+ def serialize_http_request(self, transfer_type, future):
+ botocore_http_request = self._get_botocore_http_request(
+ transfer_type, future.meta.call_args
+ )
+ crt_request = self._convert_to_crt_http_request(botocore_http_request)
+ return crt_request
+
+ def translate_crt_exception(self, exception):
+ if isinstance(exception, awscrt.s3.S3ResponseError):
+ return self._translate_crt_s3_response_error(exception)
+ else:
+ return None
+
+ def _translate_crt_s3_response_error(self, s3_response_error):
+ status_code = s3_response_error.status_code
+ if status_code < 301:
+ # Botocore's exception parsing only
+ # runs on status codes >= 301
+ return None
+
+ headers = {k: v for k, v in s3_response_error.headers}
+ operation_name = s3_response_error.operation_name
+ if operation_name is not None:
+ service_model = self._client.meta.service_model
+ shape = service_model.operation_model(operation_name).output_shape
+ else:
+ shape = None
+
+ response_dict = {
+ 'headers': botocore.awsrequest.HeadersDict(headers),
+ 'status_code': status_code,
+ 'body': s3_response_error.body,
+ }
+ parsed_response = self._client._response_parser.parse(
+ response_dict, shape=shape
+ )
+
+ error_code = parsed_response.get("Error", {}).get("Code")
+ error_class = self._client.exceptions.from_code(error_code)
+ return error_class(parsed_response, operation_name=operation_name)
+
+ def _remove_checksum_context(self, params, **kwargs):
+ request_context = params.get("context", {})
+ if "checksum" in request_context:
+ del request_context["checksum"]
+
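+
+# A minimal sketch of constructing the serializer above from a botocore
+# session (the region name is illustrative). The internal client it
+# creates is only used to build unsigned HTTP requests; signing is left
+# to the CRT S3 client.
+def _example_build_request_serializer():
+    session = botocore.session.get_session()
+    return BotocoreCRTRequestSerializer(
+        session, client_kwargs={'region_name': 'us-west-2'}
+    )
+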
+
+class FakeRawResponse(BytesIO):
+ def stream(self, amt=1024, decode_content=None):
+ while True:
+ chunk = self.read(amt)
+ if not chunk:
+ break
+ yield chunk
+
+
+class BotocoreCRTCredentialsWrapper:
+ def __init__(self, resolved_botocore_credentials):
+ self._resolved_credentials = resolved_botocore_credentials
+
+ def __call__(self):
+ credentials = self._get_credentials().get_frozen_credentials()
+ return AwsCredentials(
+ credentials.access_key, credentials.secret_key, credentials.token
+ )
+
+ def to_crt_credentials_provider(self):
+ return AwsCredentialsProvider.new_delegate(self)
+
+ def _get_credentials(self):
+ if self._resolved_credentials is None:
+ raise NoCredentialsError()
+ return self._resolved_credentials
+
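+
+# A minimal sketch of bridging resolved botocore credentials into a CRT
+# credentials provider via the wrapper above (the region is illustrative):
+def _example_crt_client_with_botocore_credentials():
+    session = botocore.session.get_session()
+    wrapper = BotocoreCRTCredentialsWrapper(session.get_credentials())
+    return create_s3_crt_client(
+        region='us-west-2',
+        crt_credentials_provider=wrapper.to_crt_credentials_provider(),
+    )
+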
+
+class CRTTransferCoordinator:
+ """A helper class for managing CRTTransferFuture"""
+
+ def __init__(
+ self, transfer_id=None, s3_request=None, exception_translator=None
+ ):
+ self.transfer_id = transfer_id
+ self._exception_translator = exception_translator
+ self._s3_request = s3_request
+ self._lock = threading.Lock()
+ self._exception = None
+ self._crt_future = None
+ self._done_event = threading.Event()
+
+ @property
+ def s3_request(self):
+ return self._s3_request
+
+ def set_done_callbacks_complete(self):
+ self._done_event.set()
+
+ def wait_until_on_done_callbacks_complete(self, timeout=None):
+ self._done_event.wait(timeout)
+
+ def set_exception(self, exception, override=False):
+ with self._lock:
+ if not self.done() or override:
+ self._exception = exception
+
+ def cancel(self):
+ if self._s3_request:
+ self._s3_request.cancel()
+
+ def result(self, timeout=None):
+ if self._exception:
+ raise self._exception
+ try:
+ self._crt_future.result(timeout)
+ except KeyboardInterrupt:
+ self.cancel()
+ self._crt_future.result(timeout)
+ raise
+ except Exception as e:
+ self.handle_exception(e)
+ finally:
+ if self._s3_request:
+ self._s3_request = None
+
+ def handle_exception(self, exc):
+ translated_exc = None
+ if self._exception_translator:
+ try:
+ translated_exc = self._exception_translator(exc)
+ except Exception as e:
+ # Bail out if we hit an issue translating
+ # and raise the original error.
+ logger.debug("Unable to translate exception.", exc_info=e)
+ pass
+ if translated_exc is not None:
+ raise translated_exc from exc
+ else:
+ raise exc
+
+ def done(self):
+ if self._crt_future is None:
+ return False
+ return self._crt_future.done()
+
+ def set_s3_request(self, s3_request):
+ self._s3_request = s3_request
+ self._crt_future = self._s3_request.finished_future
+
+
+class S3ClientArgsCreator:
+ def __init__(self, crt_request_serializer, os_utils):
+ self._request_serializer = crt_request_serializer
+ self._os_utils = os_utils
+
+ def get_make_request_args(
+ self, request_type, call_args, coordinator, future, on_done_after_calls
+ ):
+ request_args_handler = getattr(
+ self,
+ f'_get_make_request_args_{request_type}',
+ self._default_get_make_request_args,
+ )
+ return request_args_handler(
+ request_type=request_type,
+ call_args=call_args,
+ coordinator=coordinator,
+ future=future,
+ on_done_before_calls=[],
+ on_done_after_calls=on_done_after_calls,
+ )
+
+ def get_crt_callback(
+ self,
+ future,
+ callback_type,
+ before_subscribers=None,
+ after_subscribers=None,
+ ):
+ def invoke_all_callbacks(*args, **kwargs):
+ callbacks_list = []
+ if before_subscribers is not None:
+ callbacks_list += before_subscribers
+ callbacks_list += get_callbacks(future, callback_type)
+ if after_subscribers is not None:
+ callbacks_list += after_subscribers
+ for callback in callbacks_list:
+ # The get_callbacks helper will set the first argument
+ # by keyword; the other arguments need to be set by keyword
+ # as well.
+ if callback_type == "progress":
+ callback(bytes_transferred=args[0])
+ else:
+ callback(*args, **kwargs)
+
+ return invoke_all_callbacks
+
+ def _get_make_request_args_put_object(
+ self,
+ request_type,
+ call_args,
+ coordinator,
+ future,
+ on_done_before_calls,
+ on_done_after_calls,
+ ):
+ send_filepath = None
+ if isinstance(call_args.fileobj, str):
+ send_filepath = call_args.fileobj
+ data_len = self._os_utils.get_file_size(send_filepath)
+ call_args.extra_args["ContentLength"] = data_len
+ else:
+ call_args.extra_args["Body"] = call_args.fileobj
+
+ checksum_config = None
+ if not any(
+ checksum_arg in call_args.extra_args
+ for checksum_arg in FULL_OBJECT_CHECKSUM_ARGS
+ ):
+ checksum_algorithm = call_args.extra_args.pop(
+ 'ChecksumAlgorithm', 'CRC32'
+ ).upper()
+ checksum_config = awscrt.s3.S3ChecksumConfig(
+ algorithm=awscrt.s3.S3ChecksumAlgorithm[checksum_algorithm],
+ location=awscrt.s3.S3ChecksumLocation.TRAILER,
+ )
+ # Suppress botocore's automatic MD5 calculation by setting an override
+ # value that will get deleted in the BotocoreCRTRequestSerializer.
+ # As part of the CRT S3 request, we request the CRT S3 client to
+ # automatically add trailing checksums to its uploads.
+ call_args.extra_args["ContentMD5"] = "override-to-be-removed"
+
+ make_request_args = self._default_get_make_request_args(
+ request_type=request_type,
+ call_args=call_args,
+ coordinator=coordinator,
+ future=future,
+ on_done_before_calls=on_done_before_calls,
+ on_done_after_calls=on_done_after_calls,
+ )
+ make_request_args['send_filepath'] = send_filepath
+ make_request_args['checksum_config'] = checksum_config
+ return make_request_args
+
+ def _get_make_request_args_get_object(
+ self,
+ request_type,
+ call_args,
+ coordinator,
+ future,
+ on_done_before_calls,
+ on_done_after_calls,
+ ):
+ recv_filepath = None
+ on_body = None
+ checksum_config = awscrt.s3.S3ChecksumConfig(validate_response=True)
+ if isinstance(call_args.fileobj, str):
+ final_filepath = call_args.fileobj
+ recv_filepath = self._os_utils.get_temp_filename(final_filepath)
+ on_done_before_calls.append(
+ RenameTempFileHandler(
+ coordinator, final_filepath, recv_filepath, self._os_utils
+ )
+ )
+ else:
+ on_body = OnBodyFileObjWriter(call_args.fileobj)
+
+ make_request_args = self._default_get_make_request_args(
+ request_type=request_type,
+ call_args=call_args,
+ coordinator=coordinator,
+ future=future,
+ on_done_before_calls=on_done_before_calls,
+ on_done_after_calls=on_done_after_calls,
+ )
+ make_request_args['recv_filepath'] = recv_filepath
+ make_request_args['on_body'] = on_body
+ make_request_args['checksum_config'] = checksum_config
+ return make_request_args
+
+ def _default_get_make_request_args(
+ self,
+ request_type,
+ call_args,
+ coordinator,
+ future,
+ on_done_before_calls,
+ on_done_after_calls,
+ ):
+ make_request_args = {
+ 'request': self._request_serializer.serialize_http_request(
+ request_type, future
+ ),
+ 'type': getattr(
+ S3RequestType, request_type.upper(), S3RequestType.DEFAULT
+ ),
+ 'on_done': self.get_crt_callback(
+ future, 'done', on_done_before_calls, on_done_after_calls
+ ),
+ 'on_progress': self.get_crt_callback(future, 'progress'),
+ }
+
+ # For DEFAULT requests, CRT requires the official S3 operation name.
+ # So transform string like "delete_object" -> "DeleteObject".
+ if make_request_args['type'] == S3RequestType.DEFAULT:
+ make_request_args['operation_name'] = ''.join(
+ x.title() for x in request_type.split('_')
+ )
+
+ arn_handler = _S3ArnParamHandler()
+ if (
+ accesspoint_arn_details := arn_handler.handle_arn(call_args.bucket)
+ ) and accesspoint_arn_details['region'] == "":
+ # Configure our region to `*` to propagate in `x-amz-region-set`
+ # for multi-region support in MRAP accesspoints.
+ # use_double_uri_encode and should_normalize_uri_path default to
+ # True, but the SDK has already encoded the URI and this is S3,
+ # so set both to False.
+ make_request_args['signing_config'] = AwsSigningConfig(
+ algorithm=AwsSigningAlgorithm.V4_ASYMMETRIC,
+ region="*",
+ use_double_uri_encode=False,
+ should_normalize_uri_path=False,
+ )
+ call_args.bucket = accesspoint_arn_details['resource_name']
+ elif is_s3express_bucket(call_args.bucket):
+ # use_double_uri_encode and should_normalize_uri_path default to
+ # True, but the SDK has already encoded the URI and this is S3,
+ # so set both to False.
+ make_request_args['signing_config'] = AwsSigningConfig(
+ algorithm=AwsSigningAlgorithm.V4_S3EXPRESS,
+ use_double_uri_encode=False,
+ should_normalize_uri_path=False,
+ )
+ return make_request_args
+
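+
+# Worked example of the DEFAULT-request name transform above:
+# 'delete_object' is split on '_' and title-cased to 'DeleteObject'.
+# 'put_object' and 'get_object' never reach this branch because they map
+# to the dedicated S3RequestType.PUT_OBJECT and GET_OBJECT types.
+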
+
+class RenameTempFileHandler:
+ def __init__(self, coordinator, final_filename, temp_filename, osutil):
+ self._coordinator = coordinator
+ self._final_filename = final_filename
+ self._temp_filename = temp_filename
+ self._osutil = osutil
+
+ def __call__(self, **kwargs):
+ error = kwargs['error']
+ if error:
+ self._osutil.remove_file(self._temp_filename)
+ else:
+ try:
+ self._osutil.rename_file(
+ self._temp_filename, self._final_filename
+ )
+ except Exception as e:
+ self._osutil.remove_file(self._temp_filename)
+ # The CRT future has already completed at this point.
+ self._coordinator.set_exception(e)
+
+
+class AfterDoneHandler:
+ def __init__(self, coordinator):
+ self._coordinator = coordinator
+
+ def __call__(self, **kwargs):
+ self._coordinator.set_done_callbacks_complete()
+
+
+class OnBodyFileObjWriter:
+ def __init__(self, fileobj):
+ self._fileobj = fileobj
+
+ def __call__(self, chunk, **kwargs):
+ self._fileobj.write(chunk)
+
+
+class _S3ArnParamHandler:
+ """Partial port of S3ArnParamHandler from botocore.
+
+ This is used to make a determination on MRAP accesspoints for signing
+ purposes. This should be safe to remove once we properly integrate auth
+ resolution from Botocore into the CRT transfer integration.
+ """
+
+ _RESOURCE_REGEX = re.compile(
+ r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$'
+ )
+
+ def __init__(self):
+ self._arn_parser = ArnParser()
+
+ def handle_arn(self, bucket):
+ arn_details = self._get_arn_details_from_bucket(bucket)
+ if arn_details is None:
+ return
+ if arn_details['resource_type'] == 'accesspoint':
+ return arn_details
+
+ def _get_arn_details_from_bucket(self, bucket):
+ try:
+ arn_details = self._arn_parser.parse_arn(bucket)
+ self._add_resource_type_and_name(arn_details)
+ return arn_details
+ except InvalidArnException:
+ pass
+ return None
+
+ def _add_resource_type_and_name(self, arn_details):
+ match = self._RESOURCE_REGEX.match(arn_details['resource'])
+ if match:
+ arn_details['resource_type'] = match.group('resource_type')
+ arn_details['resource_name'] = match.group('resource_name')
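+
+
+# A minimal sketch of the handler above on a multi-region access point ARN
+# (the account id and MRAP alias are illustrative): the parsed details
+# carry an empty 'region', which is what triggers SigV4A signing with
+# region '*' in _default_get_make_request_args.
+def _example_handle_mrap_arn():
+    handler = _S3ArnParamHandler()
+    arn = 'arn:aws:s3::123456789012:accesspoint/mfzwi23gnjvgw.mrap'
+    # The returned details include region == '' and
+    # resource_name == 'mfzwi23gnjvgw.mrap'.
+    return handler.handle_arn(arn)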
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/delete.py b/.venv/lib/python3.12/site-packages/s3transfer/delete.py
new file mode 100644
index 00000000..74084d31
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/delete.py
@@ -0,0 +1,71 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from s3transfer.tasks import SubmissionTask, Task
+
+
+class DeleteSubmissionTask(SubmissionTask):
+ """Task for submitting tasks to execute an object deletion."""
+
+ def _submit(self, client, request_executor, transfer_future, **kwargs):
+ """
+ :param client: The client associated with the transfer manager
+
+ :type config: s3transfer.manager.TransferConfig
+ :param config: The transfer config associated with the transfer
+ manager
+
+ :type osutil: s3transfer.utils.OSUtil
+ :param osutil: The os utility associated to the transfer manager
+
+ :type request_executor: s3transfer.futures.BoundedExecutor
+ :param request_executor: The request executor associated with the
+ transfer manager
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+ transfer request that tasks are being submitted for
+ """
+ call_args = transfer_future.meta.call_args
+
+ self._transfer_coordinator.submit(
+ request_executor,
+ DeleteObjectTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': call_args.extra_args,
+ },
+ is_final=True,
+ ),
+ )
+
+
+class DeleteObjectTask(Task):
+ def _main(self, client, bucket, key, extra_args):
+ """
+
+ :param client: The S3 client to use when calling DeleteObject
+
+ :type bucket: str
+ :param bucket: The name of the bucket.
+
+ :type key: str
+ :param key: The name of the object to delete.
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments to pass to the DeleteObject call.
+
+ """
+ client.delete_object(Bucket=bucket, Key=key, **extra_args)
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/download.py b/.venv/lib/python3.12/site-packages/s3transfer/download.py
new file mode 100644
index 00000000..e1a9cf0e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/download.py
@@ -0,0 +1,788 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import heapq
+import logging
+import threading
+
+from s3transfer.compat import seekable
+from s3transfer.exceptions import RetriesExceededError
+from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
+from s3transfer.tasks import SubmissionTask, Task
+from s3transfer.utils import (
+ S3_RETRYABLE_DOWNLOAD_ERRORS,
+ CountCallbackInvoker,
+ DeferredOpenFile,
+ FunctionContainer,
+ StreamReaderProgress,
+ calculate_num_parts,
+ calculate_range_parameter,
+ get_callbacks,
+ invoke_progress_callbacks,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class DownloadOutputManager:
+ """Base manager class for handling various types of files for downloads
+
+ This class is typically used for the DownloadSubmissionTask class to help
+ determine the following:
+
+ * Provides the fileobj to write downloaded content to
+ * Provides a task to complete once everything downloaded has been written
+
+ The answers/implementations differ for the various types of file outputs
+ that may be accepted. All implementations must subclass and override
+ public methods from this class.
+ """
+
+ def __init__(self, osutil, transfer_coordinator, io_executor):
+ self._osutil = osutil
+ self._transfer_coordinator = transfer_coordinator
+ self._io_executor = io_executor
+
+ @classmethod
+ def is_compatible(cls, download_target, osutil):
+ """Determines if the target for the download is compatible with manager
+
+ :param download_target: The target for which the upload will write
+ data to.
+
+ :param osutil: The os utility to be used for the transfer
+
+ :returns: True if the manager can handle the type of target specified
+ otherwise returns False.
+ """
+ raise NotImplementedError('must implement is_compatible()')
+
+ def get_download_task_tag(self):
+ """Get the tag (if any) to associate all GetObjectTasks
+
+ :rtype: s3transfer.futures.TaskTag
+ :returns: The tag to associate all GetObjectTasks with
+ """
+ return None
+
+ def get_fileobj_for_io_writes(self, transfer_future):
+ """Get file-like object to use for io writes in the io executor
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The future associated with the download request
+
+ :returns: A file-like object to write to
+ """
+ raise NotImplementedError('must implement get_fileobj_for_io_writes()')
+
+ def queue_file_io_task(self, fileobj, data, offset):
+ """Queue IO write for submission to the IO executor.
+
+ This method accepts the file-like object to write to, the downloaded
+ data, and its offset, and handles submitting the write to the IO
+ executor.
+
+ This method may defer submission to the IO executor if necessary.
+
+ """
+ self._transfer_coordinator.submit(
+ self._io_executor, self.get_io_write_task(fileobj, data, offset)
+ )
+
+ def get_io_write_task(self, fileobj, data, offset):
+ """Get an IO write task for the requested set of data
+
+ This task can be run immediately or be submitted to the IO executor
+ for it to run.
+
+ :type fileobj: file-like object
+ :param fileobj: The file-like object to write to
+
+ :type data: bytes
+ :param data: The data to write out
+
+ :type offset: integer
+ :param offset: The offset to write the data to in the file-like object
+
+ :returns: An IO task to be used to write data to a file-like object
+ """
+ return IOWriteTask(
+ self._transfer_coordinator,
+ main_kwargs={
+ 'fileobj': fileobj,
+ 'data': data,
+ 'offset': offset,
+ },
+ )
+
+ def get_final_io_task(self):
+ """Get the final io task to complete the download
+
+ This is needed because, given the architecture of the TransferManager,
+ the final tasks are sent to the IO executor, and the executor needs a
+ final task to signal that the transfer is done and that all done
+ callbacks can be run.
+
+ :rtype: s3transfer.tasks.Task
+ :returns: A final task to be completed in the io executor
+ """
+ raise NotImplementedError('must implement get_final_io_task()')
+
+ def _get_fileobj_from_filename(self, filename):
+ f = DeferredOpenFile(
+ filename, mode='wb', open_function=self._osutil.open
+ )
+ # Make sure the file gets closed and we remove the temporary file
+ # if anything goes wrong during the process.
+ self._transfer_coordinator.add_failure_cleanup(f.close)
+ return f
+
+
+class DownloadFilenameOutputManager(DownloadOutputManager):
+ def __init__(self, osutil, transfer_coordinator, io_executor):
+ super().__init__(osutil, transfer_coordinator, io_executor)
+ self._final_filename = None
+ self._temp_filename = None
+ self._temp_fileobj = None
+
+ @classmethod
+ def is_compatible(cls, download_target, osutil):
+ return isinstance(download_target, str)
+
+ def get_fileobj_for_io_writes(self, transfer_future):
+ fileobj = transfer_future.meta.call_args.fileobj
+ self._final_filename = fileobj
+ self._temp_filename = self._osutil.get_temp_filename(fileobj)
+ self._temp_fileobj = self._get_temp_fileobj()
+ return self._temp_fileobj
+
+ def get_final_io_task(self):
+ # A task to rename the file from the temporary file to its final
+ # location is needed. This should be the last task needed to complete
+ # the download.
+ return IORenameFileTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'fileobj': self._temp_fileobj,
+ 'final_filename': self._final_filename,
+ 'osutil': self._osutil,
+ },
+ is_final=True,
+ )
+
+ def _get_temp_fileobj(self):
+ f = self._get_fileobj_from_filename(self._temp_filename)
+ self._transfer_coordinator.add_failure_cleanup(
+ self._osutil.remove_file, self._temp_filename
+ )
+ return f
+
+
+class DownloadSeekableOutputManager(DownloadOutputManager):
+ @classmethod
+ def is_compatible(cls, download_target, osutil):
+ return seekable(download_target)
+
+ def get_fileobj_for_io_writes(self, transfer_future):
+ # Return the fileobj provided to the future.
+ return transfer_future.meta.call_args.fileobj
+
+ def get_final_io_task(self):
+ # This task will serve the purpose of signaling when all of the io
+ # writes have finished so done callbacks can be called.
+ return CompleteDownloadNOOPTask(
+ transfer_coordinator=self._transfer_coordinator
+ )
+
+
+class DownloadNonSeekableOutputManager(DownloadOutputManager):
+ def __init__(
+ self, osutil, transfer_coordinator, io_executor, defer_queue=None
+ ):
+ super().__init__(osutil, transfer_coordinator, io_executor)
+ if defer_queue is None:
+ defer_queue = DeferQueue()
+ self._defer_queue = defer_queue
+ self._io_submit_lock = threading.Lock()
+
+ @classmethod
+ def is_compatible(cls, download_target, osutil):
+ return hasattr(download_target, 'write')
+
+ def get_download_task_tag(self):
+ return IN_MEMORY_DOWNLOAD_TAG
+
+ def get_fileobj_for_io_writes(self, transfer_future):
+ return transfer_future.meta.call_args.fileobj
+
+ def get_final_io_task(self):
+ return CompleteDownloadNOOPTask(
+ transfer_coordinator=self._transfer_coordinator
+ )
+
+ def queue_file_io_task(self, fileobj, data, offset):
+ with self._io_submit_lock:
+ writes = self._defer_queue.request_writes(offset, data)
+ for write in writes:
+ data = write['data']
+ logger.debug(
+ "Queueing IO offset %s for fileobj: %s",
+ write['offset'],
+ fileobj,
+ )
+ super().queue_file_io_task(fileobj, data, offset)
+
+ def get_io_write_task(self, fileobj, data, offset):
+ return IOStreamingWriteTask(
+ self._transfer_coordinator,
+ main_kwargs={
+ 'fileobj': fileobj,
+ 'data': data,
+ },
+ )
+
+
+class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
+ def __init__(
+ self, osutil, transfer_coordinator, io_executor, defer_queue=None
+ ):
+ super().__init__(
+ osutil, transfer_coordinator, io_executor, defer_queue
+ )
+ self._fileobj = None
+
+ @classmethod
+ def is_compatible(cls, download_target, osutil):
+ return isinstance(download_target, str) and osutil.is_special_file(
+ download_target
+ )
+
+ def get_fileobj_for_io_writes(self, transfer_future):
+ filename = transfer_future.meta.call_args.fileobj
+ self._fileobj = self._get_fileobj_from_filename(filename)
+ return self._fileobj
+
+ def get_final_io_task(self):
+ # Make sure the file gets closed once the transfer is done.
+ return IOCloseTask(
+ transfer_coordinator=self._transfer_coordinator,
+ is_final=True,
+ main_kwargs={'fileobj': self._fileobj},
+ )
+
+
+class DownloadSubmissionTask(SubmissionTask):
+ """Task for submitting tasks to execute a download"""
+
+ def _get_download_output_manager_cls(self, transfer_future, osutil):
+ """Retrieves a class for managing output for a download
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future for the request
+
+ :type osutil: s3transfer.utils.OSUtils
+ :param osutil: The os utility associated to the transfer
+
+ :rtype: class of DownloadOutputManager
+ :returns: The appropriate class to use for managing a specific type of
+ input for downloads.
+ """
+ download_manager_resolver_chain = [
+ DownloadSpecialFilenameOutputManager,
+ DownloadFilenameOutputManager,
+ DownloadSeekableOutputManager,
+ DownloadNonSeekableOutputManager,
+ ]
+
+ fileobj = transfer_future.meta.call_args.fileobj
+ for download_manager_cls in download_manager_resolver_chain:
+ if download_manager_cls.is_compatible(fileobj, osutil):
+ return download_manager_cls
+ raise RuntimeError(
+ f'Output {fileobj} of type: {type(fileobj)} is not supported.'
+ )
+
+ def _submit(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ io_executor,
+ transfer_future,
+ bandwidth_limiter=None,
+ ):
+ """
+ :param client: The client associated with the transfer manager
+
+ :type config: s3transfer.manager.TransferConfig
+ :param config: The transfer config associated with the transfer
+ manager
+
+ :type osutil: s3transfer.utils.OSUtil
+ :param osutil: The os utility associated to the transfer manager
+
+ :type request_executor: s3transfer.futures.BoundedExecutor
+ :param request_executor: The request executor associated with the
+ transfer manager
+
+ :type io_executor: s3transfer.futures.BoundedExecutor
+ :param io_executor: The io executor associated with the
+ transfer manager
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+ transfer request that tasks are being submitted for
+
+ :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
+ :param bandwidth_limiter: The bandwidth limiter to use when
+ downloading streams
+ """
+ if transfer_future.meta.size is None:
+ # If a size was not provided, figure out the size for the
+ # user.
+ response = client.head_object(
+ Bucket=transfer_future.meta.call_args.bucket,
+ Key=transfer_future.meta.call_args.key,
+ **transfer_future.meta.call_args.extra_args,
+ )
+ transfer_future.meta.provide_transfer_size(
+ response['ContentLength']
+ )
+
+ download_output_manager = self._get_download_output_manager_cls(
+ transfer_future, osutil
+ )(osutil, self._transfer_coordinator, io_executor)
+
+ # If the size is below the multipart threshold, do a regular
+ # GetObject download; otherwise do a ranged download.
+ if transfer_future.meta.size < config.multipart_threshold:
+ self._submit_download_request(
+ client,
+ config,
+ osutil,
+ request_executor,
+ io_executor,
+ download_output_manager,
+ transfer_future,
+ bandwidth_limiter,
+ )
+ else:
+ self._submit_ranged_download_request(
+ client,
+ config,
+ osutil,
+ request_executor,
+ io_executor,
+ download_output_manager,
+ transfer_future,
+ bandwidth_limiter,
+ )
+
+ def _submit_download_request(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ io_executor,
+ download_output_manager,
+ transfer_future,
+ bandwidth_limiter,
+ ):
+ call_args = transfer_future.meta.call_args
+
+ # Get a handle to the file that will be used for writing downloaded
+ # contents
+ fileobj = download_output_manager.get_fileobj_for_io_writes(
+ transfer_future
+ )
+
+ # Get the needed callbacks for the task
+ progress_callbacks = get_callbacks(transfer_future, 'progress')
+
+ # Get any associated tags for the get object task.
+ get_object_tag = download_output_manager.get_download_task_tag()
+
+ # Get the final io task to run once the download is complete.
+ final_task = download_output_manager.get_final_io_task()
+
+ # Submit the task to download the object.
+ self._transfer_coordinator.submit(
+ request_executor,
+ ImmediatelyWriteIOGetObjectTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'fileobj': fileobj,
+ 'extra_args': call_args.extra_args,
+ 'callbacks': progress_callbacks,
+ 'max_attempts': config.num_download_attempts,
+ 'download_output_manager': download_output_manager,
+ 'io_chunksize': config.io_chunksize,
+ 'bandwidth_limiter': bandwidth_limiter,
+ },
+ done_callbacks=[final_task],
+ ),
+ tag=get_object_tag,
+ )
+
+ def _submit_ranged_download_request(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ io_executor,
+ download_output_manager,
+ transfer_future,
+ bandwidth_limiter,
+ ):
+ call_args = transfer_future.meta.call_args
+
+ # Get the needed progress callbacks for the task
+ progress_callbacks = get_callbacks(transfer_future, 'progress')
+
+ # Get a handle to the file that will be used for writing downloaded
+ # contents
+ fileobj = download_output_manager.get_fileobj_for_io_writes(
+ transfer_future
+ )
+
+ # Determine the number of parts
+ part_size = config.multipart_chunksize
+ num_parts = calculate_num_parts(transfer_future.meta.size, part_size)
+
+ # Get any associated tags for the get object task.
+ get_object_tag = download_output_manager.get_download_task_tag()
+
+ # Callback invoker to submit the final io task once all downloads
+ # are complete.
+ finalize_download_invoker = CountCallbackInvoker(
+ self._get_final_io_task_submission_callback(
+ download_output_manager, io_executor
+ )
+ )
+ for i in range(num_parts):
+ # Calculate the range parameter
+ range_parameter = calculate_range_parameter(
+ part_size, i, num_parts
+ )
+
+ # Inject the Range parameter to the parameters to be passed in
+ # as extra args
+ extra_args = {'Range': range_parameter}
+ extra_args.update(call_args.extra_args)
+ finalize_download_invoker.increment()
+ # Submit the ranged downloads
+ self._transfer_coordinator.submit(
+ request_executor,
+ GetObjectTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'fileobj': fileobj,
+ 'extra_args': extra_args,
+ 'callbacks': progress_callbacks,
+ 'max_attempts': config.num_download_attempts,
+ 'start_index': i * part_size,
+ 'download_output_manager': download_output_manager,
+ 'io_chunksize': config.io_chunksize,
+ 'bandwidth_limiter': bandwidth_limiter,
+ },
+ done_callbacks=[finalize_download_invoker.decrement],
+ ),
+ tag=get_object_tag,
+ )
+ finalize_download_invoker.finalize()
+
+ def _get_final_io_task_submission_callback(
+ self, download_manager, io_executor
+ ):
+ final_task = download_manager.get_final_io_task()
+ return FunctionContainer(
+ self._transfer_coordinator.submit, io_executor, final_task
+ )
+
+ def _calculate_range_param(self, part_size, part_index, num_parts):
+ # Used to calculate the Range parameter
+ start_range = part_index * part_size
+ if part_index == num_parts - 1:
+ end_range = ''
+ else:
+ end_range = start_range + part_size - 1
+ range_param = f'bytes={start_range}-{end_range}'
+ return range_param
+
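+# Worked example of the Range calculation above (sizes are illustrative):
+# with part_size = 8 * 1024 * 1024 and num_parts = 3, part_index 0 yields
+# 'bytes=0-8388607', part_index 1 yields 'bytes=8388608-16777215', and the
+# final part_index 2 yields the open-ended 'bytes=16777216-'.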
+
+class GetObjectTask(Task):
+ def _main(
+ self,
+ client,
+ bucket,
+ key,
+ fileobj,
+ extra_args,
+ callbacks,
+ max_attempts,
+ download_output_manager,
+ io_chunksize,
+ start_index=0,
+ bandwidth_limiter=None,
+ ):
+ """Downloads an object and places content into io queue
+
+ :param client: The client to use when calling GetObject
+ :param bucket: The bucket to download from
+ :param key: The key to download from
+ :param fileobj: The file handle to write content to
+ :param extra_args: Any extra arguments to include in GetObject request
+ :param callbacks: List of progress callbacks to invoke on download
+ :param max_attempts: The number of retries to do when downloading
+ :param download_output_manager: The download output manager associated
+ with the current download.
+ :param io_chunksize: The size of each io chunk to read from the
+ download stream and queue in the io queue.
+ :param start_index: The location in the file to start writing the
+ content of the key to.
+ :param bandwidth_limiter: The bandwidth limiter to use when throttling
+ the downloading of data in streams.
+ """
+ last_exception = None
+ for i in range(max_attempts):
+ try:
+ current_index = start_index
+ response = client.get_object(
+ Bucket=bucket, Key=key, **extra_args
+ )
+ streaming_body = StreamReaderProgress(
+ response['Body'], callbacks
+ )
+ if bandwidth_limiter:
+ streaming_body = (
+ bandwidth_limiter.get_bandwith_limited_stream(
+ streaming_body, self._transfer_coordinator
+ )
+ )
+
+ chunks = DownloadChunkIterator(streaming_body, io_chunksize)
+ for chunk in chunks:
+ # If the transfer is done because of a cancellation
+ # or error somewhere else, stop trying to submit more
+ # data to be written and break out of the download.
+ if not self._transfer_coordinator.done():
+ self._handle_io(
+ download_output_manager,
+ fileobj,
+ chunk,
+ current_index,
+ )
+ current_index += len(chunk)
+ else:
+ return
+ return
+ except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
+ logger.debug(
+ "Retrying exception caught (%s), "
+ "retrying request, (attempt %s / %s)",
+ e,
+ i,
+ max_attempts,
+ exc_info=True,
+ )
+ last_exception = e
+ # Also invoke the progress callbacks to indicate that we
+ # are trying to download the stream again and all progress
+ # for this GetObject has been lost.
+ invoke_progress_callbacks(
+ callbacks, start_index - current_index
+ )
+ continue
+ raise RetriesExceededError(last_exception)
+
+ def _handle_io(self, download_output_manager, fileobj, chunk, index):
+ download_output_manager.queue_file_io_task(fileobj, chunk, index)
+
+
+class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
+ """GetObjectTask that immediately writes to the provided file object
+
+ This is useful for downloads where it is known only one thread is
+ downloading the object so there is no reason to go through the
+ overhead of using an IO queue and executor.
+ """
+
+ def _handle_io(self, download_output_manager, fileobj, chunk, index):
+ task = download_output_manager.get_io_write_task(fileobj, chunk, index)
+ task()
+
+
+class IOWriteTask(Task):
+ def _main(self, fileobj, data, offset):
+ """Pulls off an io queue to write contents to a file
+
+ :param fileobj: The file handle to write content to
+ :param data: The data to write
+ :param offset: The offset to write the data to.
+ """
+ fileobj.seek(offset)
+ fileobj.write(data)
+
+
+class IOStreamingWriteTask(Task):
+ """Task for writing data to a non-seekable stream."""
+
+ def _main(self, fileobj, data):
+ """Write data to a fileobj.
+
+ Data will be written directly to the fileobj without
+ any prior seeking.
+
+ :param fileobj: The fileobj to write content to
+ :param data: The data to write
+
+ """
+ fileobj.write(data)
+
+
+class IORenameFileTask(Task):
+ """A task to rename a temporary file to its final filename
+
+ :param fileobj: The file handle that content was written to.
+ :param final_filename: The final name of the file to rename to
+ upon completion of writing the contents.
+ :param osutil: OS utility
+ """
+
+ def _main(self, fileobj, final_filename, osutil):
+ fileobj.close()
+ osutil.rename_file(fileobj.name, final_filename)
+
+
+class IOCloseTask(Task):
+ """A task to close out a file once the download is complete.
+
+ :param fileobj: The fileobj to close.
+ """
+
+ def _main(self, fileobj):
+ fileobj.close()
+
+
+class CompleteDownloadNOOPTask(Task):
+ """A NOOP task to serve as an indicator that the download is complete
+
+ Note that the default for is_final is set to True because this should
+ always be the last task.
+ """
+
+ def __init__(
+ self,
+ transfer_coordinator,
+ main_kwargs=None,
+ pending_main_kwargs=None,
+ done_callbacks=None,
+ is_final=True,
+ ):
+ super().__init__(
+ transfer_coordinator=transfer_coordinator,
+ main_kwargs=main_kwargs,
+ pending_main_kwargs=pending_main_kwargs,
+ done_callbacks=done_callbacks,
+ is_final=is_final,
+ )
+
+ def _main(self):
+ pass
+
+
+class DownloadChunkIterator:
+ def __init__(self, body, chunksize):
+ """Iterator to chunk out a downloaded S3 stream
+
+ :param body: A readable file-like object
+ :param chunksize: The amount to read each time
+ """
+ self._body = body
+ self._chunksize = chunksize
+ self._num_reads = 0
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ chunk = self._body.read(self._chunksize)
+ self._num_reads += 1
+ if chunk:
+ return chunk
+ elif self._num_reads == 1:
+ # Even though the response may have not had any
+ # content, we still want to account for an empty object's
+ # existence so return the empty chunk for that initial
+ # read.
+ return chunk
+ raise StopIteration()
+
+ next = __next__
+
+
+class DeferQueue:
+ """IO queue that defers write requests until they are queued sequentially.
+
+ This class is used to track IO data for a *single* fileobj.
+
+ You can send data to this queue, and it will defer any IO write requests
+ until it has the next contiguous block available (starting at 0).
+
+ """
+
+ def __init__(self):
+ self._writes = []
+ self._pending_offsets = set()
+ self._next_offset = 0
+
+ def request_writes(self, offset, data):
+ """Request any available writes given new incoming data.
+
+ You call this method by providing new data along with the
+ offset associated with the data. If that new data unlocks
+ any contiguous writes that can now be submitted, this
+ method will return all applicable writes.
+
+ This is done with one method call so you don't have to
+ make two method calls (put(), get()), each of which would
+ acquire a lock.
+
+ """
+ if offset < self._next_offset:
+ # This is a request for a write that we've already
+ # seen. This can happen in the event of a retry
+ # where if we retry at offset N/2, we'll requeue
+ # offsets 0-N/2 again.
+ return []
+ writes = []
+ if offset in self._pending_offsets:
+ # We've already queued this offset so this request is
+ # a duplicate. In this case we should ignore
+ # this request and prefer what's already queued.
+ return []
+ heapq.heappush(self._writes, (offset, data))
+ self._pending_offsets.add(offset)
+ while self._writes and self._writes[0][0] == self._next_offset:
+ next_write = heapq.heappop(self._writes)
+ writes.append({'offset': next_write[0], 'data': next_write[1]})
+ self._pending_offsets.remove(next_write[0])
+ self._next_offset += len(next_write[1])
+ return writes
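+
+
+# A minimal sketch of the deferral behavior above (offsets and payloads
+# are illustrative): out-of-order writes are held back until the
+# contiguous prefix starting at offset 0 is available.
+def _example_defer_queue_ordering():
+    q = DeferQueue()
+    # Offset 3 arrives first; nothing can be written yet.
+    assert q.request_writes(3, b'def') == []
+    # Offset 0 arrives; both writes are released in offset order:
+    # [{'offset': 0, 'data': b'abc'}, {'offset': 3, 'data': b'def'}]
+    return q.request_writes(0, b'abc')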
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py b/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py
new file mode 100644
index 00000000..6150fe65
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py
@@ -0,0 +1,37 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from concurrent.futures import CancelledError
+
+
+class RetriesExceededError(Exception):
+ def __init__(self, last_exception, msg='Max Retries Exceeded'):
+ super().__init__(msg)
+ self.last_exception = last_exception
+
+
+class S3UploadFailedError(Exception):
+ pass
+
+
+class InvalidSubscriberMethodError(Exception):
+ pass
+
+
+class TransferNotDoneError(Exception):
+ pass
+
+
+class FatalError(CancelledError):
+ """A CancelledError raised from an error in the TransferManager"""
+
+ pass
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/futures.py b/.venv/lib/python3.12/site-packages/s3transfer/futures.py
new file mode 100644
index 00000000..9ab30a7a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/futures.py
@@ -0,0 +1,613 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import logging
+import sys
+import threading
+from collections import namedtuple
+from concurrent import futures
+
+from s3transfer.compat import MAXINT
+from s3transfer.exceptions import CancelledError, TransferNotDoneError
+from s3transfer.utils import FunctionContainer, TaskSemaphore
+
+try:
+ from botocore.context import get_context
+except ImportError:
+
+ def get_context():
+ return None
+
+
+logger = logging.getLogger(__name__)
+
+
+class BaseTransferFuture:
+ @property
+ def meta(self):
+ """The metadata associated to the TransferFuture"""
+ raise NotImplementedError('meta')
+
+ def done(self):
+ """Determines if a TransferFuture has completed
+
+ :returns: True if completed. False, otherwise.
+ """
+ raise NotImplementedError('done()')
+
+ def result(self):
+ """Waits until TransferFuture is done and returns the result
+
+ If the TransferFuture succeeded, it will return the result. If the
+ TransferFuture failed, it will raise the exception associated to the
+ failure.
+ """
+ raise NotImplementedError('result()')
+
+ def cancel(self):
+ """Cancels the request associated with the TransferFuture"""
+ raise NotImplementedError('cancel()')
+
+
+class BaseTransferMeta:
+ @property
+ def call_args(self):
+ """The call args used in the transfer request"""
+ raise NotImplementedError('call_args')
+
+ @property
+ def transfer_id(self):
+ """The unique id of the transfer"""
+ raise NotImplementedError('transfer_id')
+
+ @property
+ def user_context(self):
+ """A dictionary that requesters can store data in"""
+ raise NotImplementedError('user_context')
+
+
+class TransferFuture(BaseTransferFuture):
+ def __init__(self, meta=None, coordinator=None):
+ """The future associated to a submitted transfer request
+
+ :type meta: TransferMeta
+ :param meta: The metadata associated to the request. This object
+ is visible to the requester.
+
+ :type coordinator: TransferCoordinator
+ :param coordinator: The coordinator associated to the request. This
+ object is not visible to the requester.
+ """
+ self._meta = meta
+ if meta is None:
+ self._meta = TransferMeta()
+
+ self._coordinator = coordinator
+ if coordinator is None:
+ self._coordinator = TransferCoordinator()
+
+ @property
+ def meta(self):
+ return self._meta
+
+ def done(self):
+ return self._coordinator.done()
+
+ def result(self):
+ try:
+ # Usually the result() method blocks until the transfer is done,
+            # however, if a KeyboardInterrupt is raised we want to exit
+ # out of this and propagate the exception.
+ return self._coordinator.result()
+ except KeyboardInterrupt as e:
+ self.cancel()
+ raise e
+
+ def cancel(self):
+ self._coordinator.cancel()
+
+ def set_exception(self, exception):
+ """Sets the exception on the future."""
+ if not self.done():
+ raise TransferNotDoneError(
+ 'set_exception can only be called once the transfer is '
+ 'complete.'
+ )
+ self._coordinator.set_exception(exception, override=True)
+
+
+class TransferMeta(BaseTransferMeta):
+ """Holds metadata about the TransferFuture"""
+
+ def __init__(self, call_args=None, transfer_id=None):
+ self._call_args = call_args
+ self._transfer_id = transfer_id
+ self._size = None
+ self._user_context = {}
+
+ @property
+ def call_args(self):
+ """The call args used in the transfer request"""
+ return self._call_args
+
+ @property
+ def transfer_id(self):
+ """The unique id of the transfer"""
+ return self._transfer_id
+
+ @property
+ def size(self):
+ """The size of the transfer request if known"""
+ return self._size
+
+ @property
+ def user_context(self):
+ """A dictionary that requesters can store data in"""
+ return self._user_context
+
+ def provide_transfer_size(self, size):
+ """A method to provide the size of a transfer request
+
+ By providing this value, the TransferManager will not try to
+        call HeadObject or use the OS to determine the size of the
+ transfer.
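+
+        For example (a sketch; ``future`` and ``filename`` are placeholders),
+        a subscriber's ``on_queued`` hook might call:
+
+            future.meta.provide_transfer_size(os.path.getsize(filename))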
+ """
+ self._size = size
+
+
+class TransferCoordinator:
+ """A helper class for managing TransferFuture"""
+
+ def __init__(self, transfer_id=None):
+ self.transfer_id = transfer_id
+ self._status = 'not-started'
+ self._result = None
+ self._exception = None
+ self._associated_futures = set()
+ self._failure_cleanups = []
+ self._done_callbacks = []
+ self._done_event = threading.Event()
+ self._lock = threading.Lock()
+ self._associated_futures_lock = threading.Lock()
+ self._done_callbacks_lock = threading.Lock()
+ self._failure_cleanups_lock = threading.Lock()
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'
+
+ @property
+ def exception(self):
+ return self._exception
+
+ @property
+ def associated_futures(self):
+ """The list of futures associated to the inprogress TransferFuture
+
+ Once the transfer finishes this list becomes empty as the transfer
+ is considered done and there should be no running futures left.
+ """
+ with self._associated_futures_lock:
+            # We return a copy of the list because we do not want to be
+ # processing the returned list while another thread is adding
+ # more futures to the actual list.
+ return copy.copy(self._associated_futures)
+
+ @property
+ def failure_cleanups(self):
+ """The list of callbacks to call when the TransferFuture fails"""
+ return self._failure_cleanups
+
+ @property
+ def status(self):
+ """The status of the TransferFuture
+
+ The currently supported states are:
+ * not-started - Has yet to start. If in this state, a transfer
+ can be canceled immediately and nothing will happen.
+ * queued - SubmissionTask is about to submit tasks
+ * running - Is inprogress. In-progress as of now means that
+ the SubmissionTask that runs the transfer is being executed. So
+              there is no guarantee any transfer requests have been made to
+ S3 if this state is reached.
+ * cancelled - Was cancelled
+ * failed - An exception other than CancelledError was thrown
+            * success - No exceptions were thrown and the transfer is done.
+ """
+ return self._status
+
+ def set_result(self, result):
+ """Set a result for the TransferFuture
+
+        Implies that the TransferFuture succeeded. This will always set a
+        result because it is invoked on the final task, of which there is
+        only ever one, and that task runs at the very end of the transfer
+        process. So if a result is being set for this final task, the transfer
+        succeeded even if something came along and cancelled the transfer
+        on the final task.
+ """
+ with self._lock:
+ self._exception = None
+ self._result = result
+ self._status = 'success'
+
+ def set_exception(self, exception, override=False):
+ """Set an exception for the TransferFuture
+
+ Implies the TransferFuture failed.
+
+        :param exception: The exception that caused the transfer to fail.
+ :param override: If True, override any existing state.
+ """
+ with self._lock:
+ if not self.done() or override:
+ self._exception = exception
+ self._status = 'failed'
+
+ def result(self):
+ """Waits until TransferFuture is done and returns the result
+
+ If the TransferFuture succeeded, it will return the result. If the
+ TransferFuture failed, it will raise the exception associated to the
+ failure.
+ """
+ # Doing a wait() with no timeout cannot be interrupted in python2 but
+        # can be interrupted in python3, so we just wait with the largest
+        # possible integer value, which is on the scale of billions of
+ # years...
+ self._done_event.wait(MAXINT)
+
+ # Once done waiting, raise an exception if present or return the
+ # final result.
+ if self._exception:
+ raise self._exception
+ return self._result
+
+ def cancel(self, msg='', exc_type=CancelledError):
+ """Cancels the TransferFuture
+
+ :param msg: The message to attach to the cancellation
+ :param exc_type: The type of exception to set for the cancellation
+ """
+ with self._lock:
+ if not self.done():
+ should_announce_done = False
+ logger.debug('%s cancel(%s) called', self, msg)
+ self._exception = exc_type(msg)
+ if self._status == 'not-started':
+ should_announce_done = True
+ self._status = 'cancelled'
+ if should_announce_done:
+ self.announce_done()
+
+ def set_status_to_queued(self):
+ """Sets the TransferFutrue's status to running"""
+ self._transition_to_non_done_state('queued')
+
+ def set_status_to_running(self):
+ """Sets the TransferFuture's status to running"""
+ self._transition_to_non_done_state('running')
+
+ def _transition_to_non_done_state(self, desired_state):
+ with self._lock:
+ if self.done():
+ raise RuntimeError(
+ f'Unable to transition from done state {self.status} to non-done '
+ f'state {desired_state}.'
+ )
+ self._status = desired_state
+
+ def submit(self, executor, task, tag=None):
+ """Submits a task to a provided executor
+
+ :type executor: s3transfer.futures.BoundedExecutor
+ :param executor: The executor to submit the callable to
+
+ :type task: s3transfer.tasks.Task
+ :param task: The task to submit to the executor
+
+ :type tag: s3transfer.futures.TaskTag
+ :param tag: A tag to associate to the submitted task
+
+ :rtype: concurrent.futures.Future
+ :returns: A future representing the submitted task
+ """
+ logger.debug(
+ f"Submitting task {task} to executor {executor} for transfer request: {self.transfer_id}."
+ )
+ future = executor.submit(task, tag=tag)
+        # Add this created future to the list of associated futures just
+ # in case it is needed during cleanups.
+ self.add_associated_future(future)
+ future.add_done_callback(
+ FunctionContainer(self.remove_associated_future, future)
+ )
+ return future
+
+ def done(self):
+ """Determines if a TransferFuture has completed
+
+        :returns: True if status is equal to 'failed', 'cancelled', or
+            'success'. False, otherwise
+ """
+ return self.status in ['failed', 'cancelled', 'success']
+
+ def add_associated_future(self, future):
+ """Adds a future to be associated with the TransferFuture"""
+ with self._associated_futures_lock:
+ self._associated_futures.add(future)
+
+ def remove_associated_future(self, future):
+ """Removes a future's association to the TransferFuture"""
+ with self._associated_futures_lock:
+ self._associated_futures.remove(future)
+
+ def add_done_callback(self, function, *args, **kwargs):
+ """Add a done callback to be invoked when transfer is done"""
+ with self._done_callbacks_lock:
+ self._done_callbacks.append(
+ FunctionContainer(function, *args, **kwargs)
+ )
+
+ def add_failure_cleanup(self, function, *args, **kwargs):
+ """Adds a callback to call upon failure"""
+ with self._failure_cleanups_lock:
+ self._failure_cleanups.append(
+ FunctionContainer(function, *args, **kwargs)
+ )
+
+ def announce_done(self):
+ """Announce that future is done running and run associated callbacks
+
+        This will run any failure cleanups if the transfer failed and they
+        have not yet been run, allows the result() to be unblocked, and will
+        run any done callbacks associated to the TransferFuture if they have
+        not already been run.
+ """
+ if self.status != 'success':
+ self._run_failure_cleanups()
+ self._done_event.set()
+ self._run_done_callbacks()
+
+ def _run_done_callbacks(self):
+ # Run the callbacks and remove the callbacks from the internal
+        # list so they do not get run again if done is announced more than
+ # once.
+ with self._done_callbacks_lock:
+ self._run_callbacks(self._done_callbacks)
+ self._done_callbacks = []
+
+ def _run_failure_cleanups(self):
+ # Run the cleanup callbacks and remove the callbacks from the internal
+        # list so they do not get run again if done is announced more than
+ # once.
+ with self._failure_cleanups_lock:
+ self._run_callbacks(self.failure_cleanups)
+ self._failure_cleanups = []
+
+ def _run_callbacks(self, callbacks):
+ for callback in callbacks:
+ self._run_callback(callback)
+
+ def _run_callback(self, callback):
+ try:
+ callback()
+ # We do not want a callback interrupting the process, especially
+        # in the failure cleanups. So we catch the exception and log it.
+ except Exception:
+ logger.debug(f"Exception raised in {callback}.", exc_info=True)
+
+
+class BoundedExecutor:
+ EXECUTOR_CLS = futures.ThreadPoolExecutor
+
+ def __init__(
+ self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
+ ):
+ """An executor implementation that has a maximum queued up tasks
+
+ The executor will block if the number of tasks that have been
+ submitted and is currently working on is past its maximum.
+
+ :params max_size: The maximum number of inflight futures. An inflight
+ future means that the task is either queued up or is currently
+ being executed. A size of None or 0 means that the executor will
+ have no bound in terms of the number of inflight futures.
+
+ :params max_num_threads: The maximum number of threads the executor
+ uses.
+
+ :type tag_semaphores: dict
+ :params tag_semaphores: A dictionary where the key is the name of the
+ tag and the value is the semaphore to use when limiting the
+ number of tasks the executor is processing at a time.
+
+        :type executor_cls: BaseExecutor
+        :param executor_cls: The executor class that
+            gets bounded by this executor. If None is provided, the
+            concurrent.futures.ThreadPoolExecutor class is used.
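+
+        For illustration, a minimal construction sketch (the sizes used
+        below are arbitrary):
+
+            executor = BoundedExecutor(
+                max_size=10,
+                max_num_threads=5,
+                tag_semaphores={IN_MEMORY_UPLOAD_TAG: TaskSemaphore(10)},
+            )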
+ """
+ self._max_num_threads = max_num_threads
+ if executor_cls is None:
+ executor_cls = self.EXECUTOR_CLS
+ self._executor = executor_cls(max_workers=self._max_num_threads)
+ self._semaphore = TaskSemaphore(max_size)
+ self._tag_semaphores = tag_semaphores
+
+ def submit(self, task, tag=None, block=True):
+ """Submit a task to complete
+
+ :type task: s3transfer.tasks.Task
+ :param task: The task to run __call__ on
+
+ :type tag: s3transfer.futures.TaskTag
+ :param tag: An optional tag to associate to the task. This
+ is used to override which semaphore to use.
+
+ :type block: boolean
+        :param block: If True, wait until it is possible to submit the task.
+            If False, do not wait and instead raise an error if the task
+            cannot be submitted right away.
+
+ :returns: The future associated to the submitted task
+ """
+ semaphore = self._semaphore
+ # If a tag was provided, use the semaphore associated to that
+ # tag.
+ if tag:
+ semaphore = self._tag_semaphores[tag]
+
+ # Call acquire on the semaphore.
+ acquire_token = semaphore.acquire(task.transfer_id, block)
+ # Create a callback to invoke when task is done in order to call
+ # release on the semaphore.
+ release_callback = FunctionContainer(
+ semaphore.release, task.transfer_id, acquire_token
+ )
+ # Submit the task to the underlying executor.
+ # Pass the current context to ensure child threads persist the
+ # parent thread's context.
+ future = ExecutorFuture(self._executor.submit(task, get_context()))
+ # Add the Semaphore.release() callback to the future such that
+ # it is invoked once the future completes.
+ future.add_done_callback(release_callback)
+ return future
+
+ def shutdown(self, wait=True):
+ self._executor.shutdown(wait)
+
+
+class ExecutorFuture:
+ def __init__(self, future):
+ """A future returned from the executor
+
+ Currently, it is just a wrapper around a concurrent.futures.Future.
+ However, this can eventually grow to implement the needed functionality
+ of concurrent.futures.Future if we move off of the library and not
+ affect the rest of the codebase.
+
+ :type future: concurrent.futures.Future
+ :param future: The underlying future
+ """
+ self._future = future
+
+ def result(self):
+ return self._future.result()
+
+ def add_done_callback(self, fn):
+ """Adds a callback to be completed once future is done
+
+        :param fn: A callable that takes no arguments. Note this is different
+            from concurrent.futures.Future.add_done_callback, which requires
+            a single argument (the future).
+ """
+
+        # The done callback for concurrent.futures.Future will always pass
+ # the future in as the only argument. So we need to create the
+ # proper signature wrapper that will invoke the callback provided.
+ def done_callback(future_passed_to_callback):
+ return fn()
+
+ self._future.add_done_callback(done_callback)
+
+ def done(self):
+ return self._future.done()
+
+
+class BaseExecutor:
+ """Base Executor class implementation needed to work with s3transfer"""
+
+ def __init__(self, max_workers=None):
+ pass
+
+ def submit(self, fn, *args, **kwargs):
+ raise NotImplementedError('submit()')
+
+ def shutdown(self, wait=True):
+ raise NotImplementedError('shutdown()')
+
+
+class NonThreadedExecutor(BaseExecutor):
+ """A drop-in replacement non-threaded version of ThreadPoolExecutor"""
+
+ def submit(self, fn, *args, **kwargs):
+ future = NonThreadedExecutorFuture()
+ try:
+ result = fn(*args, **kwargs)
+ future.set_result(result)
+ except Exception:
+ e, tb = sys.exc_info()[1:]
+ logger.debug(
+ 'Setting exception for %s to %s with traceback %s',
+ future,
+ e,
+ tb,
+ )
+ future.set_exception_info(e, tb)
+ return future
+
+ def shutdown(self, wait=True):
+ pass
+
+
+class NonThreadedExecutorFuture:
+ """The Future returned from NonThreadedExecutor
+
+ Note that this future is **not** thread-safe as it is being used
+ from the context of a non-threaded environment.
+ """
+
+ def __init__(self):
+ self._result = None
+ self._exception = None
+ self._traceback = None
+ self._done = False
+ self._done_callbacks = []
+
+ def set_result(self, result):
+ self._result = result
+ self._set_done()
+
+ def set_exception_info(self, exception, traceback):
+ self._exception = exception
+ self._traceback = traceback
+ self._set_done()
+
+ def result(self, timeout=None):
+ if self._exception:
+ raise self._exception.with_traceback(self._traceback)
+ return self._result
+
+ def _set_done(self):
+ self._done = True
+ for done_callback in self._done_callbacks:
+ self._invoke_done_callback(done_callback)
+ self._done_callbacks = []
+
+ def _invoke_done_callback(self, done_callback):
+ return done_callback(self)
+
+ def done(self):
+ return self._done
+
+ def add_done_callback(self, fn):
+ if self._done:
+ self._invoke_done_callback(fn)
+ else:
+ self._done_callbacks.append(fn)
+
+
+TaskTag = namedtuple('TaskTag', ['name'])
+
+IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload')
+IN_MEMORY_DOWNLOAD_TAG = TaskTag('in_memory_download')
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/manager.py b/.venv/lib/python3.12/site-packages/s3transfer/manager.py
new file mode 100644
index 00000000..8e1cdf06
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/manager.py
@@ -0,0 +1,754 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import logging
+import re
+import threading
+
+from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
+from s3transfer.constants import (
+ ALLOWED_DOWNLOAD_ARGS,
+ FULL_OBJECT_CHECKSUM_ARGS,
+ KB,
+ MB,
+)
+from s3transfer.copies import CopySubmissionTask
+from s3transfer.delete import DeleteSubmissionTask
+from s3transfer.download import DownloadSubmissionTask
+from s3transfer.exceptions import CancelledError, FatalError
+from s3transfer.futures import (
+ IN_MEMORY_DOWNLOAD_TAG,
+ IN_MEMORY_UPLOAD_TAG,
+ BoundedExecutor,
+ TransferCoordinator,
+ TransferFuture,
+ TransferMeta,
+)
+from s3transfer.upload import UploadSubmissionTask
+from s3transfer.utils import (
+ CallArgs,
+ OSUtils,
+ SlidingWindowSemaphore,
+ TaskSemaphore,
+ get_callbacks,
+ set_default_checksum_algorithm,
+ signal_not_transferring,
+ signal_transferring,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TransferConfig:
+ def __init__(
+ self,
+ multipart_threshold=8 * MB,
+ multipart_chunksize=8 * MB,
+ max_request_concurrency=10,
+ max_submission_concurrency=5,
+ max_request_queue_size=1000,
+ max_submission_queue_size=1000,
+ max_io_queue_size=1000,
+ io_chunksize=256 * KB,
+ num_download_attempts=5,
+ max_in_memory_upload_chunks=10,
+ max_in_memory_download_chunks=10,
+ max_bandwidth=None,
+ ):
+ """Configurations for the transfer manager
+
+ :param multipart_threshold: The threshold for which multipart
+ transfers occur.
+
+ :param max_request_concurrency: The maximum number of S3 API
+ transfer-related requests that can happen at a time.
+
+ :param max_submission_concurrency: The maximum number of threads
+ processing a call to a TransferManager method. Processing a
+            call usually entails determining which S3 API requests need
+ to be enqueued, but does **not** entail making any of the
+ S3 API data transferring requests needed to perform the transfer.
+            The threads controlled by ``max_request_concurrency`` are
+ responsible for that.
+
+ :param multipart_chunksize: The size of each transfer if a request
+ becomes a multipart transfer.
+
+        :param max_request_queue_size: The maximum number of S3 API requests
+            that can be queued at a time.
+
+        :param max_submission_queue_size: The maximum number of
+            TransferManager method calls that can be queued at a time.
+
+        :param max_io_queue_size: The maximum number of read parts that
+            can be queued to be written to disk per download. The default
+            size for each element in this queue is 8 KB.
+
+ :param io_chunksize: The max size of each chunk in the io queue.
+            Currently, this is the size used when reading from the downloaded
+ stream as well.
+
+ :param num_download_attempts: The number of download attempts that
+ will be tried upon errors with downloading an object in S3. Note
+ that these retries account for errors that occur when streaming
+ down the data from s3 (i.e. socket errors and read timeouts that
+ occur after receiving an OK response from s3).
+ Other retryable exceptions such as throttling errors and 5xx errors
+ are already retried by botocore (this default is 5). The
+ ``num_download_attempts`` does not take into account the
+ number of exceptions retried by botocore.
+
+ :param max_in_memory_upload_chunks: The number of chunks that can
+ be stored in memory at a time for all ongoing upload requests.
+ This pertains to chunks of data that need to be stored in memory
+ during an upload if the data is sourced from a file-like object.
+            The total maximum memory footprint due to in-memory upload
+ chunks is roughly equal to:
+
+ max_in_memory_upload_chunks * multipart_chunksize
+ + max_submission_concurrency * multipart_chunksize
+
+            ``max_submission_concurrency`` has an effect on this value because
+            each thread pulling data off of a file-like object may
+            be waiting with a single read chunk to be submitted for upload
+            if the ``max_in_memory_upload_chunks`` value has been reached
+            by the threads making the upload requests.
+
+ :param max_in_memory_download_chunks: The number of chunks that can
+ be buffered in memory and **not** in the io queue at a time for all
+ ongoing download requests. This pertains specifically to file-like
+            objects that are not seekable. The total maximum memory footprint
+            due to in-memory download chunks is roughly equal to:
+
+ max_in_memory_download_chunks * multipart_chunksize
+
+ :param max_bandwidth: The maximum bandwidth that will be consumed
+ in uploading and downloading file content. The value is in terms of
+ bytes per second.
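+
+        For illustration, a minimal sketch of a customized configuration
+        (the values below are arbitrary and ``client`` is an existing S3
+        client):
+
+            config = TransferConfig(
+                multipart_threshold=64 * MB,
+                max_request_concurrency=20,
+                max_bandwidth=10 * MB,  # bytes per second
+            )
+            manager = TransferManager(client, config=config)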
+ """
+ self.multipart_threshold = multipart_threshold
+ self.multipart_chunksize = multipart_chunksize
+ self.max_request_concurrency = max_request_concurrency
+ self.max_submission_concurrency = max_submission_concurrency
+ self.max_request_queue_size = max_request_queue_size
+ self.max_submission_queue_size = max_submission_queue_size
+ self.max_io_queue_size = max_io_queue_size
+ self.io_chunksize = io_chunksize
+ self.num_download_attempts = num_download_attempts
+ self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
+ self.max_in_memory_download_chunks = max_in_memory_download_chunks
+ self.max_bandwidth = max_bandwidth
+ self._validate_attrs_are_nonzero()
+
+ def _validate_attrs_are_nonzero(self):
+ for attr, attr_val in self.__dict__.items():
+ if attr_val is not None and attr_val <= 0:
+ raise ValueError(
+ f'Provided parameter {attr} of value {attr_val} must '
+ 'be greater than 0.'
+ )
+
+
+class TransferManager:
+ ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS
+
+ _ALLOWED_SHARED_ARGS = [
+ 'ACL',
+ 'CacheControl',
+ 'ChecksumAlgorithm',
+ 'ContentDisposition',
+ 'ContentEncoding',
+ 'ContentLanguage',
+ 'ContentType',
+ 'ExpectedBucketOwner',
+ 'Expires',
+ 'GrantFullControl',
+ 'GrantRead',
+ 'GrantReadACP',
+ 'GrantWriteACP',
+ 'Metadata',
+ 'ObjectLockLegalHoldStatus',
+ 'ObjectLockMode',
+ 'ObjectLockRetainUntilDate',
+ 'RequestPayer',
+ 'ServerSideEncryption',
+ 'StorageClass',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKey',
+ 'SSECustomerKeyMD5',
+ 'SSEKMSKeyId',
+ 'SSEKMSEncryptionContext',
+ 'Tagging',
+ 'WebsiteRedirectLocation',
+ ]
+
+ ALLOWED_UPLOAD_ARGS = (
+ _ALLOWED_SHARED_ARGS
+ + [
+ 'ChecksumType',
+ 'MpuObjectSize',
+ ]
+ + FULL_OBJECT_CHECKSUM_ARGS
+ )
+
+ ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
+ 'CopySourceIfMatch',
+ 'CopySourceIfModifiedSince',
+ 'CopySourceIfNoneMatch',
+ 'CopySourceIfUnmodifiedSince',
+ 'CopySourceSSECustomerAlgorithm',
+ 'CopySourceSSECustomerKey',
+ 'CopySourceSSECustomerKeyMD5',
+ 'MetadataDirective',
+ 'TaggingDirective',
+ ]
+
+ ALLOWED_DELETE_ARGS = [
+ 'MFA',
+ 'VersionId',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+ ]
+
+ VALIDATE_SUPPORTED_BUCKET_VALUES = True
+
+ _UNSUPPORTED_BUCKET_PATTERNS = {
+ 'S3 Object Lambda': re.compile(
+ r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
+ r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
+ ),
+ }
+
+ def __init__(self, client, config=None, osutil=None, executor_cls=None):
+ """A transfer manager interface for Amazon S3
+
+ :param client: Client to be used by the manager
+ :param config: TransferConfig to associate specific configurations
+ :param osutil: OSUtils object to use for os-related behavior when
+            using the transfer manager.
+
+ :type executor_cls: s3transfer.futures.BaseExecutor
+ :param executor_cls: The class of executor to use with the transfer
+ manager. By default, concurrent.futures.ThreadPoolExecutor is used.
+ """
+ self._client = client
+ self._config = config
+ if config is None:
+ self._config = TransferConfig()
+ self._osutil = osutil
+ if osutil is None:
+ self._osutil = OSUtils()
+ self._coordinator_controller = TransferCoordinatorController()
+ # A counter to create unique id's for each transfer submitted.
+ self._id_counter = 0
+
+ # The executor responsible for making S3 API transfer requests
+ self._request_executor = BoundedExecutor(
+ max_size=self._config.max_request_queue_size,
+ max_num_threads=self._config.max_request_concurrency,
+ tag_semaphores={
+ IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
+ self._config.max_in_memory_upload_chunks
+ ),
+ IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
+ self._config.max_in_memory_download_chunks
+ ),
+ },
+ executor_cls=executor_cls,
+ )
+
+ # The executor responsible for submitting the necessary tasks to
+ # perform the desired transfer
+ self._submission_executor = BoundedExecutor(
+ max_size=self._config.max_submission_queue_size,
+ max_num_threads=self._config.max_submission_concurrency,
+ executor_cls=executor_cls,
+ )
+
+ # There is one thread available for writing to disk. It will handle
+ # downloads for all files.
+ self._io_executor = BoundedExecutor(
+ max_size=self._config.max_io_queue_size,
+ max_num_threads=1,
+ executor_cls=executor_cls,
+ )
+
+ # The component responsible for limiting bandwidth usage if it
+ # is configured.
+ self._bandwidth_limiter = None
+ if self._config.max_bandwidth is not None:
+ logger.debug(
+ 'Setting max_bandwidth to %s', self._config.max_bandwidth
+ )
+ leaky_bucket = LeakyBucket(self._config.max_bandwidth)
+ self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
+
+ self._register_handlers()
+
+ @property
+ def client(self):
+ return self._client
+
+ @property
+ def config(self):
+ return self._config
+
+ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
+ """Uploads a file to S3
+
+ :type fileobj: str or seekable file-like object
+ :param fileobj: The name of a file to upload or a seekable file-like
+ object to upload. It is recommended to use a filename because
+ file-like objects may result in higher memory usage.
+
+ :type bucket: str
+ :param bucket: The name of the bucket to upload to
+
+ :type key: str
+ :param key: The name of the key to upload to
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments that may be passed to the
+ client operation
+
+ :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
+ :param subscribers: The list of subscribers to be invoked in the
+            order provided based on the events emitted during the process of
+ the transfer request.
+
+ :rtype: s3transfer.futures.TransferFuture
+ :returns: Transfer future representing the upload
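+
+        A minimal usage sketch (``client`` is an existing S3 client; bucket
+        and key names are placeholders):
+
+            with TransferManager(client) as manager:
+                future = manager.upload('/tmp/myfile', 'mybucket', 'mykey')
+                future.result()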
+ """
+
+ extra_args = extra_args.copy() if extra_args else {}
+ if subscribers is None:
+ subscribers = []
+ self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ self._add_operation_defaults(extra_args)
+ call_args = CallArgs(
+ fileobj=fileobj,
+ bucket=bucket,
+ key=key,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ extra_main_kwargs = {}
+ if self._bandwidth_limiter:
+ extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+ return self._submit_transfer(
+ call_args, UploadSubmissionTask, extra_main_kwargs
+ )
+
+ def download(
+ self, bucket, key, fileobj, extra_args=None, subscribers=None
+ ):
+ """Downloads a file from S3
+
+ :type bucket: str
+ :param bucket: The name of the bucket to download from
+
+ :type key: str
+ :param key: The name of the key to download from
+
+ :type fileobj: str or seekable file-like object
+ :param fileobj: The name of a file to download or a seekable file-like
+ object to download. It is recommended to use a filename because
+ file-like objects may result in higher memory usage.
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments that may be passed to the
+ client operation
+
+ :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
+ :param subscribers: The list of subscribers to be invoked in the
+            order provided based on the events emitted during the process of
+ the transfer request.
+
+ :rtype: s3transfer.futures.TransferFuture
+ :returns: Transfer future representing the download
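+
+        A minimal usage sketch (``manager`` is an existing TransferManager;
+        bucket and key names are placeholders):
+
+            future = manager.download('mybucket', 'mykey', '/tmp/myfile')
+            future.result()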
+ """
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = []
+ self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ call_args = CallArgs(
+ bucket=bucket,
+ key=key,
+ fileobj=fileobj,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ extra_main_kwargs = {'io_executor': self._io_executor}
+ if self._bandwidth_limiter:
+ extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+ return self._submit_transfer(
+ call_args, DownloadSubmissionTask, extra_main_kwargs
+ )
+
+ def copy(
+ self,
+ copy_source,
+ bucket,
+ key,
+ extra_args=None,
+ subscribers=None,
+ source_client=None,
+ ):
+ """Copies a file in S3
+
+ :type copy_source: dict
+ :param copy_source: The name of the source bucket, key name of the
+ source object, and optional version ID of the source object. The
+ dictionary format is:
+ ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
+ that the ``VersionId`` key is optional and may be omitted.
+
+ :type bucket: str
+ :param bucket: The name of the bucket to copy to
+
+ :type key: str
+ :param key: The name of the key to copy to
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments that may be passed to the
+ client operation
+
+        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
+ :param subscribers: The list of subscribers to be invoked in the
+            order provided based on the events emitted during the process of
+ the transfer request.
+
+ :type source_client: botocore or boto3 Client
+        :param source_client: The client to be used for operations that
+ may happen at the source object. For example, this client is
+ used for the head_object that determines the size of the copy.
+ If no client is provided, the transfer manager's client is used
+ as the client for the source object.
+
+ :rtype: s3transfer.futures.TransferFuture
+ :returns: Transfer future representing the copy
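+
+        A minimal usage sketch (``manager`` is an existing TransferManager;
+        bucket and key names are placeholders):
+
+            copy_source = {'Bucket': 'sourcebucket', 'Key': 'sourcekey'}
+            future = manager.copy(copy_source, 'mybucket', 'mykey')
+            future.result()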
+ """
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = []
+ if source_client is None:
+ source_client = self._client
+ self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
+ if isinstance(copy_source, dict):
+ self._validate_if_bucket_supported(copy_source.get('Bucket'))
+ self._validate_if_bucket_supported(bucket)
+ call_args = CallArgs(
+ copy_source=copy_source,
+ bucket=bucket,
+ key=key,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ source_client=source_client,
+ )
+ return self._submit_transfer(call_args, CopySubmissionTask)
+
+ def delete(self, bucket, key, extra_args=None, subscribers=None):
+ """Delete an S3 object.
+
+ :type bucket: str
+ :param bucket: The name of the bucket.
+
+ :type key: str
+ :param key: The name of the S3 object to delete.
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments that may be passed to the
+ DeleteObject call.
+
+ :type subscribers: list
+ :param subscribers: A list of subscribers to be invoked during the
+ process of the transfer request. Note that the ``on_progress``
+ callback is not invoked during object deletion.
+
+ :rtype: s3transfer.futures.TransferFuture
+ :return: Transfer future representing the deletion.
+
+ """
+ if extra_args is None:
+ extra_args = {}
+ if subscribers is None:
+ subscribers = []
+ self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
+ self._validate_if_bucket_supported(bucket)
+ call_args = CallArgs(
+ bucket=bucket,
+ key=key,
+ extra_args=extra_args,
+ subscribers=subscribers,
+ )
+ return self._submit_transfer(call_args, DeleteSubmissionTask)
+
+ def _validate_if_bucket_supported(self, bucket):
+        # S3 high-level operations don't support some resources
+        # (e.g. S3 Object Lambda); only direct API calls are available
+        # for such resources.
+ if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
+ for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
+ match = pattern.match(bucket)
+ if match:
+ raise ValueError(
+ f'TransferManager methods do not support {resource} '
+ 'resource. Use direct client calls instead.'
+ )
+
+ def _validate_all_known_args(self, actual, allowed):
+ for kwarg in actual:
+ if kwarg not in allowed:
+ raise ValueError(
+ "Invalid extra_args key '{}', "
+ "must be one of: {}".format(kwarg, ', '.join(allowed))
+ )
+
+ def _add_operation_defaults(self, extra_args):
+ if (
+ self.client.meta.config.request_checksum_calculation
+ == "when_supported"
+ ):
+ set_default_checksum_algorithm(extra_args)
+
+ def _submit_transfer(
+ self, call_args, submission_task_cls, extra_main_kwargs=None
+ ):
+ if not extra_main_kwargs:
+ extra_main_kwargs = {}
+
+ # Create a TransferFuture to return back to the user
+ transfer_future, components = self._get_future_with_components(
+ call_args
+ )
+
+ # Add any provided done callbacks to the created transfer future
+ # to be invoked on the transfer future being complete.
+ for callback in get_callbacks(transfer_future, 'done'):
+ components['coordinator'].add_done_callback(callback)
+
+ # Get the main kwargs needed to instantiate the submission task
+ main_kwargs = self._get_submission_task_main_kwargs(
+ transfer_future, extra_main_kwargs
+ )
+
+ # Submit a SubmissionTask that will submit all of the necessary
+ # tasks needed to complete the S3 transfer.
+ self._submission_executor.submit(
+ submission_task_cls(
+ transfer_coordinator=components['coordinator'],
+ main_kwargs=main_kwargs,
+ )
+ )
+
+ # Increment the unique id counter for future transfer requests
+ self._id_counter += 1
+
+ return transfer_future
+
+ def _get_future_with_components(self, call_args):
+ transfer_id = self._id_counter
+ # Creates a new transfer future along with its components
+ transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
+ # Track the transfer coordinator for transfers to manage.
+ self._coordinator_controller.add_transfer_coordinator(
+ transfer_coordinator
+ )
+ # Also make sure that the transfer coordinator is removed once
+ # the transfer completes so it does not stick around in memory.
+ transfer_coordinator.add_done_callback(
+ self._coordinator_controller.remove_transfer_coordinator,
+ transfer_coordinator,
+ )
+ components = {
+ 'meta': TransferMeta(call_args, transfer_id=transfer_id),
+ 'coordinator': transfer_coordinator,
+ }
+ transfer_future = TransferFuture(**components)
+ return transfer_future, components
+
+ def _get_submission_task_main_kwargs(
+ self, transfer_future, extra_main_kwargs
+ ):
+ main_kwargs = {
+ 'client': self._client,
+ 'config': self._config,
+ 'osutil': self._osutil,
+ 'request_executor': self._request_executor,
+ 'transfer_future': transfer_future,
+ }
+ main_kwargs.update(extra_main_kwargs)
+ return main_kwargs
+
+ def _register_handlers(self):
+ # Register handlers to enable/disable callbacks on uploads.
+ event_name = 'request-created.s3'
+ self._client.meta.events.register_first(
+ event_name,
+ signal_not_transferring,
+ unique_id='s3upload-not-transferring',
+ )
+ self._client.meta.events.register_last(
+ event_name, signal_transferring, unique_id='s3upload-transferring'
+ )
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, *args):
+ cancel = False
+ cancel_msg = ''
+ cancel_exc_type = FatalError
+        # If an exception was raised in the context handler, signal to cancel
+ # all of the inprogress futures in the shutdown.
+ if exc_type:
+ cancel = True
+ cancel_msg = str(exc_value)
+ if not cancel_msg:
+ cancel_msg = repr(exc_value)
+ # If it was a KeyboardInterrupt, the cancellation was initiated
+ # by the user.
+ if isinstance(exc_value, KeyboardInterrupt):
+ cancel_exc_type = CancelledError
+ self._shutdown(cancel, cancel_msg, cancel_exc_type)
+
+ def shutdown(self, cancel=False, cancel_msg=''):
+ """Shutdown the TransferManager
+
+ It will wait till all transfers complete before it completely shuts
+ down.
+
+ :type cancel: boolean
+ :param cancel: If True, calls TransferFuture.cancel() for
+            all in-progress transfers. This is useful if you want the
+ shutdown to happen quicker.
+
+ :type cancel_msg: str
+ :param cancel_msg: The message to specify if canceling all in-progress
+ transfers.
+ """
+        self._shutdown(cancel, cancel_msg)
+
+ def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
+ if cancel:
+ # Cancel all in-flight transfers if requested, before waiting
+ # for them to complete.
+ self._coordinator_controller.cancel(cancel_msg, exc_type)
+ try:
+ # Wait until there are no more in-progress transfers. This is
+ # wrapped in a try statement because this can be interrupted
+ # with a KeyboardInterrupt that needs to be caught.
+ self._coordinator_controller.wait()
+ except KeyboardInterrupt:
+            # If no errors were raised in the try block, the cancel should
+ # have no coordinators it needs to run cancel on. If there was
+ # an error raised in the try statement we want to cancel all of
+ # the inflight transfers before shutting down to speed that
+ # process up.
+ self._coordinator_controller.cancel('KeyboardInterrupt()')
+ raise
+ finally:
+ # Shutdown all of the executors.
+ self._submission_executor.shutdown()
+ self._request_executor.shutdown()
+ self._io_executor.shutdown()
+
+
+class TransferCoordinatorController:
+ def __init__(self):
+ """Abstraction to control all transfer coordinators
+
+ This abstraction allows the manager to wait for inprogress transfers
+ to complete and cancel all inprogress transfers.
+ """
+ self._lock = threading.Lock()
+ self._tracked_transfer_coordinators = set()
+
+ @property
+ def tracked_transfer_coordinators(self):
+ """The set of transfer coordinators being tracked"""
+ with self._lock:
+ # We return a copy because the set is mutable and if you were to
+ # iterate over the set, it may be changing in length due to
+ # additions and removals of transfer coordinators.
+ return copy.copy(self._tracked_transfer_coordinators)
+
+ def add_transfer_coordinator(self, transfer_coordinator):
+ """Adds a transfer coordinator of a transfer to be canceled if needed
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The transfer coordinator for the
+ particular transfer
+ """
+ with self._lock:
+ self._tracked_transfer_coordinators.add(transfer_coordinator)
+
+ def remove_transfer_coordinator(self, transfer_coordinator):
+ """Remove a transfer coordinator from cancellation consideration
+
+ Typically, this method is invoked by the transfer coordinator itself
+        to remove itself when it completes its transfer.
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The transfer coordinator for the
+ particular transfer
+ """
+ with self._lock:
+ self._tracked_transfer_coordinators.remove(transfer_coordinator)
+
+ def cancel(self, msg='', exc_type=CancelledError):
+ """Cancels all inprogress transfers
+
+ This cancels the inprogress transfers by calling cancel() on all
+ tracked transfer coordinators.
+
+ :param msg: The message to pass on to each transfer coordinator that
+ gets cancelled.
+
+ :param exc_type: The type of exception to set for the cancellation
+ """
+ for transfer_coordinator in self.tracked_transfer_coordinators:
+ transfer_coordinator.cancel(msg, exc_type)
+
+ def wait(self):
+ """Wait until there are no more inprogress transfers
+
+        This will not stop when failures are encountered, nor will it
+        propagate any errors from failed transfers, but it can be interrupted
+        with a KeyboardInterrupt.
+ """
+ try:
+ transfer_coordinator = None
+ for transfer_coordinator in self.tracked_transfer_coordinators:
+ transfer_coordinator.result()
+ except KeyboardInterrupt:
+ logger.debug('Received KeyboardInterrupt in wait()')
+            # If a KeyboardInterrupt is raised while waiting for
+ # the result, then exit out of the wait and raise the
+ # exception
+ if transfer_coordinator:
+ logger.debug(
+ 'On KeyboardInterrupt was waiting for %s',
+ transfer_coordinator,
+ )
+ raise
+ except Exception:
+ # A general exception could have been thrown because
+ # of result(). We just want to ignore this and continue
+ # because we at least know that the transfer coordinator
+ # has completed.
+ pass
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/processpool.py b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py
new file mode 100644
index 00000000..318f1e7f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py
@@ -0,0 +1,1009 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Speeds up S3 throughput by using processes
+
+Getting Started
+===============
+
+The :class:`ProcessPoolDownloader` can be used to download a single file by
+calling :meth:`ProcessPoolDownloader.download_file`:
+
+.. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ with ProcessPoolDownloader() as downloader:
+ downloader.download_file('mybucket', 'mykey', 'myfile')
+
+
+This snippet downloads the S3 object located in the bucket ``mybucket`` at the
+key ``mykey`` to the local file ``myfile``. Any errors encountered during the
+transfer are not propagated. To determine if a transfer succeeded or
+failed, use the `Futures`_ interface.
+
+
+The :class:`ProcessPoolDownloader` can be used to download multiple files as
+well:
+
+.. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ with ProcessPoolDownloader() as downloader:
+ downloader.download_file('mybucket', 'mykey', 'myfile')
+ downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
+
+
+When running this snippet, the downloads of ``mykey`` and ``myotherkey``
+happen in parallel. The first ``download_file`` call does not block the
+second ``download_file`` call. The snippet blocks only when exiting
+the context manager, waiting until both downloads are complete.
+
+Alternatively, the ``ProcessPoolDownloader`` can be instantiated
+and explicitly shut down using :meth:`ProcessPoolDownloader.shutdown`:
+
+.. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ downloader = ProcessPoolDownloader()
+ downloader.download_file('mybucket', 'mykey', 'myfile')
+ downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
+ downloader.shutdown()
+
+
+For this code snippet, the call to ``shutdown`` blocks until both
+downloads are complete.
+
+
+Additional Parameters
+=====================
+
+Additional parameters can be provided to the ``download_file`` method:
+
+* ``extra_args``: A dictionary containing any additional client arguments
+ to include in the
+ `GetObject <https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object>`_
+ API request. For example:
+
+ .. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ with ProcessPoolDownloader() as downloader:
+ downloader.download_file(
+ 'mybucket', 'mykey', 'myfile',
+ extra_args={'VersionId': 'myversion'})
+
+
+* ``expected_size``: By default, the downloader will make a HeadObject
+ call to determine the size of the object. To opt-out of this additional
+ API call, you can provide the size of the object in bytes:
+
+ .. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ MB = 1024 * 1024
+ with ProcessPoolDownloader() as downloader:
+ downloader.download_file(
+ 'mybucket', 'mykey', 'myfile', expected_size=2 * MB)
+
+
+Futures
+=======
+
+When ``download_file`` is called, it immediately returns a
+:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
+of a particular transfer. To get the result of the download,
+call :meth:`ProcessPoolTransferFuture.result`. The method blocks
+until the transfer completes, whether it succeeds or fails. For example:
+
+.. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+
+ with ProcessPoolDownloader() as downloader:
+ future = downloader.download_file('mybucket', 'mykey', 'myfile')
+ print(future.result())
+
+
+If the download succeeds, the future returns ``None``:
+
+.. code:: python
+
+ None
+
+
+If the download fails, the exception causing the failure is raised. For
+example, if ``mykey`` did not exist, the following error would be raised:
+
+
+.. code:: python
+
+ botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
+
+
+.. note::
+
+ :meth:`ProcessPoolTransferFuture.result` can only be called while the
+ ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
+ inside the context manager).
+
+
+Process Pool Configuration
+==========================
+
+By default, the downloader has the following configuration options:
+
+* ``multipart_threshold``: The threshold size for performing ranged downloads
+ in bytes. By default, ranged downloads happen for S3 objects that are
+ greater than or equal to 8 MB in size.
+
+* ``multipart_chunksize``: The size of each ranged download in bytes. By
+ default, the size of each ranged download is 8 MB.
+
+* ``max_request_processes``: The maximum number of processes used to download
+ S3 objects. By default, the maximum is 10 processes.
+
+
+To change the default configuration, use the :class:`ProcessTransferConfig`:
+
+.. code:: python
+
+ from s3transfer.processpool import ProcessPoolDownloader
+ from s3transfer.processpool import ProcessTransferConfig
+
+ config = ProcessTransferConfig(
+ multipart_threshold=64 * 1024 * 1024, # 64 MB
+ max_request_processes=50
+ )
+ downloader = ProcessPoolDownloader(config=config)
+
+
+Client Configuration
+====================
+
+The process pool downloader creates ``botocore`` clients on your behalf. In
+order to affect how the client is created, pass the keyword arguments
+that would have been used in the :meth:`botocore.Session.create_client` call:
+
+.. code:: python
+
+
+ from s3transfer.processpool import ProcessPoolDownloader
+ from s3transfer.processpool import ProcessTransferConfig
+
+ downloader = ProcessPoolDownloader(
+ client_kwargs={'region_name': 'us-west-2'})
+
+
+This snippet ensures that all clients created by the ``ProcessPoolDownloader``
+are using ``us-west-2`` as their region.
+
+"""
+
+import collections
+import contextlib
+import logging
+import multiprocessing
+import signal
+import threading
+from copy import deepcopy
+
+import botocore.session
+from botocore.config import Config
+
+from s3transfer.compat import MAXINT, BaseManager
+from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, MB, PROCESS_USER_AGENT
+from s3transfer.exceptions import CancelledError, RetriesExceededError
+from s3transfer.futures import BaseTransferFuture, BaseTransferMeta
+from s3transfer.utils import (
+ S3_RETRYABLE_DOWNLOAD_ERRORS,
+ CallArgs,
+ OSUtils,
+ calculate_num_parts,
+ calculate_range_parameter,
+)
+
+logger = logging.getLogger(__name__)
+
+SHUTDOWN_SIGNAL = 'SHUTDOWN'
+
+# The DownloadFileRequest tuple is submitted from the ProcessPoolDownloader
+# to the GetObjectSubmitter in order for the submitter to begin submitting
+# GetObjectJobs to the GetObjectWorkers.
+DownloadFileRequest = collections.namedtuple(
+ 'DownloadFileRequest',
+ [
+ 'transfer_id', # The unique id for the transfer
+ 'bucket', # The bucket to download the object from
+ 'key', # The key to download the object from
+ 'filename', # The user-requested download location
+ 'extra_args', # Extra arguments to provide to client calls
+ 'expected_size', # The user-provided expected size of the download
+ ],
+)
+
+# The GetObjectJob tuple is submitted from the GetObjectSubmitter
+# to the GetObjectWorkers to download the file or parts of the file.
+GetObjectJob = collections.namedtuple(
+ 'GetObjectJob',
+ [
+ 'transfer_id', # The unique id for the transfer
+ 'bucket', # The bucket to download the object from
+ 'key', # The key to download the object from
+ 'temp_filename', # The temporary file to write the content to via
+ # completed GetObject calls.
+ 'extra_args', # Extra arguments to provide to the GetObject call
+ 'offset', # The offset to write the content for the temp file.
+ 'filename', # The user-requested download location. The worker
+ # of final GetObjectJob will move the file located at
+ # temp_filename to the location of filename.
+ ],
+)
+
+
+@contextlib.contextmanager
+def ignore_ctrl_c():
+ original_handler = _add_ignore_handler_for_interrupts()
+ yield
+ signal.signal(signal.SIGINT, original_handler)
+
+
+def _add_ignore_handler_for_interrupts():
+ # Windows is unable to pickle signal.signal directly so it needs to
+ # be wrapped in a function defined at the module level
+ return signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+
+class ProcessTransferConfig:
+ def __init__(
+ self,
+ multipart_threshold=8 * MB,
+ multipart_chunksize=8 * MB,
+ max_request_processes=10,
+ ):
+ """Configuration for the ProcessPoolDownloader
+
+ :param multipart_threshold: The threshold for which ranged downloads
+ occur.
+
+ :param multipart_chunksize: The chunk size of each ranged download.
+
+ :param max_request_processes: The maximum number of processes that
+ will be making S3 API transfer-related requests at a time.
+ """
+ self.multipart_threshold = multipart_threshold
+ self.multipart_chunksize = multipart_chunksize
+ self.max_request_processes = max_request_processes
+
+
+class ProcessPoolDownloader:
+ def __init__(self, client_kwargs=None, config=None):
+ """Downloads S3 objects using process pools
+
+ :type client_kwargs: dict
+ :param client_kwargs: The keyword arguments to provide when
+ instantiating S3 clients. The arguments must match the keyword
+ arguments provided to the
+ `botocore.session.Session.create_client()` method.
+
+ :type config: ProcessTransferConfig
+ :param config: Configuration for the downloader
+ """
+ if client_kwargs is None:
+ client_kwargs = {}
+ self._client_factory = ClientFactory(client_kwargs)
+
+ self._transfer_config = config
+ if config is None:
+ self._transfer_config = ProcessTransferConfig()
+
+ self._download_request_queue = multiprocessing.Queue(1000)
+ self._worker_queue = multiprocessing.Queue(1000)
+ self._osutil = OSUtils()
+
+ self._started = False
+ self._start_lock = threading.Lock()
+
+ # These below are initialized in the start() method
+ self._manager = None
+ self._transfer_monitor = None
+ self._submitter = None
+ self._workers = []
+
+ def download_file(
+ self, bucket, key, filename, extra_args=None, expected_size=None
+ ):
+ """Downloads the object's contents to a file
+
+ :type bucket: str
+ :param bucket: The name of the bucket to download from
+
+ :type key: str
+ :param key: The name of the key to download from
+
+ :type filename: str
+ :param filename: The name of a file to download to.
+
+ :type extra_args: dict
+ :param extra_args: Extra arguments that may be passed to the
+ client operation
+
+ :type expected_size: int
+ :param expected_size: The expected size in bytes of the download. If
+ provided, the downloader will not call HeadObject to determine the
+            object's size and will use the provided value instead. The size is
+ needed to determine whether to do a multipart download.
+
+ :rtype: s3transfer.futures.TransferFuture
+ :returns: Transfer future representing the download
+ """
+ self._start_if_needed()
+ if extra_args is None:
+ extra_args = {}
+ self._validate_all_known_args(extra_args)
+ transfer_id = self._transfer_monitor.notify_new_transfer()
+ download_file_request = DownloadFileRequest(
+ transfer_id=transfer_id,
+ bucket=bucket,
+ key=key,
+ filename=filename,
+ extra_args=extra_args,
+ expected_size=expected_size,
+ )
+ logger.debug(
+ 'Submitting download file request: %s.', download_file_request
+ )
+ self._download_request_queue.put(download_file_request)
+ call_args = CallArgs(
+ bucket=bucket,
+ key=key,
+ filename=filename,
+ extra_args=extra_args,
+ expected_size=expected_size,
+ )
+ future = self._get_transfer_future(transfer_id, call_args)
+ return future
+
+ def shutdown(self):
+ """Shutdown the downloader
+
+ It will wait till all downloads are complete before returning.
+ """
+ self._shutdown_if_needed()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, *args):
+ if isinstance(exc_value, KeyboardInterrupt):
+ if self._transfer_monitor is not None:
+ self._transfer_monitor.notify_cancel_all_in_progress()
+ self.shutdown()
+
+ def _start_if_needed(self):
+ with self._start_lock:
+ if not self._started:
+ self._start()
+
+ def _start(self):
+ self._start_transfer_monitor_manager()
+ self._start_submitter()
+ self._start_get_object_workers()
+ self._started = True
+
+ def _validate_all_known_args(self, provided):
+ for kwarg in provided:
+ if kwarg not in ALLOWED_DOWNLOAD_ARGS:
+ download_args = ', '.join(ALLOWED_DOWNLOAD_ARGS)
+ raise ValueError(
+ f"Invalid extra_args key '{kwarg}', "
+ f"must be one of: {download_args}"
+ )
+
+ def _get_transfer_future(self, transfer_id, call_args):
+ meta = ProcessPoolTransferMeta(
+ call_args=call_args, transfer_id=transfer_id
+ )
+ future = ProcessPoolTransferFuture(
+ monitor=self._transfer_monitor, meta=meta
+ )
+ return future
+
+ def _start_transfer_monitor_manager(self):
+ logger.debug('Starting the TransferMonitorManager.')
+ self._manager = TransferMonitorManager()
+        # We do not want Ctrl-C's to cause the manager to shut down
+        # immediately as worker processes will still need to communicate with
+        # it when they are shutting down. So instead we ignore Ctrl-C and let
+        # the manager be explicitly shut down when shutting down the
+        # downloader.
+ self._manager.start(_add_ignore_handler_for_interrupts)
+ self._transfer_monitor = self._manager.TransferMonitor()
+
+ def _start_submitter(self):
+ logger.debug('Starting the GetObjectSubmitter.')
+ self._submitter = GetObjectSubmitter(
+ transfer_config=self._transfer_config,
+ client_factory=self._client_factory,
+ transfer_monitor=self._transfer_monitor,
+ osutil=self._osutil,
+ download_request_queue=self._download_request_queue,
+ worker_queue=self._worker_queue,
+ )
+ self._submitter.start()
+
+ def _start_get_object_workers(self):
+ logger.debug(
+ 'Starting %s GetObjectWorkers.',
+ self._transfer_config.max_request_processes,
+ )
+ for _ in range(self._transfer_config.max_request_processes):
+ worker = GetObjectWorker(
+ queue=self._worker_queue,
+ client_factory=self._client_factory,
+ transfer_monitor=self._transfer_monitor,
+ osutil=self._osutil,
+ )
+ worker.start()
+ self._workers.append(worker)
+
+ def _shutdown_if_needed(self):
+ with self._start_lock:
+ if self._started:
+ self._shutdown()
+
+ def _shutdown(self):
+ self._shutdown_submitter()
+ self._shutdown_get_object_workers()
+ self._shutdown_transfer_monitor_manager()
+ self._started = False
+
+ def _shutdown_transfer_monitor_manager(self):
+ logger.debug('Shutting down the TransferMonitorManager.')
+ self._manager.shutdown()
+
+ def _shutdown_submitter(self):
+ logger.debug('Shutting down the GetObjectSubmitter.')
+ self._download_request_queue.put(SHUTDOWN_SIGNAL)
+ self._submitter.join()
+
+ def _shutdown_get_object_workers(self):
+ logger.debug('Shutting down the GetObjectWorkers.')
+ for _ in self._workers:
+ self._worker_queue.put(SHUTDOWN_SIGNAL)
+ for worker in self._workers:
+ worker.join()
+
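+
+# Editorial sketch, not part of s3transfer: a minimal way to drive
+# ProcessPoolDownloader. The bucket, key, and filename values are
+# placeholders; using the downloader as a context manager ensures shutdown()
+# is called.
+def _example_process_pool_download():  # pragma: no cover - illustrative
+    with ProcessPoolDownloader() as downloader:
+        future = downloader.download_file(
+            'my-bucket', 'my-key', '/tmp/my-key'
+        )
+        # Block until the download has fully completed (or raise its error).
+        future.result()
+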
+
+class ProcessPoolTransferFuture(BaseTransferFuture):
+ def __init__(self, monitor, meta):
+ """The future associated to a submitted process pool transfer request
+
+ :type monitor: TransferMonitor
+ :param monitor: The monitor associated to the process pool downloader
+
+ :type meta: ProcessPoolTransferMeta
+ :param meta: The metadata associated to the request. This object
+ is visible to the requester.
+ """
+ self._monitor = monitor
+ self._meta = meta
+
+ @property
+ def meta(self):
+ return self._meta
+
+ def done(self):
+ return self._monitor.is_done(self._meta.transfer_id)
+
+ def result(self):
+ try:
+ return self._monitor.poll_for_result(self._meta.transfer_id)
+ except KeyboardInterrupt:
+ # For the multiprocessing Manager, a thread is given a single
+ # connection to reuse in communicating between the thread in the
+ # main process and the Manager's process. If a Ctrl-C happens when
+ # polling for the result, it will make the main thread stop trying
+ # to receive from the connection, but the Manager process will not
+ # know that the main process has stopped trying to receive and
+ # will not close the connection. As a result if another message is
+            # sent to the Manager process, the listener in the Manager
+            # process will not handle the new message because it is still
+            # trying to process the previous message (that was Ctrl-C'd),
+            # which causes the thread in the main process to hang on its
+            # send.
+ # The only way around this is to create a new connection and send
+ # messages from that new connection instead.
+ self._monitor._connect()
+ self.cancel()
+ raise
+
+ def cancel(self):
+ self._monitor.notify_exception(
+ self._meta.transfer_id, CancelledError()
+ )
+
+
+class ProcessPoolTransferMeta(BaseTransferMeta):
+ """Holds metadata about the ProcessPoolTransferFuture"""
+
+ def __init__(self, transfer_id, call_args):
+ self._transfer_id = transfer_id
+ self._call_args = call_args
+ self._user_context = {}
+
+ @property
+ def call_args(self):
+ return self._call_args
+
+ @property
+ def transfer_id(self):
+ return self._transfer_id
+
+ @property
+ def user_context(self):
+ return self._user_context
+
+
+class ClientFactory:
+ def __init__(self, client_kwargs=None):
+ """Creates S3 clients for processes
+
+ Botocore sessions and clients are not pickleable so they cannot be
+ inherited across Process boundaries. Instead, they must be instantiated
+ once a process is running.
+ """
+ self._client_kwargs = client_kwargs
+ if self._client_kwargs is None:
+ self._client_kwargs = {}
+
+ client_config = deepcopy(self._client_kwargs.get('config', Config()))
+ if not client_config.user_agent_extra:
+ client_config.user_agent_extra = PROCESS_USER_AGENT
+ else:
+ client_config.user_agent_extra += " " + PROCESS_USER_AGENT
+ self._client_kwargs['config'] = client_config
+
+ def create_client(self):
+ """Create a botocore S3 client"""
+ return botocore.session.Session().create_client(
+ 's3', **self._client_kwargs
+ )
+
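+
+# Editorial sketch, not part of s3transfer: ClientFactory defers client
+# creation so each worker process builds its own client. The region below is
+# an illustrative assumption.
+def _example_client_factory():  # pragma: no cover - illustrative
+    factory = ClientFactory({'region_name': 'us-west-2'})
+    # Each call creates a fresh botocore S3 client; workers call this once
+    # after they have been spawned, since clients cannot be pickled.
+    return factory.create_client()
+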
+
+class TransferMonitor:
+ def __init__(self):
+ """Monitors transfers for cross-process communication
+
+ Notifications can be sent to the monitor and information can be
+ retrieved from the monitor for a particular transfer. This abstraction
+        is run in a ``multiprocessing.managers.BaseManager`` in order to be
+ shared across processes.
+ """
+ # TODO: Add logic that removes the TransferState if the transfer is
+ # marked as done and the reference to the future is no longer being
+ # held onto. Without this logic, this dictionary will continue to
+ # grow in size with no limit.
+ self._transfer_states = {}
+ self._id_count = 0
+ self._init_lock = threading.Lock()
+
+ def notify_new_transfer(self):
+ with self._init_lock:
+ transfer_id = self._id_count
+ self._transfer_states[transfer_id] = TransferState()
+ self._id_count += 1
+ return transfer_id
+
+ def is_done(self, transfer_id):
+        """Determine whether a particular transfer is complete
+
+ :param transfer_id: Unique identifier for the transfer
+ :return: True, if done. False, otherwise.
+ """
+ return self._transfer_states[transfer_id].done
+
+ def notify_done(self, transfer_id):
+ """Notify a particular transfer is complete
+
+ :param transfer_id: Unique identifier for the transfer
+ """
+ self._transfer_states[transfer_id].set_done()
+
+ def poll_for_result(self, transfer_id):
+ """Poll for the result of a transfer
+
+ :param transfer_id: Unique identifier for the transfer
+ :return: If the transfer succeeded, it will return the result. If the
+ transfer failed, it will raise the exception associated to the
+ failure.
+ """
+ self._transfer_states[transfer_id].wait_till_done()
+ exception = self._transfer_states[transfer_id].exception
+ if exception:
+ raise exception
+ return None
+
+ def notify_exception(self, transfer_id, exception):
+ """Notify an exception was encountered for a transfer
+
+ :param transfer_id: Unique identifier for the transfer
+ :param exception: The exception encountered for that transfer
+ """
+ # TODO: Not all exceptions are pickleable so if we are running
+ # this in a multiprocessing.BaseManager we will want to
+ # make sure to update this signature to ensure pickleability of the
+ # arguments or have the ProxyObject do the serialization.
+ self._transfer_states[transfer_id].exception = exception
+
+ def notify_cancel_all_in_progress(self):
+ for transfer_state in self._transfer_states.values():
+ if not transfer_state.done:
+ transfer_state.exception = CancelledError()
+
+ def get_exception(self, transfer_id):
+ """Retrieve the exception encountered for the transfer
+
+ :param transfer_id: Unique identifier for the transfer
+        :return: The exception encountered for that transfer, or None if no
+            exception was encountered.
+ """
+ return self._transfer_states[transfer_id].exception
+
+ def notify_expected_jobs_to_complete(self, transfer_id, num_jobs):
+ """Notify the amount of jobs expected for a transfer
+
+ :param transfer_id: Unique identifier for the transfer
+ :param num_jobs: The number of jobs to complete the transfer
+ """
+ self._transfer_states[transfer_id].jobs_to_complete = num_jobs
+
+ def notify_job_complete(self, transfer_id):
+ """Notify that a single job is completed for a transfer
+
+ :param transfer_id: Unique identifier for the transfer
+ :return: The number of jobs remaining to complete the transfer
+ """
+ return self._transfer_states[transfer_id].decrement_jobs_to_complete()
+
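+
+# Editorial sketch, not part of s3transfer: the notification protocol the
+# submitter and workers follow against a TransferMonitor. In the real
+# downloader the monitor is a proxy served by TransferMonitorManager so the
+# same calls work across process boundaries.
+def _example_transfer_monitor_protocol():  # pragma: no cover - illustrative
+    monitor = TransferMonitor()
+    transfer_id = monitor.notify_new_transfer()
+    # The submitter records how many GetObjectJobs make up the transfer.
+    monitor.notify_expected_jobs_to_complete(transfer_id, num_jobs=2)
+    # Each worker decrements the count when it finishes a job; whichever
+    # worker sees the count hit zero finalizes and marks the transfer done.
+    for _ in range(2):
+        remaining = monitor.notify_job_complete(transfer_id)
+    if not remaining:
+        monitor.notify_done(transfer_id)
+    # poll_for_result() blocks until done and re-raises any recorded error.
+    return monitor.poll_for_result(transfer_id)
+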
+
+class TransferState:
+ """Represents the current state of an individual transfer"""
+
+ # NOTE: Ideally the TransferState object would be used directly by the
+ # various different abstractions in the ProcessPoolDownloader and remove
+ # the need for the TransferMonitor. However, it would then impose the
+ # constraint that two hops are required to make or get any changes in the
+ # state of a transfer across processes: one hop to get a proxy object for
+ # the TransferState and then a second hop to communicate calling the
+ # specific TransferState method.
+ def __init__(self):
+ self._exception = None
+ self._done_event = threading.Event()
+ self._job_lock = threading.Lock()
+ self._jobs_to_complete = 0
+
+ @property
+ def done(self):
+ return self._done_event.is_set()
+
+ def set_done(self):
+ self._done_event.set()
+
+ def wait_till_done(self):
+ self._done_event.wait(MAXINT)
+
+ @property
+ def exception(self):
+ return self._exception
+
+ @exception.setter
+ def exception(self, val):
+ self._exception = val
+
+ @property
+ def jobs_to_complete(self):
+ return self._jobs_to_complete
+
+ @jobs_to_complete.setter
+ def jobs_to_complete(self, val):
+ self._jobs_to_complete = val
+
+ def decrement_jobs_to_complete(self):
+ with self._job_lock:
+ self._jobs_to_complete -= 1
+ return self._jobs_to_complete
+
+
+class TransferMonitorManager(BaseManager):
+ pass
+
+
+TransferMonitorManager.register('TransferMonitor', TransferMonitor)
+
+
+class BaseS3TransferProcess(multiprocessing.Process):
+ def __init__(self, client_factory):
+ super().__init__()
+ self._client_factory = client_factory
+ self._client = None
+
+ def run(self):
+ # Clients are not pickleable so their instantiation cannot happen
+ # in the __init__ for processes that are created under the
+ # spawn method.
+ self._client = self._client_factory.create_client()
+ with ignore_ctrl_c():
+            # By default these processes are run as child processes of the
+            # main process. Any Ctrl-C encountered in the main process is
+            # propagated to the child process and can interrupt it at any
+            # time. To avoid any potentially bad states caused by an
+            # interrupt (i.e. a transfer failing to notify that it is done or
+            # the communication protocol becoming out of sync with the
+            # TransferMonitor), we ignore all Ctrl-C's and allow the main
+            # process to notify these child processes when to stop processing
+            # jobs.
+ self._do_run()
+
+ def _do_run(self):
+ raise NotImplementedError('_do_run()')
+
+
+class GetObjectSubmitter(BaseS3TransferProcess):
+ def __init__(
+ self,
+ transfer_config,
+ client_factory,
+ transfer_monitor,
+ osutil,
+ download_request_queue,
+ worker_queue,
+ ):
+ """Submit GetObjectJobs to fulfill a download file request
+
+ :param transfer_config: Configuration for transfers.
+ :param client_factory: ClientFactory for creating S3 clients.
+ :param transfer_monitor: Monitor for notifying and retrieving state
+ of transfer.
+ :param osutil: OSUtils object to use for os-related behavior when
+ performing the transfer.
+ :param download_request_queue: Queue to retrieve download file
+ requests.
+ :param worker_queue: Queue to submit GetObjectJobs for workers
+ to perform.
+ """
+ super().__init__(client_factory)
+ self._transfer_config = transfer_config
+ self._transfer_monitor = transfer_monitor
+ self._osutil = osutil
+ self._download_request_queue = download_request_queue
+ self._worker_queue = worker_queue
+
+ def _do_run(self):
+ while True:
+ download_file_request = self._download_request_queue.get()
+ if download_file_request == SHUTDOWN_SIGNAL:
+ logger.debug('Submitter shutdown signal received.')
+ return
+ try:
+ self._submit_get_object_jobs(download_file_request)
+ except Exception as e:
+ logger.debug(
+ 'Exception caught when submitting jobs for '
+ 'download file request %s: %s',
+ download_file_request,
+ e,
+ exc_info=True,
+ )
+ self._transfer_monitor.notify_exception(
+ download_file_request.transfer_id, e
+ )
+ self._transfer_monitor.notify_done(
+ download_file_request.transfer_id
+ )
+
+ def _submit_get_object_jobs(self, download_file_request):
+ size = self._get_size(download_file_request)
+ temp_filename = self._allocate_temp_file(download_file_request, size)
+ if size < self._transfer_config.multipart_threshold:
+ self._submit_single_get_object_job(
+ download_file_request, temp_filename
+ )
+ else:
+ self._submit_ranged_get_object_jobs(
+ download_file_request, temp_filename, size
+ )
+
+ def _get_size(self, download_file_request):
+ expected_size = download_file_request.expected_size
+ if expected_size is None:
+ expected_size = self._client.head_object(
+ Bucket=download_file_request.bucket,
+ Key=download_file_request.key,
+ **download_file_request.extra_args,
+ )['ContentLength']
+ return expected_size
+
+ def _allocate_temp_file(self, download_file_request, size):
+ temp_filename = self._osutil.get_temp_filename(
+ download_file_request.filename
+ )
+ self._osutil.allocate(temp_filename, size)
+ return temp_filename
+
+ def _submit_single_get_object_job(
+ self, download_file_request, temp_filename
+ ):
+ self._notify_jobs_to_complete(download_file_request.transfer_id, 1)
+ self._submit_get_object_job(
+ transfer_id=download_file_request.transfer_id,
+ bucket=download_file_request.bucket,
+ key=download_file_request.key,
+ temp_filename=temp_filename,
+ offset=0,
+ extra_args=download_file_request.extra_args,
+ filename=download_file_request.filename,
+ )
+
+ def _submit_ranged_get_object_jobs(
+ self, download_file_request, temp_filename, size
+ ):
+ part_size = self._transfer_config.multipart_chunksize
+ num_parts = calculate_num_parts(size, part_size)
+ self._notify_jobs_to_complete(
+ download_file_request.transfer_id, num_parts
+ )
+ for i in range(num_parts):
+ offset = i * part_size
+ range_parameter = calculate_range_parameter(
+ part_size, i, num_parts
+ )
+ get_object_kwargs = {'Range': range_parameter}
+ get_object_kwargs.update(download_file_request.extra_args)
+ self._submit_get_object_job(
+ transfer_id=download_file_request.transfer_id,
+ bucket=download_file_request.bucket,
+ key=download_file_request.key,
+ temp_filename=temp_filename,
+ offset=offset,
+ extra_args=get_object_kwargs,
+ filename=download_file_request.filename,
+ )
+
+ def _submit_get_object_job(self, **get_object_job_kwargs):
+ self._worker_queue.put(GetObjectJob(**get_object_job_kwargs))
+
+ def _notify_jobs_to_complete(self, transfer_id, jobs_to_complete):
+ logger.debug(
+ 'Notifying %s job(s) to complete for transfer_id %s.',
+ jobs_to_complete,
+ transfer_id,
+ )
+ self._transfer_monitor.notify_expected_jobs_to_complete(
+ transfer_id, jobs_to_complete
+ )
+
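+
+# Editorial sketch, not part of s3transfer: the arithmetic behind
+# _submit_ranged_get_object_jobs() for an assumed 20 MB object downloaded
+# with an 8 MB chunksize. calculate_num_parts() yields 3 parts, the offsets
+# land on 8 MB boundaries, and the final Range header is left open-ended.
+def _example_ranged_get_math():  # pragma: no cover - illustrative
+    size = 20 * MB
+    part_size = 8 * MB
+    num_parts = calculate_num_parts(size, part_size)  # 3
+    ranges = []
+    for i in range(num_parts):
+        offset = i * part_size  # 0, 8 MB, 16 MB
+        range_param = calculate_range_parameter(part_size, i, num_parts)
+        ranges.append((offset, range_param))
+    # e.g. [(0, 'bytes=0-8388607'), (8388608, 'bytes=8388608-16777215'),
+    #       (16777216, 'bytes=16777216-')]
+    return ranges
+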
+
+class GetObjectWorker(BaseS3TransferProcess):
+ # TODO: It may make sense to expose these class variables as configuration
+ # options if users want to tweak them.
+ _MAX_ATTEMPTS = 5
+ _IO_CHUNKSIZE = 2 * MB
+
+ def __init__(self, queue, client_factory, transfer_monitor, osutil):
+ """Fulfills GetObjectJobs
+
+ Downloads the S3 object, writes it to the specified file, and
+ renames the file to its final location if it completes the final
+ job for a particular transfer.
+
+ :param queue: Queue for retrieving GetObjectJob's
+ :param client_factory: ClientFactory for creating S3 clients
+ :param transfer_monitor: Monitor for notifying
+ :param osutil: OSUtils object to use for os-related behavior when
+ performing the transfer.
+ """
+ super().__init__(client_factory)
+ self._queue = queue
+ self._client_factory = client_factory
+ self._transfer_monitor = transfer_monitor
+ self._osutil = osutil
+
+ def _do_run(self):
+ while True:
+ job = self._queue.get()
+ if job == SHUTDOWN_SIGNAL:
+ logger.debug('Worker shutdown signal received.')
+ return
+ if not self._transfer_monitor.get_exception(job.transfer_id):
+ self._run_get_object_job(job)
+ else:
+ logger.debug(
+ 'Skipping get object job %s because there was a previous '
+ 'exception.',
+ job,
+ )
+ remaining = self._transfer_monitor.notify_job_complete(
+ job.transfer_id
+ )
+ logger.debug(
+ '%s jobs remaining for transfer_id %s.',
+ remaining,
+ job.transfer_id,
+ )
+ if not remaining:
+ self._finalize_download(
+ job.transfer_id, job.temp_filename, job.filename
+ )
+
+ def _run_get_object_job(self, job):
+ try:
+ self._do_get_object(
+ bucket=job.bucket,
+ key=job.key,
+ temp_filename=job.temp_filename,
+ extra_args=job.extra_args,
+ offset=job.offset,
+ )
+ except Exception as e:
+ logger.debug(
+ 'Exception caught when downloading object for '
+ 'get object job %s: %s',
+ job,
+ e,
+ exc_info=True,
+ )
+ self._transfer_monitor.notify_exception(job.transfer_id, e)
+
+ def _do_get_object(self, bucket, key, extra_args, temp_filename, offset):
+ last_exception = None
+ for i in range(self._MAX_ATTEMPTS):
+ try:
+ response = self._client.get_object(
+ Bucket=bucket, Key=key, **extra_args
+ )
+ self._write_to_file(temp_filename, offset, response['Body'])
+ return
+ except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
+ logger.debug(
+ 'Retrying exception caught (%s), '
+ 'retrying request, (attempt %s / %s)',
+ e,
+ i + 1,
+ self._MAX_ATTEMPTS,
+ exc_info=True,
+ )
+ last_exception = e
+ raise RetriesExceededError(last_exception)
+
+ def _write_to_file(self, filename, offset, body):
+ with open(filename, 'rb+') as f:
+ f.seek(offset)
+ chunks = iter(lambda: body.read(self._IO_CHUNKSIZE), b'')
+ for chunk in chunks:
+ f.write(chunk)
+
+ def _finalize_download(self, transfer_id, temp_filename, filename):
+ if self._transfer_monitor.get_exception(transfer_id):
+ self._osutil.remove_file(temp_filename)
+ else:
+ self._do_file_rename(transfer_id, temp_filename, filename)
+ self._transfer_monitor.notify_done(transfer_id)
+
+ def _do_file_rename(self, transfer_id, temp_filename, filename):
+ try:
+ self._osutil.rename_file(temp_filename, filename)
+ except Exception as e:
+ self._transfer_monitor.notify_exception(transfer_id, e)
+ self._osutil.remove_file(temp_filename)
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py b/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py
new file mode 100644
index 00000000..fe773233
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py
@@ -0,0 +1,94 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from functools import lru_cache
+
+from s3transfer.compat import accepts_kwargs
+from s3transfer.exceptions import InvalidSubscriberMethodError
+
+
+class BaseSubscriber:
+ """The base subscriber class
+
+ It is recommended that all subscriber implementations subclass and then
+    override the subscription methods (i.e. on_{subscribe_type}() methods).
+ """
+
+ VALID_SUBSCRIBER_TYPES = ['queued', 'progress', 'done']
+
+ def __new__(cls, *args, **kwargs):
+ cls._validate_subscriber_methods()
+ return super().__new__(cls)
+
+ @classmethod
+ @lru_cache
+ def _validate_subscriber_methods(cls):
+ for subscriber_type in cls.VALID_SUBSCRIBER_TYPES:
+ subscriber_method = getattr(cls, 'on_' + subscriber_type)
+ if not callable(subscriber_method):
+ raise InvalidSubscriberMethodError(
+ f'Subscriber method {subscriber_method} must be callable.'
+ )
+
+ if not accepts_kwargs(subscriber_method):
+ raise InvalidSubscriberMethodError(
+ f'Subscriber method {subscriber_method} must accept keyword '
+ 'arguments (**kwargs)'
+ )
+
+ def on_queued(self, future, **kwargs):
+ """Callback to be invoked when transfer request gets queued
+
+ This callback can be useful for:
+
+ * Keeping track of how many transfers have been requested
+ * Providing the expected transfer size through
+      future.meta.provide_transfer_size() so a HeadObject call would
+      not need to be made for copies and downloads.
+
+ :type future: s3transfer.futures.TransferFuture
+ :param future: The TransferFuture representing the requested transfer.
+ """
+ pass
+
+ def on_progress(self, future, bytes_transferred, **kwargs):
+ """Callback to be invoked when progress is made on transfer
+
+ This callback can be useful for:
+
+ * Recording and displaying progress
+
+ :type future: s3transfer.futures.TransferFuture
+ :param future: The TransferFuture representing the requested transfer.
+
+ :type bytes_transferred: int
+ :param bytes_transferred: The number of bytes transferred for that
+ invocation of the callback. Note that a negative amount can be
+ provided, which usually indicates that an in-progress request
+ needed to be retried and thus progress was rewound.
+ """
+ pass
+
+ def on_done(self, future, **kwargs):
+ """Callback to be invoked once a transfer is done
+
+ This callback can be useful for:
+
+ * Recording and displaying whether the transfer succeeded or
+ failed using future.result()
+ * Running some task after the transfer completed like changing
+ the last modified time of a downloaded file.
+
+ :type future: s3transfer.futures.TransferFuture
+ :param future: The TransferFuture representing the requested transfer.
+ """
+ pass
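+
+
+# Editorial sketch, not part of s3transfer: a minimal subscriber that reports
+# progress. Subscriber methods must accept **kwargs, as enforced by
+# BaseSubscriber._validate_subscriber_methods(). Instances of a subscriber
+# are typically passed via the ``subscribers`` argument of the transfer
+# manager's upload()/download() calls.
+class _PrintingSubscriber(BaseSubscriber):  # pragma: no cover - illustrative
+    def __init__(self):
+        self._total_transferred = 0
+
+    def on_progress(self, future, bytes_transferred, **kwargs):
+        # bytes_transferred can be negative if an in-progress request was
+        # retried and its progress rewound.
+        self._total_transferred += bytes_transferred
+        print(f'{future.meta.call_args.key}: {self._total_transferred} bytes')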
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/tasks.py b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py
new file mode 100644
index 00000000..4ed3b88c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py
@@ -0,0 +1,390 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import logging
+
+from s3transfer.utils import get_callbacks
+
+try:
+ from botocore.context import start_as_current_context
+except ImportError:
+ from contextlib import nullcontext as start_as_current_context
+
+
+logger = logging.getLogger(__name__)
+
+
+class Task:
+ """A task associated to a TransferFuture request
+
+ This is a base class for other classes to subclass from. All subclassed
+ classes must implement the main() method.
+ """
+
+ def __init__(
+ self,
+ transfer_coordinator,
+ main_kwargs=None,
+ pending_main_kwargs=None,
+ done_callbacks=None,
+ is_final=False,
+ ):
+ """
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The context associated to the
+ TransferFuture for which this Task is associated with.
+
+ :type main_kwargs: dict
+ :param main_kwargs: The keyword args that can be immediately supplied
+ to the _main() method of the task
+
+ :type pending_main_kwargs: dict
+ :param pending_main_kwargs: The keyword args that are depended upon
+ by the result from a dependent future(s). The result returned by
+ the future(s) will be used as the value for the keyword argument
+ when _main() is called. The values for each key can be:
+ * a single future - Once completed, its value will be the
+ result of that single future
+ * a list of futures - Once all of the futures complete, the
+ value used will be a list of each completed future result
+ value in order of when they were originally supplied.
+
+ :type done_callbacks: list of callbacks
+        :param done_callbacks: A list of callbacks to call once the task has
+            completed. Each callback will be called with no arguments and
+            will be called whether the task succeeds or raises an exception.
+
+ :type is_final: boolean
+ :param is_final: True, to indicate that this task is the final task
+ for the TransferFuture request. By setting this value to True, it
+ will set the result of the entire TransferFuture to the result
+ returned by this task's main() method.
+ """
+ self._transfer_coordinator = transfer_coordinator
+
+ self._main_kwargs = main_kwargs
+ if self._main_kwargs is None:
+ self._main_kwargs = {}
+
+ self._pending_main_kwargs = pending_main_kwargs
+ if pending_main_kwargs is None:
+ self._pending_main_kwargs = {}
+
+ self._done_callbacks = done_callbacks
+ if self._done_callbacks is None:
+ self._done_callbacks = []
+
+ self._is_final = is_final
+
+ def __repr__(self):
+ # These are the general main_kwarg parameters that we want to
+ # display in the repr.
+ params_to_display = [
+ 'bucket',
+ 'key',
+ 'part_number',
+ 'final_filename',
+ 'transfer_future',
+ 'offset',
+ 'extra_args',
+ ]
+ main_kwargs_to_display = self._get_kwargs_with_params_to_include(
+ self._main_kwargs, params_to_display
+ )
+ return f'{self.__class__.__name__}(transfer_id={self._transfer_coordinator.transfer_id}, {main_kwargs_to_display})'
+
+ @property
+ def transfer_id(self):
+ """The id for the transfer request that the task belongs to"""
+ return self._transfer_coordinator.transfer_id
+
+ def _get_kwargs_with_params_to_include(self, kwargs, include):
+ filtered_kwargs = {}
+ for param in include:
+ if param in kwargs:
+ filtered_kwargs[param] = kwargs[param]
+ return filtered_kwargs
+
+ def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
+ filtered_kwargs = {}
+ for param, value in kwargs.items():
+ if param in exclude:
+ continue
+ filtered_kwargs[param] = value
+ return filtered_kwargs
+
+ def __call__(self, ctx=None):
+ """The callable to use when submitting a Task to an executor"""
+ with start_as_current_context(ctx):
+ try:
+                # Wait for all of the futures this task depends on.
+ self._wait_on_dependent_futures()
+ # Gather up all of the main keyword arguments for main().
+ # This includes the immediately provided main_kwargs and
+                # the values for pending_main_kwargs that come from the
+                # return values of the task's dependent futures.
+ kwargs = self._get_all_main_kwargs()
+                # If the task is not done (really only the case if another
+                # task related to the TransferFuture has failed) then execute
+                # the task's main() method.
+ if not self._transfer_coordinator.done():
+ return self._execute_main(kwargs)
+ except Exception as e:
+ self._log_and_set_exception(e)
+ finally:
+ # Run any done callbacks associated to the task no matter what.
+ for done_callback in self._done_callbacks:
+ done_callback()
+
+ if self._is_final:
+ # If this is the final task announce that it is done if results
+ # are waiting on its completion.
+ self._transfer_coordinator.announce_done()
+
+ def _execute_main(self, kwargs):
+ # Do not display keyword args that should not be printed, especially
+ # if they are going to make the logs hard to follow.
+ params_to_exclude = ['data']
+ kwargs_to_display = self._get_kwargs_with_params_to_exclude(
+ kwargs, params_to_exclude
+ )
+ # Log what is about to be executed.
+ logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}")
+
+ return_value = self._main(**kwargs)
+ # If the task is the final task, then set the TransferFuture's
+ # value to the return value from main().
+ if self._is_final:
+ self._transfer_coordinator.set_result(return_value)
+ return return_value
+
+ def _log_and_set_exception(self, exception):
+        # If an exception is ever thrown then set the exception for the
+ # entire TransferFuture.
+ logger.debug("Exception raised.", exc_info=True)
+ self._transfer_coordinator.set_exception(exception)
+
+ def _main(self, **kwargs):
+        """The method that will be run in the executor
+
+        This method must be implemented by subclasses of Task. _main() can
+        be implemented with any arguments decided upon by the subclass.
+        """
+ raise NotImplementedError('_main() must be implemented')
+
+ def _wait_on_dependent_futures(self):
+        # Gather all of the futures that main() depends on.
+ futures_to_wait_on = []
+ for _, future in self._pending_main_kwargs.items():
+ # If the pending main keyword arg is a list then extend the list.
+ if isinstance(future, list):
+ futures_to_wait_on.extend(future)
+            # If the pending main keyword arg is a future, append it to the list.
+ else:
+ futures_to_wait_on.append(future)
+ # Now wait for all of the futures to complete.
+ self._wait_until_all_complete(futures_to_wait_on)
+
+ def _wait_until_all_complete(self, futures):
+ # This is a basic implementation of the concurrent.futures.wait()
+ #
+ # concurrent.futures.wait() is not used instead because of this
+ # reported issue: https://bugs.python.org/issue20319.
+ # The issue would occasionally cause multipart uploads to hang
+        # when wait() was called. This approach avoids the concurrency bug
+        # by removing any association with the concurrent.futures
+        # implementation of waiters.
+ logger.debug(
+ '%s about to wait for the following futures %s', self, futures
+ )
+ for future in futures:
+ try:
+ logger.debug('%s about to wait for %s', self, future)
+ future.result()
+ except Exception:
+                # result() can also raise exceptions. We ignore these here
+                # so they can be deferred to error handling down the road.
+ pass
+ logger.debug('%s done waiting for dependent futures', self)
+
+ def _get_all_main_kwargs(self):
+        # Copy over all of the kwargs that we know are available.
+ kwargs = copy.copy(self._main_kwargs)
+
+ # Iterate through the kwargs whose values are pending on the result
+ # of a future.
+ for key, pending_value in self._pending_main_kwargs.items():
+            # If the value is a list of futures, iterate through the list,
+            # appending the result from each future.
+ if isinstance(pending_value, list):
+ result = []
+ for future in pending_value:
+ result.append(future.result())
+ # Otherwise if the pending_value is a future, just wait for it.
+ else:
+ result = pending_value.result()
+ # Add the retrieved value to the kwargs to be sent to the
+ # main() call.
+ kwargs[key] = result
+ return kwargs
+
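+
+# Editorial sketch, not part of s3transfer: the smallest useful Task subclass
+# just implements _main() with whatever keyword arguments its main_kwargs (or
+# pending_main_kwargs) will supply. The task name and its arguments below are
+# hypothetical.
+class _ExampleDeleteObjectTask(Task):  # pragma: no cover - illustrative
+    def _main(self, client, bucket, key):
+        # Invoked by __call__() once any dependent futures have resolved.
+        # Because tasks can be constructed with is_final=True, this return
+        # value may become the result of the whole TransferFuture.
+        return client.delete_object(Bucket=bucket, Key=key)
+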
+
+class SubmissionTask(Task):
+ """A base class for any submission task
+
+ Submission tasks are the top-level task used to submit a series of tasks
+ to execute a particular transfer.
+ """
+
+ def _main(self, transfer_future, **kwargs):
+ """
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+ transfer request that tasks are being submitted for
+
+ :param kwargs: Any additional kwargs that you may want to pass
+ to the _submit() method
+ """
+ try:
+ self._transfer_coordinator.set_status_to_queued()
+
+ # Before submitting any tasks, run all of the on_queued callbacks
+ on_queued_callbacks = get_callbacks(transfer_future, 'queued')
+ for on_queued_callback in on_queued_callbacks:
+ on_queued_callback()
+
+ # Once callbacks have been ran set the status to running.
+ self._transfer_coordinator.set_status_to_running()
+
+ # Call the submit method to start submitting tasks to execute the
+ # transfer.
+ self._submit(transfer_future=transfer_future, **kwargs)
+ except BaseException as e:
+            # If an exception was raised during the submission of tasks,
+            # there is a chance that the final task, which signals that a
+            # transfer is done and runs the cleanup, may never have been
+            # submitted in the first place, so we need to account for that.
+ #
+ # Note that BaseException is caught, instead of Exception, because
+ # for some implementations of executors, specifically the serial
+ # implementation, the SubmissionTask is directly exposed to
+            # KeyboardInterrupts and so needs to clean up and signal done
+ # for those as well.
+
+            # Set the exception that caused the process to fail.
+ self._log_and_set_exception(e)
+
+            # Wait until all associated futures that may have spawned from
+            # this submission task have finished before we announce the
+            # transfer as done.
+ self._wait_for_all_submitted_futures_to_complete()
+
+ # Announce the transfer as done, which will run any cleanups
+ # and done callbacks as well.
+ self._transfer_coordinator.announce_done()
+
+ def _submit(self, transfer_future, **kwargs):
+ """The submission method to be implemented
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+ transfer request that tasks are being submitted for
+
+ :param kwargs: Any additional keyword arguments you want to be passed
+ in
+ """
+ raise NotImplementedError('_submit() must be implemented')
+
+ def _wait_for_all_submitted_futures_to_complete(self):
+ # We want to wait for all futures that were submitted to
+ # complete as we do not want the cleanup callbacks or done callbacks
+        # to be called too early. The main problem is that any task that was
+        # submitted may have submitted even more tasks while running, so
+        # we need to account for that.
+
+ # First get all of the futures that were submitted up to this point.
+ submitted_futures = self._transfer_coordinator.associated_futures
+ while submitted_futures:
+ # Wait for those futures to complete.
+ self._wait_until_all_complete(submitted_futures)
+ # However, more futures may have been submitted as we waited so
+ # we need to check again for any more associated futures.
+ possibly_more_submitted_futures = (
+ self._transfer_coordinator.associated_futures
+ )
+            # If the current list of submitted futures is equal to the list
+            # of associated futures after the wait completes, then no more
+            # futures were submitted while waiting on the current list of
+            # futures to complete, ultimately meaning all futures that may
+            # have spawned from the original submission task have completed.
+ if submitted_futures == possibly_more_submitted_futures:
+ break
+ submitted_futures = possibly_more_submitted_futures
+
+
+class CreateMultipartUploadTask(Task):
+ """Task to initiate a multipart upload"""
+
+ def _main(self, client, bucket, key, extra_args):
+ """
+ :param client: The client to use when calling CreateMultipartUpload
+ :param bucket: The name of the bucket to upload to
+ :param key: The name of the key to upload to
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in the initialization.
+
+ :returns: The upload id of the multipart upload
+ """
+ # Create the multipart upload.
+ response = client.create_multipart_upload(
+ Bucket=bucket, Key=key, **extra_args
+ )
+ upload_id = response['UploadId']
+
+ # Add a cleanup if the multipart upload fails at any point.
+ self._transfer_coordinator.add_failure_cleanup(
+ client.abort_multipart_upload,
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ )
+ return upload_id
+
+
+class CompleteMultipartUploadTask(Task):
+ """Task to complete a multipart upload"""
+
+ def _main(self, client, bucket, key, upload_id, parts, extra_args):
+ """
+ :param client: The client to use when calling CompleteMultipartUpload
+ :param bucket: The name of the bucket to upload to
+ :param key: The name of the key to upload to
+ :param upload_id: The id of the upload
+ :param parts: A list of parts to use to complete the multipart upload::
+
+            [{'ETag': etag_value, 'PartNumber': part_number}, ...]
+
+ Each element in the list consists of a return value from
+ ``UploadPartTask.main()``.
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in completing the multipart transfer.
+ """
+ client.complete_multipart_upload(
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ MultipartUpload={'Parts': parts},
+ **extra_args,
+ )
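+
+
+# Editorial sketch, not part of s3transfer: the raw client calls that
+# CreateMultipartUploadTask, the per-part upload tasks, and
+# CompleteMultipartUploadTask cooperate to make, shown here sequentially for
+# a hypothetical set of part bodies. Real transfers run the parts
+# concurrently and abort the upload on failure via the coordinator's
+# failure cleanups.
+def _example_multipart_flow(client, bucket, key, part_bodies):  # pragma: no cover
+    response = client.create_multipart_upload(Bucket=bucket, Key=key)
+    upload_id = response['UploadId']
+    try:
+        parts = []
+        for part_number, body in enumerate(part_bodies, start=1):
+            part_response = client.upload_part(
+                Bucket=bucket,
+                Key=key,
+                UploadId=upload_id,
+                PartNumber=part_number,
+                Body=body,
+            )
+            parts.append(
+                {'ETag': part_response['ETag'], 'PartNumber': part_number}
+            )
+        client.complete_multipart_upload(
+            Bucket=bucket,
+            Key=key,
+            UploadId=upload_id,
+            MultipartUpload={'Parts': parts},
+        )
+    except Exception:
+        client.abort_multipart_upload(
+            Bucket=bucket, Key=key, UploadId=upload_id
+        )
+        raise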
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/upload.py b/.venv/lib/python3.12/site-packages/s3transfer/upload.py
new file mode 100644
index 00000000..a3db8f89
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/upload.py
@@ -0,0 +1,840 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import math
+from io import BytesIO
+
+from s3transfer.compat import readable, seekable
+from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
+from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
+from s3transfer.tasks import (
+ CompleteMultipartUploadTask,
+ CreateMultipartUploadTask,
+ SubmissionTask,
+ Task,
+)
+from s3transfer.utils import (
+ ChunksizeAdjuster,
+ DeferredOpenFile,
+ get_callbacks,
+ get_filtered_dict,
+)
+
+
+class AggregatedProgressCallback:
+ def __init__(self, callbacks, threshold=1024 * 256):
+ """Aggregates progress updates for every provided progress callback
+
+        :type callbacks: A list of functions that accept bytes_transferred
+ as a single argument
+ :param callbacks: The callbacks to invoke when threshold is reached
+
+ :type threshold: int
+        :param threshold: The number of bytes to aggregate before invoking
+            the progress callbacks with the aggregated progress total
+ """
+ self._callbacks = callbacks
+ self._threshold = threshold
+ self._bytes_seen = 0
+
+ def __call__(self, bytes_transferred):
+ self._bytes_seen += bytes_transferred
+ if self._bytes_seen >= self._threshold:
+ self._trigger_callbacks()
+
+ def flush(self):
+ """Flushes out any progress that has not been sent to its callbacks"""
+ if self._bytes_seen > 0:
+ self._trigger_callbacks()
+
+ def _trigger_callbacks(self):
+ for callback in self._callbacks:
+ callback(bytes_transferred=self._bytes_seen)
+ self._bytes_seen = 0
+
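+
+# Editorial sketch, not part of s3transfer: how aggregation batches noisy
+# per-read progress into fewer callback invocations. The byte counts are
+# illustrative.
+def _example_aggregated_progress():  # pragma: no cover - illustrative
+    reports = []
+    aggregated = AggregatedProgressCallback(
+        [lambda bytes_transferred: reports.append(bytes_transferred)],
+        threshold=1024,
+    )
+    aggregated(512)  # below the threshold: nothing reported yet
+    aggregated(600)  # 1112 bytes seen >= 1024, so one callback fires
+    aggregated(100)  # buffered again
+    aggregated.flush()  # forces the remaining 100 bytes out
+    return reports  # [1112, 100]
+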
+
+class InterruptReader:
+ """Wrapper that can interrupt reading using an error
+
+    It checks the transfer coordinator on every read and, if an exception
+    has been set for the transfer, raises that exception to interrupt the
+    read instead of returning more data.
+
+ :type fileobj: file-like obj
+ :param fileobj: The file-like object to read from
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The transfer coordinator to use if the
+ reader needs to be interrupted.
+ """
+
+ def __init__(self, fileobj, transfer_coordinator):
+ self._fileobj = fileobj
+ self._transfer_coordinator = transfer_coordinator
+
+ def read(self, amount=None):
+ # If there is an exception, then raise the exception.
+ # We raise an error instead of returning no bytes because for
+        # requests where the content length and md5 were sent, it will
+ # cause md5 mismatches and retries as there was no indication that
+ # the stream being read from encountered any issues.
+ if self._transfer_coordinator.exception:
+ raise self._transfer_coordinator.exception
+ return self._fileobj.read(amount)
+
+ def seek(self, where, whence=0):
+ self._fileobj.seek(where, whence)
+
+ def tell(self):
+ return self._fileobj.tell()
+
+ def close(self):
+ self._fileobj.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+
+class UploadInputManager:
+ """Base manager class for handling various types of files for uploads
+
+ This class is typically used for the UploadSubmissionTask class to help
+ determine the following:
+
+ * How to determine the size of the file
+ * How to determine if a multipart upload is required
+ * How to retrieve the body for a PutObject
+ * How to retrieve the bodies for a set of UploadParts
+
+ The answers/implementations differ for the various types of file inputs
+ that may be accepted. All implementations must subclass and override
+ public methods from this class.
+ """
+
+ def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
+ self._osutil = osutil
+ self._transfer_coordinator = transfer_coordinator
+ self._bandwidth_limiter = bandwidth_limiter
+
+ @classmethod
+ def is_compatible(cls, upload_source):
+        """Determines if the source for the upload is compatible with the manager
+
+ :param upload_source: The source for which the upload will pull data
+ from.
+
+ :returns: True if the manager can handle the type of source specified
+ otherwise returns False.
+ """
+        raise NotImplementedError('must implement is_compatible()')
+
+ def stores_body_in_memory(self, operation_name):
+        """Whether the bodies it provides are stored in memory
+
+ :type operation_name: str
+ :param operation_name: The name of the client operation that the body
+ is being used for. Valid operation_names are ``put_object`` and
+ ``upload_part``.
+
+ :rtype: boolean
+ :returns: True if the body returned by the manager will be stored in
+ memory. False if the manager will not directly store the body in
+ memory.
+ """
+        raise NotImplementedError('must implement stores_body_in_memory()')
+
+ def provide_transfer_size(self, transfer_future):
+ """Provides the transfer size of an upload
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The future associated with upload request
+ """
+ raise NotImplementedError('must implement provide_transfer_size()')
+
+ def requires_multipart_upload(self, transfer_future, config):
+        """Determines whether a multipart upload is required
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The future associated with upload request
+
+ :type config: s3transfer.manager.TransferConfig
+ :param config: The config associated to the transfer manager
+
+ :rtype: boolean
+ :returns: True, if the upload should be multipart based on
+ configuration and size. False, otherwise.
+ """
+ raise NotImplementedError('must implement requires_multipart_upload()')
+
+ def get_put_object_body(self, transfer_future):
+ """Returns the body to use for PutObject
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The future associated with upload request
+
+ :rtype: s3transfer.utils.ReadFileChunk
+ :returns: A ReadFileChunk including all progress callbacks
+ associated with the transfer future.
+ """
+ raise NotImplementedError('must implement get_put_object_body()')
+
+ def yield_upload_part_bodies(self, transfer_future, chunksize):
+ """Yields the part number and body to use for each UploadPart
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The future associated with upload request
+
+ :type chunksize: int
+ :param chunksize: The chunksize to use for this upload.
+
+ :rtype: int, s3transfer.utils.ReadFileChunk
+ :returns: Yields the part number and the ReadFileChunk including all
+ progress callbacks associated with the transfer future for that
+ specific yielded part.
+ """
+ raise NotImplementedError('must implement yield_upload_part_bodies()')
+
+ def _wrap_fileobj(self, fileobj):
+ fileobj = InterruptReader(fileobj, self._transfer_coordinator)
+ if self._bandwidth_limiter:
+ fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
+ fileobj, self._transfer_coordinator, enabled=False
+ )
+ return fileobj
+
+ def _get_progress_callbacks(self, transfer_future):
+ callbacks = get_callbacks(transfer_future, 'progress')
+ # We only want to be wrapping the callbacks if there are callbacks to
+ # invoke because we do not want to be doing any unnecessary work if
+ # there are no callbacks to invoke.
+ if callbacks:
+ return [AggregatedProgressCallback(callbacks)]
+ return []
+
+ def _get_close_callbacks(self, aggregated_progress_callbacks):
+ return [callback.flush for callback in aggregated_progress_callbacks]
+
+
+class UploadFilenameInputManager(UploadInputManager):
+ """Upload utility for filenames"""
+
+ @classmethod
+ def is_compatible(cls, upload_source):
+ return isinstance(upload_source, str)
+
+ def stores_body_in_memory(self, operation_name):
+ return False
+
+ def provide_transfer_size(self, transfer_future):
+ transfer_future.meta.provide_transfer_size(
+ self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
+ )
+
+ def requires_multipart_upload(self, transfer_future, config):
+ return transfer_future.meta.size >= config.multipart_threshold
+
+ def get_put_object_body(self, transfer_future):
+ # Get a file-like object for the given input
+ fileobj, full_size = self._get_put_object_fileobj_with_full_size(
+ transfer_future
+ )
+
+ # Wrap fileobj with interrupt reader that will quickly cancel
+ # uploads if needed instead of having to wait for the socket
+ # to completely read all of the data.
+ fileobj = self._wrap_fileobj(fileobj)
+
+ callbacks = self._get_progress_callbacks(transfer_future)
+ close_callbacks = self._get_close_callbacks(callbacks)
+ size = transfer_future.meta.size
+ # Return the file-like object wrapped into a ReadFileChunk to get
+ # progress.
+ return self._osutil.open_file_chunk_reader_from_fileobj(
+ fileobj=fileobj,
+ chunk_size=size,
+ full_file_size=full_size,
+ callbacks=callbacks,
+ close_callbacks=close_callbacks,
+ )
+
+ def yield_upload_part_bodies(self, transfer_future, chunksize):
+ full_file_size = transfer_future.meta.size
+ num_parts = self._get_num_parts(transfer_future, chunksize)
+ for part_number in range(1, num_parts + 1):
+ callbacks = self._get_progress_callbacks(transfer_future)
+ close_callbacks = self._get_close_callbacks(callbacks)
+ start_byte = chunksize * (part_number - 1)
+ # Get a file-like object for that part and the size of the full
+ # file size for the associated file-like object for that part.
+ fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
+ transfer_future.meta.call_args.fileobj,
+ start_byte=start_byte,
+ part_size=chunksize,
+ full_file_size=full_file_size,
+ )
+
+ # Wrap fileobj with interrupt reader that will quickly cancel
+ # uploads if needed instead of having to wait for the socket
+ # to completely read all of the data.
+ fileobj = self._wrap_fileobj(fileobj)
+
+ # Wrap the file-like object into a ReadFileChunk to get progress.
+ read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
+ fileobj=fileobj,
+ chunk_size=chunksize,
+ full_file_size=full_size,
+ callbacks=callbacks,
+ close_callbacks=close_callbacks,
+ )
+ yield part_number, read_file_chunk
+
+ def _get_deferred_open_file(self, fileobj, start_byte):
+ fileobj = DeferredOpenFile(
+ fileobj, start_byte, open_function=self._osutil.open
+ )
+ return fileobj
+
+ def _get_put_object_fileobj_with_full_size(self, transfer_future):
+ fileobj = transfer_future.meta.call_args.fileobj
+ size = transfer_future.meta.size
+ return self._get_deferred_open_file(fileobj, 0), size
+
+ def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
+ start_byte = kwargs['start_byte']
+ full_size = kwargs['full_file_size']
+ return self._get_deferred_open_file(fileobj, start_byte), full_size
+
+ def _get_num_parts(self, transfer_future, part_size):
+ return int(math.ceil(transfer_future.meta.size / float(part_size)))
+
+
+class UploadSeekableInputManager(UploadFilenameInputManager):
+ """Upload utility for an open file object"""
+
+ @classmethod
+ def is_compatible(cls, upload_source):
+ return readable(upload_source) and seekable(upload_source)
+
+ def stores_body_in_memory(self, operation_name):
+ if operation_name == 'put_object':
+ return False
+ else:
+ return True
+
+ def provide_transfer_size(self, transfer_future):
+ fileobj = transfer_future.meta.call_args.fileobj
+        # To determine the size, first determine the starting position.
+        # Then seek to the end and find the difference in length
+        # between the end and start positions.
+ start_position = fileobj.tell()
+ fileobj.seek(0, 2)
+ end_position = fileobj.tell()
+ fileobj.seek(start_position)
+ transfer_future.meta.provide_transfer_size(
+ end_position - start_position
+ )
+
+ def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
+ # Note: It is unfortunate that in order to do a multithreaded
+ # multipart upload we cannot simply copy the filelike object
+        # since there is not really a mechanism in Python to do so (e.g.
+        # os.dup points to the same OS file handle, which causes concurrency
+ # issues). So instead we need to read from the fileobj and
+ # chunk the data out to separate file-like objects in memory.
+ data = fileobj.read(kwargs['part_size'])
+ # We return the length of the data instead of the full_file_size
+ # because we partitioned the data into separate BytesIO objects
+ # meaning the BytesIO object has no knowledge of its start position
+        # relative to the input source nor access to the rest of the input
+ # source. So we must treat it as its own standalone file.
+ return BytesIO(data), len(data)
+
+ def _get_put_object_fileobj_with_full_size(self, transfer_future):
+ fileobj = transfer_future.meta.call_args.fileobj
+ # The current position needs to be taken into account when retrieving
+ # the full size of the file.
+ size = fileobj.tell() + transfer_future.meta.size
+ return fileobj, size
+
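+
+# Editorial sketch, not part of s3transfer: the tell/seek arithmetic used by
+# UploadSeekableInputManager.provide_transfer_size() when the caller hands in
+# a file object that is not positioned at byte 0. Values are illustrative.
+def _example_seekable_transfer_size():  # pragma: no cover - illustrative
+    fileobj = BytesIO(b'0123456789')
+    fileobj.seek(3)  # the caller already consumed three bytes
+    start_position = fileobj.tell()  # 3
+    fileobj.seek(0, 2)  # jump to the end of the stream
+    end_position = fileobj.tell()  # 10
+    fileobj.seek(start_position)  # restore the caller's position
+    return end_position - start_position  # 7 bytes would be uploaded
+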
+
+class UploadNonSeekableInputManager(UploadInputManager):
+ """Upload utility for a file-like object that cannot seek."""
+
+ def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
+ super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
+ self._initial_data = b''
+
+ @classmethod
+ def is_compatible(cls, upload_source):
+ return readable(upload_source)
+
+ def stores_body_in_memory(self, operation_name):
+ return True
+
+ def provide_transfer_size(self, transfer_future):
+ # No-op because there is no way to do this short of reading the entire
+ # body into memory.
+ return
+
+ def requires_multipart_upload(self, transfer_future, config):
+ # If the user has set the size, we can use that.
+ if transfer_future.meta.size is not None:
+ return transfer_future.meta.size >= config.multipart_threshold
+
+ # This is tricky to determine in this case because we can't know how
+ # large the input is. So to figure it out, we read data into memory
+ # up until the threshold and compare how much data was actually read
+ # against the threshold.
+ fileobj = transfer_future.meta.call_args.fileobj
+ threshold = config.multipart_threshold
+ self._initial_data = self._read(fileobj, threshold, False)
+ if len(self._initial_data) < threshold:
+ return False
+ else:
+ return True
+
+ def get_put_object_body(self, transfer_future):
+ callbacks = self._get_progress_callbacks(transfer_future)
+ close_callbacks = self._get_close_callbacks(callbacks)
+ fileobj = transfer_future.meta.call_args.fileobj
+
+ body = self._wrap_data(
+ self._initial_data + fileobj.read(), callbacks, close_callbacks
+ )
+
+ # Zero out the stored data so we don't have additional copies
+ # hanging around in memory.
+ self._initial_data = None
+ return body
+
+ def yield_upload_part_bodies(self, transfer_future, chunksize):
+ file_object = transfer_future.meta.call_args.fileobj
+ part_number = 0
+
+ # Continue reading parts from the file-like object until it is empty.
+ while True:
+ callbacks = self._get_progress_callbacks(transfer_future)
+ close_callbacks = self._get_close_callbacks(callbacks)
+ part_number += 1
+ part_content = self._read(file_object, chunksize)
+ if not part_content:
+ break
+ part_object = self._wrap_data(
+ part_content, callbacks, close_callbacks
+ )
+
+ # Zero out part_content to avoid hanging on to additional data.
+ part_content = None
+ yield part_number, part_object
+
+ def _read(self, fileobj, amount, truncate=True):
+ """
+ Reads a specific amount of data from a stream and returns it. If there
+ is any data in initial_data, that will be popped out first.
+
+ :type fileobj: A file-like object that implements read
+ :param fileobj: The stream to read from.
+
+ :type amount: int
+ :param amount: The number of bytes to read from the stream.
+
+ :type truncate: bool
+ :param truncate: Whether or not to truncate initial_data after
+ reading from it.
+
+        :return: The bytes read, taken from initial_data first and then
+            from the stream.
+ """
+        # If the initial data is empty, we simply read from the fileobj
+ if len(self._initial_data) == 0:
+ return fileobj.read(amount)
+
+        # If the requested number of bytes is at most the amount of
+        # initial data, pull entirely from the initial data.
+ if amount <= len(self._initial_data):
+ data = self._initial_data[:amount]
+ # Truncate initial data so we don't hang onto the data longer
+ # than we need.
+ if truncate:
+ self._initial_data = self._initial_data[amount:]
+ return data
+
+ # At this point there is some initial data left, but not enough to
+ # satisfy the number of bytes requested. Pull out the remaining
+ # initial data and read the rest from the fileobj.
+ amount_to_read = amount - len(self._initial_data)
+ data = self._initial_data + fileobj.read(amount_to_read)
+
+ # Zero out initial data so we don't hang onto the data any more.
+ if truncate:
+ self._initial_data = b''
+ return data
+
+ def _wrap_data(self, data, callbacks, close_callbacks):
+ """
+ Wraps data with the interrupt reader and the file chunk reader.
+
+ :type data: bytes
+ :param data: The data to wrap.
+
+ :type callbacks: list
+ :param callbacks: The callbacks associated with the transfer future.
+
+ :type close_callbacks: list
+ :param close_callbacks: The callbacks to be called when closing the
+ wrapper for the data.
+
+ :return: Fully wrapped data.
+ """
+ fileobj = self._wrap_fileobj(BytesIO(data))
+ return self._osutil.open_file_chunk_reader_from_fileobj(
+ fileobj=fileobj,
+ chunk_size=len(data),
+ full_file_size=len(data),
+ callbacks=callbacks,
+ close_callbacks=close_callbacks,
+ )
+
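+
+# Editorial sketch, not part of s3transfer: how the probe made by
+# requires_multipart_upload() interacts with _read(). Bytes buffered while
+# checking the threshold are handed back out before any new data is read
+# from the stream. The instance arguments and byte values are illustrative.
+def _example_nonseekable_read_buffering():  # pragma: no cover - illustrative
+    manager = UploadNonSeekableInputManager(
+        osutil=None, transfer_coordinator=None
+    )
+    stream = BytesIO(b'defgh')
+    # Pretend the multipart probe already pulled three bytes off the stream.
+    manager._initial_data = b'abc'
+    # Asking for five bytes drains the buffered data first, then the stream.
+    return manager._read(stream, 5)  # b'abcde'
+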
+
+class UploadSubmissionTask(SubmissionTask):
+ """Task for submitting tasks to execute an upload"""
+
+ PUT_OBJECT_BLOCKLIST = ["ChecksumType", "MpuObjectSize"]
+
+ CREATE_MULTIPART_BLOCKLIST = FULL_OBJECT_CHECKSUM_ARGS + ["MpuObjectSize"]
+
+ UPLOAD_PART_ARGS = [
+ 'ChecksumAlgorithm',
+ 'SSECustomerKey',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+ ]
+
+ COMPLETE_MULTIPART_ARGS = [
+ 'SSECustomerKey',
+ 'SSECustomerAlgorithm',
+ 'SSECustomerKeyMD5',
+ 'RequestPayer',
+ 'ExpectedBucketOwner',
+ 'ChecksumType',
+ 'MpuObjectSize',
+ ] + FULL_OBJECT_CHECKSUM_ARGS
+
+ def _get_upload_input_manager_cls(self, transfer_future):
+ """Retrieves a class for managing input for an upload based on file type
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future for the request
+
+ :rtype: class of UploadInputManager
+ :returns: The appropriate class to use for managing a specific type of
+ input for uploads.
+ """
+ upload_manager_resolver_chain = [
+ UploadFilenameInputManager,
+ UploadSeekableInputManager,
+ UploadNonSeekableInputManager,
+ ]
+
+ fileobj = transfer_future.meta.call_args.fileobj
+ for upload_manager_cls in upload_manager_resolver_chain:
+ if upload_manager_cls.is_compatible(fileobj):
+ return upload_manager_cls
+ raise RuntimeError(
+ f'Input {fileobj} of type: {type(fileobj)} is not supported.'
+ )
+
+ def _submit(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ transfer_future,
+ bandwidth_limiter=None,
+ ):
+ """
+ :param client: The client associated with the transfer manager
+
+ :type config: s3transfer.manager.TransferConfig
+ :param config: The transfer config associated with the transfer
+ manager
+
+ :type osutil: s3transfer.utils.OSUtil
+ :param osutil: The os utility associated to the transfer manager
+
+ :type request_executor: s3transfer.futures.BoundedExecutor
+ :param request_executor: The request executor associated with the
+ transfer manager
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future associated with the
+            transfer request that tasks are being submitted for
+
+        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
+        :param bandwidth_limiter: The bandwidth limiter to use when
+            uploading streams
+        """
+ upload_input_manager = self._get_upload_input_manager_cls(
+ transfer_future
+ )(osutil, self._transfer_coordinator, bandwidth_limiter)
+
+ # Determine the size if it was not provided
+ if transfer_future.meta.size is None:
+ upload_input_manager.provide_transfer_size(transfer_future)
+
+ # Do a multipart upload if needed, otherwise do a regular put object.
+ if not upload_input_manager.requires_multipart_upload(
+ transfer_future, config
+ ):
+ self._submit_upload_request(
+ client,
+ config,
+ osutil,
+ request_executor,
+ transfer_future,
+ upload_input_manager,
+ )
+ else:
+ self._submit_multipart_request(
+ client,
+ config,
+ osutil,
+ request_executor,
+ transfer_future,
+ upload_input_manager,
+ )
+
+ def _submit_upload_request(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ transfer_future,
+ upload_input_manager,
+ ):
+ call_args = transfer_future.meta.call_args
+
+ put_object_extra_args = self._extra_put_object_args(
+ call_args.extra_args
+ )
+
+ # Get any tags that need to be associated to the put object task
+ put_object_tag = self._get_upload_task_tag(
+ upload_input_manager, 'put_object'
+ )
+
+ # Submit the request of a single upload.
+ self._transfer_coordinator.submit(
+ request_executor,
+ PutObjectTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'fileobj': upload_input_manager.get_put_object_body(
+ transfer_future
+ ),
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': put_object_extra_args,
+ },
+ is_final=True,
+ ),
+ tag=put_object_tag,
+ )
+
+ def _submit_multipart_request(
+ self,
+ client,
+ config,
+ osutil,
+ request_executor,
+ transfer_future,
+ upload_input_manager,
+ ):
+ call_args = transfer_future.meta.call_args
+
+        # When a user-provided full-object checksum is passed, set
+        # "ChecksumType" to "FULL_OBJECT" and "ChecksumAlgorithm" to the
+        # corresponding algorithm.
+ for checksum in FULL_OBJECT_CHECKSUM_ARGS:
+ if checksum in call_args.extra_args:
+ call_args.extra_args["ChecksumType"] = "FULL_OBJECT"
+ call_args.extra_args["ChecksumAlgorithm"] = checksum.replace(
+ "Checksum", ""
+ )
+
+ create_multipart_extra_args = self._extra_create_multipart_args(
+ call_args.extra_args
+ )
+
+ # Submit the request to create a multipart upload.
+ create_multipart_future = self._transfer_coordinator.submit(
+ request_executor,
+ CreateMultipartUploadTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': create_multipart_extra_args,
+ },
+ ),
+ )
+
+ # Submit requests to upload the parts of the file.
+ part_futures = []
+ extra_part_args = self._extra_upload_part_args(call_args.extra_args)
+
+        # Get any tags that need to be associated to the submitted task
+        # for uploading the data
+ upload_part_tag = self._get_upload_task_tag(
+ upload_input_manager, 'upload_part'
+ )
+
+ size = transfer_future.meta.size
+ adjuster = ChunksizeAdjuster()
+ chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
+ part_iterator = upload_input_manager.yield_upload_part_bodies(
+ transfer_future, chunksize
+ )
+
+ for part_number, fileobj in part_iterator:
+ part_futures.append(
+ self._transfer_coordinator.submit(
+ request_executor,
+ UploadPartTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'fileobj': fileobj,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'part_number': part_number,
+ 'extra_args': extra_part_args,
+ },
+ pending_main_kwargs={
+ 'upload_id': create_multipart_future
+ },
+ ),
+ tag=upload_part_tag,
+ )
+ )
+
+ complete_multipart_extra_args = self._extra_complete_multipart_args(
+ call_args.extra_args
+ )
+ # Submit the request to complete the multipart upload.
+ self._transfer_coordinator.submit(
+ request_executor,
+ CompleteMultipartUploadTask(
+ transfer_coordinator=self._transfer_coordinator,
+ main_kwargs={
+ 'client': client,
+ 'bucket': call_args.bucket,
+ 'key': call_args.key,
+ 'extra_args': complete_multipart_extra_args,
+ },
+ pending_main_kwargs={
+ 'upload_id': create_multipart_future,
+ 'parts': part_futures,
+ },
+ is_final=True,
+ ),
+ )
+
+ def _extra_upload_part_args(self, extra_args):
+        # Only the args in UPLOAD_PART_ARGS actually need to be passed
+        # on to the upload_part calls.
+ return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)
+
+ def _extra_complete_multipart_args(self, extra_args):
+ return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)
+
+ def _extra_create_multipart_args(self, extra_args):
+ return get_filtered_dict(
+ extra_args, blocklisted_keys=self.CREATE_MULTIPART_BLOCKLIST
+ )
+
+ def _extra_put_object_args(self, extra_args):
+ return get_filtered_dict(
+ extra_args, blocklisted_keys=self.PUT_OBJECT_BLOCKLIST
+ )
+
+ def _get_upload_task_tag(self, upload_input_manager, operation_name):
+ tag = None
+ if upload_input_manager.stores_body_in_memory(operation_name):
+ tag = IN_MEMORY_UPLOAD_TAG
+ return tag
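+
+
+# A minimal sketch (hypothetical `_example_*` helper, not part of the upstream
+# module) of the checksum rewrite performed in _submit_multipart_request,
+# assuming 'ChecksumCRC32' is among FULL_OBJECT_CHECKSUM_ARGS: a user-supplied
+# full-object checksum pins both ChecksumType and ChecksumAlgorithm.
+def _example_full_object_checksum_rewrite():
+    extra_args = {'ChecksumCRC32': 'AAAAAA=='}
+    for checksum in FULL_OBJECT_CHECKSUM_ARGS:
+        if checksum in extra_args:
+            extra_args['ChecksumType'] = 'FULL_OBJECT'
+            extra_args['ChecksumAlgorithm'] = checksum.replace('Checksum', '')
+    assert extra_args['ChecksumType'] == 'FULL_OBJECT'
+    assert extra_args['ChecksumAlgorithm'] == 'CRC32'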
+
+
+class PutObjectTask(Task):
+ """Task to do a nonmultipart upload"""
+
+ def _main(self, client, fileobj, bucket, key, extra_args):
+ """
+ :param client: The client to use when calling PutObject
+ :param fileobj: The file to upload.
+ :param bucket: The name of the bucket to upload to
+ :param key: The name of the key to upload to
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in the upload.
+ """
+ with fileobj as body:
+ client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
+
+
+class UploadPartTask(Task):
+ """Task to upload a part in a multipart upload"""
+
+ def _main(
+ self, client, fileobj, bucket, key, upload_id, part_number, extra_args
+ ):
+ """
+        :param client: The client to use when calling UploadPart
+ :param fileobj: The file to upload.
+ :param bucket: The name of the bucket to upload to
+ :param key: The name of the key to upload to
+ :param upload_id: The id of the upload
+ :param part_number: The number representing the part of the multipart
+ upload
+ :param extra_args: A dictionary of any extra arguments that may be
+ used in the upload.
+
+ :rtype: dict
+ :returns: A dictionary representing a part::
+
+            {'ETag': etag_value, 'PartNumber': part_number}
+
+ This value can be appended to a list to be used to complete
+ the multipart upload.
+ """
+ with fileobj as body:
+ response = client.upload_part(
+ Bucket=bucket,
+ Key=key,
+ UploadId=upload_id,
+ PartNumber=part_number,
+ Body=body,
+ **extra_args,
+ )
+ etag = response['ETag']
+ part_metadata = {'ETag': etag, 'PartNumber': part_number}
+ if 'ChecksumAlgorithm' in extra_args:
+ algorithm_name = extra_args['ChecksumAlgorithm'].upper()
+ checksum_member = f'Checksum{algorithm_name}'
+ if checksum_member in response:
+ part_metadata[checksum_member] = response[checksum_member]
+ return part_metadata
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/utils.py b/.venv/lib/python3.12/site-packages/s3transfer/utils.py
new file mode 100644
index 00000000..f2c4e06c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/utils.py
@@ -0,0 +1,833 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import functools
+import logging
+import math
+import os
+import random
+import socket
+import stat
+import string
+import threading
+from collections import defaultdict
+
+from botocore.exceptions import (
+ IncompleteReadError,
+ ReadTimeoutError,
+ ResponseStreamingError,
+)
+from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper
+from botocore.utils import is_s3express_bucket
+
+from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
+from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
+
+MAX_PARTS = 10000
+# The maximum file size you can upload via S3 per request.
+# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
+# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)
+MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)
+logger = logging.getLogger(__name__)
+
+
+S3_RETRYABLE_DOWNLOAD_ERRORS = (
+ socket.timeout,
+ SOCKET_ERROR,
+ ReadTimeoutError,
+ IncompleteReadError,
+ ResponseStreamingError,
+)
+
+
+def random_file_extension(num_digits=8):
+ return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
+
+
+def signal_not_transferring(request, operation_name, **kwargs):
+ if operation_name in ['PutObject', 'UploadPart'] and hasattr(
+ request.body, 'signal_not_transferring'
+ ):
+ request.body.signal_not_transferring()
+
+
+def signal_transferring(request, operation_name, **kwargs):
+ if operation_name in ['PutObject', 'UploadPart']:
+ body = request.body
+ if isinstance(body, AwsChunkedWrapper):
+ body = getattr(body, '_raw', None)
+ if hasattr(body, 'signal_transferring'):
+ body.signal_transferring()
+
+
+def calculate_num_parts(size, part_size):
+ return int(math.ceil(size / float(part_size)))
+
+
+def calculate_range_parameter(
+ part_size, part_index, num_parts, total_size=None
+):
+ """Calculate the range parameter for multipart downloads/copies
+
+ :type part_size: int
+ :param part_size: The size of the part
+
+ :type part_index: int
+    :param part_index: The zero-based index of the part for which to
+        calculate the range.
+
+    :type num_parts: int
+    :param num_parts: The total number of parts in the transfer
+
+    :type total_size: int or None
+    :param total_size: The total size of the object, if known. Used to bound
+        the range of the final part.
+
+ :returns: The value to use for Range parameter on downloads or
+ the CopySourceRange parameter for copies
+ """
+ # Used to calculate the Range parameter
+ start_range = part_index * part_size
+ if part_index == num_parts - 1:
+ end_range = ''
+ if total_size is not None:
+ end_range = str(total_size - 1)
+ else:
+ end_range = start_range + part_size - 1
+ range_param = f'bytes={start_range}-{end_range}'
+ return range_param
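+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): Range strings for a 100-byte object split into 10-byte
+# parts. The final part is open-ended unless total_size is supplied.
+def _example_calculate_range_parameter():
+    assert calculate_range_parameter(10, 0, 10) == 'bytes=0-9'
+    assert calculate_range_parameter(10, 9, 10) == 'bytes=90-'
+    bounded = calculate_range_parameter(10, 9, 10, total_size=100)
+    assert bounded == 'bytes=90-99'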
+
+
+def get_callbacks(transfer_future, callback_type):
+ """Retrieves callbacks from a subscriber
+
+ :type transfer_future: s3transfer.futures.TransferFuture
+ :param transfer_future: The transfer future the subscriber is associated
+ to.
+
+ :type callback_type: str
+ :param callback_type: The type of callback to retrieve from the subscriber.
+ Valid types include:
+ * 'queued'
+ * 'progress'
+ * 'done'
+
+ :returns: A list of callbacks for the type specified. All callbacks are
+ preinjected with the transfer future.
+ """
+ callbacks = []
+ for subscriber in transfer_future.meta.call_args.subscribers:
+ callback_name = 'on_' + callback_type
+ if hasattr(subscriber, callback_name):
+ callbacks.append(
+ functools.partial(
+ getattr(subscriber, callback_name), future=transfer_future
+ )
+ )
+ return callbacks
+
+
+def invoke_progress_callbacks(callbacks, bytes_transferred):
+ """Calls all progress callbacks
+
+ :param callbacks: A list of progress callbacks to invoke
+ :param bytes_transferred: The number of bytes transferred. This is passed
+ to the callbacks. If no bytes were transferred the callbacks will not
+ be invoked because no progress was achieved. It is also possible
+ to receive a negative amount which comes from retrying a transfer
+ request.
+ """
+ # Only invoke the callbacks if bytes were actually transferred.
+ if bytes_transferred:
+ for callback in callbacks:
+ callback(bytes_transferred=bytes_transferred)
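+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): callbacks receive the byte count as a keyword argument and
+# are skipped entirely when no bytes were transferred.
+def _example_invoke_progress_callbacks():
+    seen = []
+    callbacks = [lambda bytes_transferred: seen.append(bytes_transferred)]
+    invoke_progress_callbacks(callbacks, 5)
+    invoke_progress_callbacks(callbacks, 0)
+    assert seen == [5]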
+
+
+def get_filtered_dict(
+ original_dict, whitelisted_keys=None, blocklisted_keys=None
+):
+ """Gets a dictionary filtered by whitelisted and blocklisted keys.
+
+ :param original_dict: The original dictionary of arguments to source keys
+ and values.
+    :param whitelisted_keys: A list of keys to include in the filtered
+        dictionary.
+    :param blocklisted_keys: A list of keys to exclude from the filtered
+        dictionary.
+
+ :returns: A dictionary containing key/values from the original dictionary
+ whose key was included in the whitelist and/or not included in the
+ blocklist.
+ """
+ filtered_dict = {}
+ for key, value in original_dict.items():
+ if (whitelisted_keys and key in whitelisted_keys) or (
+ blocklisted_keys and key not in blocklisted_keys
+ ):
+ filtered_dict[key] = value
+ return filtered_dict
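+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): a whitelist keeps only the named keys, while a blocklist
+# drops them.
+def _example_get_filtered_dict():
+    args = {'ChecksumAlgorithm': 'CRC32', 'RequestPayer': 'requester'}
+    assert get_filtered_dict(args, whitelisted_keys=['RequestPayer']) == {
+        'RequestPayer': 'requester'
+    }
+    assert get_filtered_dict(args, blocklisted_keys=['RequestPayer']) == {
+        'ChecksumAlgorithm': 'CRC32'
+    }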
+
+
+class CallArgs:
+ def __init__(self, **kwargs):
+ """A class that records call arguments
+
+ The call arguments must be passed as keyword arguments. It will set
+ each keyword argument as an attribute of the object along with its
+ associated value.
+ """
+ for arg, value in kwargs.items():
+ setattr(self, arg, value)
+
+
+class FunctionContainer:
+ """An object that contains a function and any args or kwargs to call it
+
+ When called the provided function will be called with provided args
+ and kwargs.
+ """
+
+ def __init__(self, func, *args, **kwargs):
+ self._func = func
+ self._args = args
+ self._kwargs = kwargs
+
+ def __repr__(self):
+ return f'Function: {self._func} with args {self._args} and kwargs {self._kwargs}'
+
+ def __call__(self):
+ return self._func(*self._args, **self._kwargs)
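+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): a FunctionContainer freezes a call's arguments so it can
+# be invoked later with no arguments.
+def _example_function_container():
+    container = FunctionContainer(sorted, [3, 1, 2], reverse=True)
+    assert container() == [3, 2, 1]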
+
+
+class CountCallbackInvoker:
+ """An abstraction to invoke a callback when a shared count reaches zero
+
+ :param callback: Callback invoke when finalized count reaches zero
+ """
+
+ def __init__(self, callback):
+ self._lock = threading.Lock()
+ self._callback = callback
+ self._count = 0
+ self._is_finalized = False
+
+ @property
+ def current_count(self):
+ with self._lock:
+ return self._count
+
+ def increment(self):
+ """Increment the count by one"""
+ with self._lock:
+ if self._is_finalized:
+ raise RuntimeError(
+                    'Counter has been finalized; it can no longer be '
+ 'incremented.'
+ )
+ self._count += 1
+
+ def decrement(self):
+ """Decrement the count by one"""
+ with self._lock:
+ if self._count == 0:
+ raise RuntimeError(
+ 'Counter is at zero. It cannot dip below zero'
+ )
+ self._count -= 1
+ if self._is_finalized and self._count == 0:
+ self._callback()
+
+ def finalize(self):
+ """Finalize the counter
+
+        Once finalized, the counter can never be incremented, and the
+        callback will be invoked once the count reaches zero
+ """
+ with self._lock:
+ self._is_finalized = True
+ if self._count == 0:
+ self._callback()
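+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): the callback only fires once the counter is finalized and
+# every increment has been matched by a decrement.
+def _example_count_callback_invoker():
+    fired = []
+    invoker = CountCallbackInvoker(lambda: fired.append(True))
+    invoker.increment()
+    invoker.finalize()
+    assert not fired  # one count is still outstanding
+    invoker.decrement()
+    assert fired  # count reached zero after finalization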
+
+
+class OSUtils:
+ _MAX_FILENAME_LEN = 255
+
+ def get_file_size(self, filename):
+ return os.path.getsize(filename)
+
+ def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
+ return ReadFileChunk.from_filename(
+ filename, start_byte, size, callbacks, enable_callbacks=False
+ )
+
+ def open_file_chunk_reader_from_fileobj(
+ self,
+ fileobj,
+ chunk_size,
+ full_file_size,
+ callbacks,
+ close_callbacks=None,
+ ):
+ return ReadFileChunk(
+ fileobj,
+ chunk_size,
+ full_file_size,
+ callbacks=callbacks,
+ enable_callbacks=False,
+ close_callbacks=close_callbacks,
+ )
+
+ def open(self, filename, mode):
+ return open(filename, mode)
+
+ def remove_file(self, filename):
+ """Remove a file, noop if file does not exist."""
+ # Unlike os.remove, if the file does not exist,
+ # then this method does nothing.
+ try:
+ os.remove(filename)
+ except OSError:
+ pass
+
+ def rename_file(self, current_filename, new_filename):
+ rename_file(current_filename, new_filename)
+
+    def is_special_file(self, filename):
+ """Checks to see if a file is a special UNIX file.
+
+ It checks if the file is a character special device, block special
+ device, FIFO, or socket.
+
+ :param filename: Name of the file
+
+        :returns: True if the file is a special file. False, if it is not.
+ """
+ # If it does not exist, it must be a new file so it cannot be
+ # a special file.
+ if not os.path.exists(filename):
+ return False
+ mode = os.stat(filename).st_mode
+ # Character special device.
+ if stat.S_ISCHR(mode):
+ return True
+ # Block special device
+ if stat.S_ISBLK(mode):
+ return True
+ # Named pipe / FIFO
+ if stat.S_ISFIFO(mode):
+ return True
+ # Socket.
+ if stat.S_ISSOCK(mode):
+ return True
+ return False
+
+ def get_temp_filename(self, filename):
+ suffix = os.extsep + random_file_extension()
+ path = os.path.dirname(filename)
+ name = os.path.basename(filename)
+ temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
+ return os.path.join(path, temp_filename)
+
+ def allocate(self, filename, size):
+ try:
+ with self.open(filename, 'wb') as f:
+ fallocate(f, size)
+ except OSError:
+ self.remove_file(filename)
+ raise
+
+
+class DeferredOpenFile:
+ def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
+        """A class that defers the opening of a file until needed
+
+        This is useful for deferring the opening of a file until it is
+        needed in a separate thread, as there is a limit on how many open
+        files there can be in a single process for most operating systems.
+        The file gets opened in the following methods: ``read()``,
+        ``write()``, ``seek()``, and ``__enter__()``
+
+ :type filename: str
+ :param filename: The name of the file to open
+
+ :type start_byte: int
+ :param start_byte: The byte to seek to when the file is opened.
+
+ :type mode: str
+ :param mode: The mode to use to open the file
+
+ :type open_function: function
+ :param open_function: The function to use to open the file
+ """
+ self._filename = filename
+ self._fileobj = None
+ self._start_byte = start_byte
+ self._mode = mode
+ self._open_function = open_function
+
+ def _open_if_needed(self):
+ if self._fileobj is None:
+ self._fileobj = self._open_function(self._filename, self._mode)
+ if self._start_byte != 0:
+ self._fileobj.seek(self._start_byte)
+
+ @property
+ def name(self):
+ return self._filename
+
+ def read(self, amount=None):
+ self._open_if_needed()
+ return self._fileobj.read(amount)
+
+ def write(self, data):
+ self._open_if_needed()
+ self._fileobj.write(data)
+
+ def seek(self, where, whence=0):
+ self._open_if_needed()
+ self._fileobj.seek(where, whence)
+
+ def tell(self):
+ if self._fileobj is None:
+ return self._start_byte
+ return self._fileobj.tell()
+
+ def close(self):
+ if self._fileobj:
+ self._fileobj.close()
+
+ def __enter__(self):
+ self._open_if_needed()
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
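+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): the underlying file is not opened until the first
+# read/write/seek/__enter__ call, and start_byte is honored on open.
+def _example_deferred_open_file():
+    import tempfile
+
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+        f.write(b'hello world')
+    deferred = DeferredOpenFile(f.name, start_byte=6)
+    assert deferred.tell() == 6  # reported without opening the file
+    assert deferred.read() == b'world'  # opened and seeked on first read
+    deferred.close()
+    os.remove(f.name)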
+
+
+class ReadFileChunk:
+ def __init__(
+ self,
+ fileobj,
+ chunk_size,
+ full_file_size,
+ callbacks=None,
+ enable_callbacks=True,
+ close_callbacks=None,
+ ):
+ """
+
+ Given a file object shown below::
+
+ |___________________________________________________|
+ 0 | | full_file_size
+ |----chunk_size---|
+ f.tell()
+
+ :type fileobj: file
+ :param fileobj: File like object
+
+ :type chunk_size: int
+        :param chunk_size: The max chunk size to read. Trying to read
+            past the end of the chunk size will behave like you've
+            reached the end of the file.
+
+ :type full_file_size: int
+ :param full_file_size: The entire content length associated
+ with ``fileobj``.
+
+ :type callbacks: A list of function(amount_read)
+ :param callbacks: Called whenever data is read from this object in the
+ order provided.
+
+ :type enable_callbacks: boolean
+        :param enable_callbacks: True to run the callbacks during reads;
+            otherwise the callbacks are not run.
+
+ :type close_callbacks: A list of function()
+ :param close_callbacks: Called when close is called. The function
+ should take no arguments.
+ """
+ self._fileobj = fileobj
+ self._start_byte = self._fileobj.tell()
+ self._size = self._calculate_file_size(
+ self._fileobj,
+ requested_size=chunk_size,
+ start_byte=self._start_byte,
+ actual_file_size=full_file_size,
+ )
+ # _amount_read represents the position in the chunk and may exceed
+ # the chunk size, but won't allow reads out of bounds.
+ self._amount_read = 0
+ self._callbacks = callbacks
+ if callbacks is None:
+ self._callbacks = []
+ self._callbacks_enabled = enable_callbacks
+ self._close_callbacks = close_callbacks
+ if close_callbacks is None:
+            self._close_callbacks = []
+
+ @classmethod
+ def from_filename(
+ cls,
+ filename,
+ start_byte,
+ chunk_size,
+ callbacks=None,
+ enable_callbacks=True,
+ ):
+ """Convenience factory function to create from a filename.
+
+        :type filename: str
+        :param filename: The name of the file to read from. The full file
+            size is determined from the file itself.
+
+        :type start_byte: int
+        :param start_byte: The first byte from which to start reading.
+
+        :type chunk_size: int
+        :param chunk_size: The max chunk size to read. Trying to read
+            past the end of the chunk size will behave like you've
+            reached the end of the file.
+
+ :type callbacks: function(amount_read)
+ :param callbacks: Called whenever data is read from this object.
+
+ :type enable_callbacks: bool
+ :param enable_callbacks: Indicate whether to invoke callback
+ during read() calls.
+
+ :rtype: ``ReadFileChunk``
+ :return: A new instance of ``ReadFileChunk``
+
+ """
+ f = open(filename, 'rb')
+ f.seek(start_byte)
+ file_size = os.fstat(f.fileno()).st_size
+ return cls(f, chunk_size, file_size, callbacks, enable_callbacks)
+
+ def _calculate_file_size(
+ self, fileobj, requested_size, start_byte, actual_file_size
+ ):
+ max_chunk_size = actual_file_size - start_byte
+ return min(max_chunk_size, requested_size)
+
+ def read(self, amount=None):
+ amount_left = max(self._size - self._amount_read, 0)
+ if amount is None:
+ amount_to_read = amount_left
+ else:
+ amount_to_read = min(amount_left, amount)
+ data = self._fileobj.read(amount_to_read)
+ self._amount_read += len(data)
+ if self._callbacks is not None and self._callbacks_enabled:
+ invoke_progress_callbacks(self._callbacks, len(data))
+ return data
+
+ def signal_transferring(self):
+ self.enable_callback()
+ if hasattr(self._fileobj, 'signal_transferring'):
+ self._fileobj.signal_transferring()
+
+ def signal_not_transferring(self):
+ self.disable_callback()
+ if hasattr(self._fileobj, 'signal_not_transferring'):
+ self._fileobj.signal_not_transferring()
+
+ def enable_callback(self):
+ self._callbacks_enabled = True
+
+ def disable_callback(self):
+ self._callbacks_enabled = False
+
+ def seek(self, where, whence=0):
+ if whence not in (0, 1, 2):
+ # Mimic io's error for invalid whence values
+ raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
+
+ # Recalculate where based on chunk attributes so seek from file
+ # start (whence=0) is always used
+ where += self._start_byte
+ if whence == 1:
+ where += self._amount_read
+ elif whence == 2:
+ where += self._size
+
+ self._fileobj.seek(max(where, self._start_byte))
+ if self._callbacks is not None and self._callbacks_enabled:
+ # To also rewind the callback() for an accurate progress report
+ bounded_where = max(min(where - self._start_byte, self._size), 0)
+ bounded_amount_read = min(self._amount_read, self._size)
+ amount = bounded_where - bounded_amount_read
+ invoke_progress_callbacks(
+ self._callbacks, bytes_transferred=amount
+ )
+ self._amount_read = max(where - self._start_byte, 0)
+
+ def close(self):
+ if self._close_callbacks is not None and self._callbacks_enabled:
+ for callback in self._close_callbacks:
+ callback()
+ self._fileobj.close()
+
+ def tell(self):
+ return self._amount_read
+
+ def __len__(self):
+ # __len__ is defined because requests will try to determine the length
+ # of the stream to set a content length. In the normal case
+ # of the file it will just stat the file, but we need to change that
+ # behavior. By providing a __len__, requests will use that instead
+ # of stat'ing the file.
+ return self._size
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+ def __iter__(self):
+ # This is a workaround for http://bugs.python.org/issue17575
+ # Basically httplib will try to iterate over the contents, even
+        # if it's a file-like object. This wasn't noticed because we've
+ # already exhausted the stream so iterating over the file immediately
+ # stops, which is what we're simulating here.
+ return iter([])
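+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): reads are clamped to the chunk boundary even though the
+# underlying stream holds more data, and seek() is relative to the chunk start.
+def _example_read_file_chunk():
+    from io import BytesIO
+
+    chunk = ReadFileChunk(
+        BytesIO(b'0123456789'), chunk_size=4, full_file_size=10
+    )
+    assert len(chunk) == 4
+    assert chunk.read() == b'0123'
+    assert chunk.read() == b''  # the chunk is exhausted
+    chunk.seek(0)
+    assert chunk.read(2) == b'01'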
+
+
+class StreamReaderProgress:
+ """Wrapper for a read only stream that adds progress callbacks."""
+
+ def __init__(self, stream, callbacks=None):
+ self._stream = stream
+ self._callbacks = callbacks
+ if callbacks is None:
+ self._callbacks = []
+
+ def read(self, *args, **kwargs):
+ value = self._stream.read(*args, **kwargs)
+ invoke_progress_callbacks(self._callbacks, len(value))
+ return value
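+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): every read reports the number of bytes pulled from the
+# wrapped stream to the progress callbacks.
+def _example_stream_reader_progress():
+    from io import BytesIO
+
+    seen = []
+    reader = StreamReaderProgress(
+        BytesIO(b'abcdef'),
+        callbacks=[lambda bytes_transferred: seen.append(bytes_transferred)],
+    )
+    reader.read(4)
+    reader.read()
+    assert seen == [4, 2]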
+
+
+class NoResourcesAvailable(Exception):
+ pass
+
+
+class TaskSemaphore:
+ def __init__(self, count):
+ """A semaphore for the purpose of limiting the number of tasks
+
+ :param count: The size of semaphore
+ """
+ self._semaphore = threading.Semaphore(count)
+
+ def acquire(self, tag, blocking=True):
+ """Acquire the semaphore
+
+ :param tag: A tag identifying what is acquiring the semaphore. Note
+ that this is not really needed to directly use this class but is
+ needed for API compatibility with the SlidingWindowSemaphore
+ implementation.
+        :param blocking: If True, block until it can be acquired. If False,
+            do not block and raise an exception if it cannot be acquired.
+
+ :returns: A token (can be None) to use when releasing the semaphore
+ """
+ logger.debug("Acquiring %s", tag)
+ if not self._semaphore.acquire(blocking):
+ raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
+
+ def release(self, tag, acquire_token):
+ """Release the semaphore
+
+ :param tag: A tag identifying what is releasing the semaphore
+ :param acquire_token: The token returned from when the semaphore was
+ acquired. Note that this is not really needed to directly use this
+ class but is needed for API compatibility with the
+ SlidingWindowSemaphore implementation.
+ """
+ logger.debug(f"Releasing acquire {tag}/{acquire_token}")
+ self._semaphore.release()
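+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): a plain TaskSemaphore simply bounds concurrency; the tag
+# and token exist for API parity with SlidingWindowSemaphore.
+def _example_task_semaphore():
+    semaphore = TaskSemaphore(1)
+    token = semaphore.acquire('tag')  # returns None for this implementation
+    try:
+        semaphore.acquire('tag', blocking=False)
+        raised = False
+    except NoResourcesAvailable:
+        raised = True
+    assert raised
+    semaphore.release('tag', token)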
+
+
+class SlidingWindowSemaphore(TaskSemaphore):
+ """A semaphore used to coordinate sequential resource access.
+
+ This class is similar to the stdlib BoundedSemaphore:
+
+ * It's initialized with a count.
+ * Each call to ``acquire()`` decrements the counter.
+ * If the count is at zero, then ``acquire()`` will either block until the
+ count increases, or if ``blocking=False``, then it will raise
+ a NoResourcesAvailable exception indicating that it failed to acquire the
+ semaphore.
+
+ The main difference is that this semaphore is used to limit
+ access to a resource that requires sequential access. For example,
+ if I want to access resource R that has 20 subresources R_0 - R_19,
+ this semaphore can also enforce that you only have a max range of
+ 10 at any given point in time. You must also specify a tag name
+ when you acquire the semaphore. The sliding window semantics apply
+ on a per tag basis. The internal count will only be incremented
+ when the minimum sequence number for a tag is released.
+
+ """
+
+ def __init__(self, count):
+ self._count = count
+ # Dict[tag, next_sequence_number].
+ self._tag_sequences = defaultdict(int)
+ self._lowest_sequence = {}
+ self._lock = threading.Lock()
+ self._condition = threading.Condition(self._lock)
+ # Dict[tag, List[sequence_number]]
+ self._pending_release = {}
+
+ def current_count(self):
+ with self._lock:
+ return self._count
+
+ def acquire(self, tag, blocking=True):
+ logger.debug("Acquiring %s", tag)
+ self._condition.acquire()
+ try:
+ if self._count == 0:
+ if not blocking:
+ raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
+ else:
+ while self._count == 0:
+ self._condition.wait()
+ # self._count is no longer zero.
+ # First, check if this is the first time we're seeing this tag.
+ sequence_number = self._tag_sequences[tag]
+ if sequence_number == 0:
+ # First time seeing the tag, so record we're at 0.
+ self._lowest_sequence[tag] = sequence_number
+ self._tag_sequences[tag] += 1
+ self._count -= 1
+ return sequence_number
+ finally:
+ self._condition.release()
+
+ def release(self, tag, acquire_token):
+ sequence_number = acquire_token
+ logger.debug("Releasing acquire %s/%s", tag, sequence_number)
+ self._condition.acquire()
+ try:
+ if tag not in self._tag_sequences:
+ raise ValueError(f"Attempted to release unknown tag: {tag}")
+ max_sequence = self._tag_sequences[tag]
+ if self._lowest_sequence[tag] == sequence_number:
+ # We can immediately process this request and free up
+ # resources.
+ self._lowest_sequence[tag] += 1
+ self._count += 1
+ self._condition.notify()
+ queued = self._pending_release.get(tag, [])
+ while queued:
+ if self._lowest_sequence[tag] == queued[-1]:
+ queued.pop()
+ self._lowest_sequence[tag] += 1
+ self._count += 1
+ else:
+ break
+ elif self._lowest_sequence[tag] < sequence_number < max_sequence:
+ # We can't do anything right now because we're still waiting
+ # for the min sequence for the tag to be released. We have
+ # to queue this for pending release.
+ self._pending_release.setdefault(tag, []).append(
+ sequence_number
+ )
+ self._pending_release[tag].sort(reverse=True)
+ else:
+ raise ValueError(
+ "Attempted to release unknown sequence number "
+ f"{sequence_number} for tag: {tag}"
+ )
+ finally:
+ self._condition.release()
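+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): with a window of 2, releasing an out-of-order sequence
+# number frees no capacity until the lowest outstanding sequence number for
+# the tag is released.
+def _example_sliding_window_semaphore():
+    semaphore = SlidingWindowSemaphore(2)
+    first = semaphore.acquire('tag')  # sequence number 0
+    second = semaphore.acquire('tag')  # sequence number 1
+    semaphore.release('tag', second)  # queued; 0 is still outstanding
+    assert semaphore.current_count() == 0
+    semaphore.release('tag', first)  # frees 0 and the queued 1
+    assert semaphore.current_count() == 2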
+
+
+class ChunksizeAdjuster:
+ def __init__(
+ self,
+ max_size=MAX_SINGLE_UPLOAD_SIZE,
+ min_size=MIN_UPLOAD_CHUNKSIZE,
+ max_parts=MAX_PARTS,
+ ):
+ self.max_size = max_size
+ self.min_size = min_size
+ self.max_parts = max_parts
+
+ def adjust_chunksize(self, current_chunksize, file_size=None):
+ """Get a chunksize close to current that fits within all S3 limits.
+
+ :type current_chunksize: int
+ :param current_chunksize: The currently configured chunksize.
+
+ :type file_size: int or None
+ :param file_size: The size of the file to upload. This might be None
+ if the object being transferred has an unknown size.
+
+ :returns: A valid chunksize that fits within configured limits.
+ """
+ chunksize = current_chunksize
+ if file_size is not None:
+ chunksize = self._adjust_for_max_parts(chunksize, file_size)
+ return self._adjust_for_chunksize_limits(chunksize)
+
+ def _adjust_for_chunksize_limits(self, current_chunksize):
+ if current_chunksize > self.max_size:
+ logger.debug(
+ "Chunksize greater than maximum chunksize. "
+ f"Setting to {self.max_size} from {current_chunksize}."
+ )
+ return self.max_size
+ elif current_chunksize < self.min_size:
+ logger.debug(
+ "Chunksize less than minimum chunksize. "
+ f"Setting to {self.min_size} from {current_chunksize}."
+ )
+ return self.min_size
+ else:
+ return current_chunksize
+
+ def _adjust_for_max_parts(self, current_chunksize, file_size):
+ chunksize = current_chunksize
+ num_parts = int(math.ceil(file_size / float(chunksize)))
+
+ while num_parts > self.max_parts:
+ chunksize *= 2
+ num_parts = int(math.ceil(file_size / float(chunksize)))
+
+ if chunksize != current_chunksize:
+ logger.debug(
+ "Chunksize would result in the number of parts exceeding the "
+ f"maximum. Setting to {chunksize} from {current_chunksize}."
+ )
+
+ return chunksize
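+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): chunksizes are clamped to the MIN/MAX single-upload bounds
+# and doubled until the resulting part count fits under MAX_PARTS.
+def _example_chunksize_adjuster():
+    adjuster = ChunksizeAdjuster()
+    # Too small: bumped up to the 5 MiB minimum.
+    assert adjuster.adjust_chunksize(1024) == MIN_UPLOAD_CHUNKSIZE
+    # A 100 GiB file at 8 MiB per part would exceed 10,000 parts, so the
+    # chunksize is doubled to 16 MiB.
+    adjusted = adjuster.adjust_chunksize(8 * 1024**2, 100 * 1024**3)
+    assert adjusted == 16 * 1024**2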
+
+
+def add_s3express_defaults(bucket, extra_args):
+ """
+ This function has been deprecated, but is kept for backwards compatibility.
+ This function is subject to removal in a future release.
+ """
+ if is_s3express_bucket(bucket) and "ChecksumAlgorithm" not in extra_args:
+ # Default Transfer Operations to S3Express to use CRC32
+ extra_args["ChecksumAlgorithm"] = "crc32"
+
+
+def set_default_checksum_algorithm(extra_args):
+ """Set the default algorithm to CRC32 if not specified by the user."""
+ if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS):
+ return
+ extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)
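+
+
+# A minimal usage sketch (hypothetical `_example_*` helper, not part of the
+# upstream module): the botocore default algorithm is only filled in when the
+# caller has not already chosen one.
+def _example_set_default_checksum_algorithm():
+    extra_args = {}
+    set_default_checksum_algorithm(extra_args)
+    assert extra_args['ChecksumAlgorithm'] == DEFAULT_CHECKSUM_ALGORITHM
+    extra_args = {'ChecksumAlgorithm': 'SHA256'}
+    set_default_checksum_algorithm(extra_args)
+    assert extra_args['ChecksumAlgorithm'] == 'SHA256'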