author    | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit    | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/s3transfer
parent    | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer')
16 files changed, 8264 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/__init__.py b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py new file mode 100644 index 00000000..b8abbf47 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py @@ -0,0 +1,887 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Abstractions over S3's upload/download operations. + +This module provides high level abstractions for efficient +uploads/downloads. It handles several things for the user: + +* Automatically switching to multipart transfers when + a file is over a specific size threshold +* Uploading/downloading a file in parallel +* Throttling based on max bandwidth +* Progress callbacks to monitor transfers +* Retries. While botocore handles retries for streaming uploads, + it is not possible for it to handle retries for streaming + downloads. This module handles retries for both cases so + you don't need to implement any retry logic yourself. + +This module has a reasonable set of defaults. It also allows you +to configure many aspects of the transfer process including: + +* Multipart threshold size +* Max parallel downloads +* Max bandwidth +* Socket timeouts +* Retry amounts + +There is no support for s3->s3 multipart copies at this +time. + + +.. _ref_s3transfer_usage: + +Usage +===== + +The simplest way to use this module is: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + transfer = S3Transfer(client) + # Upload /tmp/myfile to s3://bucket/key + transfer.upload_file('/tmp/myfile', 'bucket', 'key') + + # Download s3://bucket/key to /tmp/myfile + transfer.download_file('bucket', 'key', '/tmp/myfile') + +The ``upload_file`` and ``download_file`` methods also accept +``**kwargs``, which will be forwarded through to the corresponding +client operation. Here are a few examples using ``upload_file``:: + + # Making the object public + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'ACL': 'public-read'}) + + # Setting metadata + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) + + # Setting content type + transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', + extra_args={'ContentType': "application/json"}) + + +The ``S3Transfer`` class also supports progress callbacks so you can +provide transfer progress to users. Both the ``upload_file`` and +``download_file`` methods take an optional ``callback`` parameter. +Here's an example of how to print a simple progress percentage +to the user: + +.. code-block:: python + + class ProgressPercentage(object): + def __init__(self, filename): + self._filename = filename + self._size = float(os.path.getsize(filename)) + self._seen_so_far = 0 + self._lock = threading.Lock() + + def __call__(self, bytes_amount): + # To simplify we'll assume this is hooked up + # to a single filename. 
+ with self._lock: + self._seen_so_far += bytes_amount + percentage = (self._seen_so_far / self._size) * 100 + sys.stdout.write( + "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, + self._size, percentage)) + sys.stdout.flush() + + + transfer = S3Transfer(boto3.client('s3', 'us-west-2')) + # Upload /tmp/myfile to s3://bucket/key and print upload progress. + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + callback=ProgressPercentage('/tmp/myfile')) + + + +You can also provide a TransferConfig object to the S3Transfer +object that gives you more fine grained control over the +transfer. For example: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + config = TransferConfig( + multipart_threshold=8 * 1024 * 1024, + max_concurrency=10, + num_download_attempts=10, + ) + transfer = S3Transfer(client, config) + transfer.upload_file('/tmp/foo', 'bucket', 'key') + + +""" + +import concurrent.futures +import functools +import logging +import math +import os +import queue +import random +import socket +import string +import threading + +from botocore.compat import six # noqa: F401 +from botocore.exceptions import IncompleteReadError, ResponseStreamingError +from botocore.vendored.requests.packages.urllib3.exceptions import ( + ReadTimeoutError, +) + +import s3transfer.compat +from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError + +__author__ = 'Amazon Web Services' +__version__ = '0.11.4' + + +class NullHandler(logging.Handler): + def emit(self, record): + pass + + +logger = logging.getLogger(__name__) +logger.addHandler(NullHandler()) + +MB = 1024 * 1024 +SHUTDOWN_SENTINEL = object() + + +def random_file_extension(num_digits=8): + return ''.join(random.choice(string.hexdigits) for _ in range(num_digits)) + + +def disable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and hasattr( + request.body, 'disable_callback' + ): + request.body.disable_callback() + + +def enable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and hasattr( + request.body, 'enable_callback' + ): + request.body.enable_callback() + + +class QueueShutdownError(Exception): + pass + + +class ReadFileChunk: + def __init__( + self, + fileobj, + start_byte, + chunk_size, + full_file_size, + callback=None, + enable_callback=True, + ): + """ + + Given a file object shown below: + + |___________________________________________________| + 0 | | full_file_size + |----chunk_size---| + start_byte + + :type fileobj: file + :param fileobj: File like object + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. 
+ + """ + self._fileobj = fileobj + self._start_byte = start_byte + self._size = self._calculate_file_size( + self._fileobj, + requested_size=chunk_size, + start_byte=start_byte, + actual_file_size=full_file_size, + ) + self._fileobj.seek(self._start_byte) + self._amount_read = 0 + self._callback = callback + self._callback_enabled = enable_callback + + @classmethod + def from_filename( + cls, + filename, + start_byte, + chunk_size, + callback=None, + enable_callback=True, + ): + """Convenience factory function to create from a filename. + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. + + :type enable_callback: bool + :param enable_callback: Indicate whether to invoke callback + during read() calls. + + :rtype: ``ReadFileChunk`` + :return: A new instance of ``ReadFileChunk`` + + """ + f = open(filename, 'rb') + file_size = os.fstat(f.fileno()).st_size + return cls( + f, start_byte, chunk_size, file_size, callback, enable_callback + ) + + def _calculate_file_size( + self, fileobj, requested_size, start_byte, actual_file_size + ): + max_chunk_size = actual_file_size - start_byte + return min(max_chunk_size, requested_size) + + def read(self, amount=None): + if amount is None: + amount_to_read = self._size - self._amount_read + else: + amount_to_read = min(self._size - self._amount_read, amount) + data = self._fileobj.read(amount_to_read) + self._amount_read += len(data) + if self._callback is not None and self._callback_enabled: + self._callback(len(data)) + return data + + def enable_callback(self): + self._callback_enabled = True + + def disable_callback(self): + self._callback_enabled = False + + def seek(self, where): + self._fileobj.seek(self._start_byte + where) + if self._callback is not None and self._callback_enabled: + # To also rewind the callback() for an accurate progress report + self._callback(where - self._amount_read) + self._amount_read = where + + def close(self): + self._fileobj.close() + + def tell(self): + return self._amount_read + + def __len__(self): + # __len__ is defined because requests will try to determine the length + # of the stream to set a content length. In the normal case + # of the file it will just stat the file, but we need to change that + # behavior. By providing a __len__, requests will use that instead + # of stat'ing the file. + return self._size + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def __iter__(self): + # This is a workaround for http://bugs.python.org/issue17575 + # Basically httplib will try to iterate over the contents, even + # if its a file like object. This wasn't noticed because we've + # already exhausted the stream so iterating over the file immediately + # stops, which is what we're simulating here. 
+ return iter([]) + + +class StreamReaderProgress: + """Wrapper for a read only stream that adds progress callbacks.""" + + def __init__(self, stream, callback=None): + self._stream = stream + self._callback = callback + + def read(self, *args, **kwargs): + value = self._stream.read(*args, **kwargs) + if self._callback is not None: + self._callback(len(value)) + return value + + +class OSUtils: + def get_file_size(self, filename): + return os.path.getsize(filename) + + def open_file_chunk_reader(self, filename, start_byte, size, callback): + return ReadFileChunk.from_filename( + filename, start_byte, size, callback, enable_callback=False + ) + + def open(self, filename, mode): + return open(filename, mode) + + def remove_file(self, filename): + """Remove a file, noop if file does not exist.""" + # Unlike os.remove, if the file does not exist, + # then this method does nothing. + try: + os.remove(filename) + except OSError: + pass + + def rename_file(self, current_filename, new_filename): + s3transfer.compat.rename_file(current_filename, new_filename) + + +class MultipartUploader: + # These are the extra_args that need to be forwarded onto + # subsequent upload_parts. + UPLOAD_PART_ARGS = [ + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + def __init__( + self, + client, + config, + osutil, + executor_cls=concurrent.futures.ThreadPoolExecutor, + ): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + + def _extra_upload_part_args(self, extra_args): + # Only the args in UPLOAD_PART_ARGS actually need to be passed + # onto the upload_part calls. + upload_parts_args = {} + for key, value in extra_args.items(): + if key in self.UPLOAD_PART_ARGS: + upload_parts_args[key] = value + return upload_parts_args + + def upload_file(self, filename, bucket, key, callback, extra_args): + response = self._client.create_multipart_upload( + Bucket=bucket, Key=key, **extra_args + ) + upload_id = response['UploadId'] + try: + parts = self._upload_parts( + upload_id, filename, bucket, key, callback, extra_args + ) + except Exception as e: + logger.debug( + "Exception raised while uploading parts, " + "aborting multipart upload.", + exc_info=True, + ) + self._client.abort_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id + ) + raise S3UploadFailedError( + "Failed to upload {} to {}: {}".format( + filename, '/'.join([bucket, key]), e + ) + ) + self._client.complete_multipart_upload( + Bucket=bucket, + Key=key, + UploadId=upload_id, + MultipartUpload={'Parts': parts}, + ) + + def _upload_parts( + self, upload_id, filename, bucket, key, callback, extra_args + ): + upload_parts_extra_args = self._extra_upload_part_args(extra_args) + parts = [] + part_size = self._config.multipart_chunksize + num_parts = int( + math.ceil(self._os.get_file_size(filename) / float(part_size)) + ) + max_workers = self._config.max_concurrency + with self._executor_cls(max_workers=max_workers) as executor: + upload_partial = functools.partial( + self._upload_one_part, + filename, + bucket, + key, + upload_id, + part_size, + upload_parts_extra_args, + callback, + ) + for part in executor.map(upload_partial, range(1, num_parts + 1)): + parts.append(part) + return parts + + def _upload_one_part( + self, + filename, + bucket, + key, + upload_id, + part_size, + extra_args, + callback, + part_number, + ): + open_chunk_reader = self._os.open_file_chunk_reader + with open_chunk_reader( + filename, part_size * (part_number - 1), part_size, 
callback + ) as body: + response = self._client.upload_part( + Bucket=bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + Body=body, + **extra_args, + ) + etag = response['ETag'] + return {'ETag': etag, 'PartNumber': part_number} + + +class ShutdownQueue(queue.Queue): + """A queue implementation that can be shutdown. + + Shutting down a queue means that this class adds a + trigger_shutdown method that will trigger all subsequent + calls to put() to fail with a ``QueueShutdownError``. + + It purposefully deviates from queue.Queue, and is *not* meant + to be a drop in replacement for ``queue.Queue``. + + """ + + def _init(self, maxsize): + self._shutdown = False + self._shutdown_lock = threading.Lock() + # queue.Queue is an old style class so we don't use super(). + return queue.Queue._init(self, maxsize) + + def trigger_shutdown(self): + with self._shutdown_lock: + self._shutdown = True + logger.debug("The IO queue is now shutdown.") + + def put(self, item): + # Note: this is not sufficient, it's still possible to deadlock! + # Need to hook into the condition vars used by this class. + with self._shutdown_lock: + if self._shutdown: + raise QueueShutdownError( + "Cannot put item to queue when " "queue has been shutdown." + ) + return queue.Queue.put(self, item) + + +class MultipartDownloader: + def __init__( + self, + client, + config, + osutil, + executor_cls=concurrent.futures.ThreadPoolExecutor, + ): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + self._ioqueue = ShutdownQueue(self._config.max_io_queue) + + def download_file( + self, bucket, key, filename, object_size, extra_args, callback=None + ): + with self._executor_cls(max_workers=2) as controller: + # 1 thread for the future that manages the uploading of files + # 1 thread for the future that manages IO writes. 
+ download_parts_handler = functools.partial( + self._download_file_as_future, + bucket, + key, + filename, + object_size, + callback, + ) + parts_future = controller.submit(download_parts_handler) + + io_writes_handler = functools.partial( + self._perform_io_writes, filename + ) + io_future = controller.submit(io_writes_handler) + results = concurrent.futures.wait( + [parts_future, io_future], + return_when=concurrent.futures.FIRST_EXCEPTION, + ) + self._process_future_results(results) + + def _process_future_results(self, futures): + finished, unfinished = futures + for future in finished: + future.result() + + def _download_file_as_future( + self, bucket, key, filename, object_size, callback + ): + part_size = self._config.multipart_chunksize + num_parts = int(math.ceil(object_size / float(part_size))) + max_workers = self._config.max_concurrency + download_partial = functools.partial( + self._download_range, + bucket, + key, + filename, + part_size, + num_parts, + callback, + ) + try: + with self._executor_cls(max_workers=max_workers) as executor: + list(executor.map(download_partial, range(num_parts))) + finally: + self._ioqueue.put(SHUTDOWN_SENTINEL) + + def _calculate_range_param(self, part_size, part_index, num_parts): + start_range = part_index * part_size + if part_index == num_parts - 1: + end_range = '' + else: + end_range = start_range + part_size - 1 + range_param = f'bytes={start_range}-{end_range}' + return range_param + + def _download_range( + self, bucket, key, filename, part_size, num_parts, callback, part_index + ): + try: + range_param = self._calculate_range_param( + part_size, part_index, num_parts + ) + + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + logger.debug("Making get_object call.") + response = self._client.get_object( + Bucket=bucket, Key=key, Range=range_param + ) + streaming_body = StreamReaderProgress( + response['Body'], callback + ) + buffer_size = 1024 * 16 + current_index = part_size * part_index + for chunk in iter( + lambda: streaming_body.read(buffer_size), b'' + ): + self._ioqueue.put((current_index, chunk)) + current_index += len(chunk) + return + except ( + socket.timeout, + OSError, + ReadTimeoutError, + IncompleteReadError, + ResponseStreamingError, + ) as e: + logger.debug( + "Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", + e, + i, + max_attempts, + exc_info=True, + ) + last_exception = e + continue + raise RetriesExceededError(last_exception) + finally: + logger.debug("EXITING _download_range for part: %s", part_index) + + def _perform_io_writes(self, filename): + with self._os.open(filename, 'wb') as f: + while True: + task = self._ioqueue.get() + if task is SHUTDOWN_SENTINEL: + logger.debug( + "Shutdown sentinel received in IO handler, " + "shutting down IO handler." 
+ ) + return + else: + try: + offset, data = task + f.seek(offset) + f.write(data) + except Exception as e: + logger.debug( + "Caught exception in IO thread: %s", + e, + exc_info=True, + ) + self._ioqueue.trigger_shutdown() + raise + + +class TransferConfig: + def __init__( + self, + multipart_threshold=8 * MB, + max_concurrency=10, + multipart_chunksize=8 * MB, + num_download_attempts=5, + max_io_queue=100, + ): + self.multipart_threshold = multipart_threshold + self.max_concurrency = max_concurrency + self.multipart_chunksize = multipart_chunksize + self.num_download_attempts = num_download_attempts + self.max_io_queue = max_io_queue + + +class S3Transfer: + ALLOWED_DOWNLOAD_ARGS = [ + 'VersionId', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + ALLOWED_UPLOAD_ARGS = [ + 'ACL', + 'CacheControl', + 'ContentDisposition', + 'ContentEncoding', + 'ContentLanguage', + 'ContentType', + 'Expires', + 'GrantFullControl', + 'GrantRead', + 'GrantReadACP', + 'GrantWriteACL', + 'Metadata', + 'RequestPayer', + 'ServerSideEncryption', + 'StorageClass', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'SSEKMSKeyId', + 'SSEKMSEncryptionContext', + 'Tagging', + ] + + def __init__(self, client, config=None, osutil=None): + self._client = client + self._client.meta.events.register( + 'before-call.s3.*', self._update_checksum_context + ) + if config is None: + config = TransferConfig() + self._config = config + if osutil is None: + osutil = OSUtils() + self._osutil = osutil + + def _update_checksum_context(self, params, **kwargs): + request_context = params.get("context", {}) + checksum_context = request_context.get("checksum", {}) + if "request_algorithm" in checksum_context: + # Force request checksum algorithm in the header if specified. + checksum_context["request_algorithm"]["in"] = "header" + + def upload_file( + self, filename, bucket, key, callback=None, extra_args=None + ): + """Upload a file to an S3 object. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.upload_file() directly. + """ + if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) + events = self._client.meta.events + events.register_first( + 'request-created.s3', + disable_upload_callbacks, + unique_id='s3upload-callback-disable', + ) + events.register_last( + 'request-created.s3', + enable_upload_callbacks, + unique_id='s3upload-callback-enable', + ) + if ( + self._osutil.get_file_size(filename) + >= self._config.multipart_threshold + ): + self._multipart_upload(filename, bucket, key, callback, extra_args) + else: + self._put_object(filename, bucket, key, callback, extra_args) + + def _put_object(self, filename, bucket, key, callback, extra_args): + # We're using open_file_chunk_reader so we can take advantage of the + # progress callback functionality. + open_chunk_reader = self._osutil.open_file_chunk_reader + with open_chunk_reader( + filename, + 0, + self._osutil.get_file_size(filename), + callback=callback, + ) as body: + self._client.put_object( + Bucket=bucket, Key=key, Body=body, **extra_args + ) + + def download_file( + self, bucket, key, filename, extra_args=None, callback=None + ): + """Download an S3 object to a file. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.download_file() directly. 
+ """ + # This method will issue a ``head_object`` request to determine + # the size of the S3 object. This is used to determine if the + # object is downloaded in parallel. + if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) + object_size = self._object_size(bucket, key, extra_args) + temp_filename = filename + os.extsep + random_file_extension() + try: + self._download_file( + bucket, key, temp_filename, object_size, extra_args, callback + ) + except Exception: + logger.debug( + "Exception caught in download_file, removing partial " + "file: %s", + temp_filename, + exc_info=True, + ) + self._osutil.remove_file(temp_filename) + raise + else: + self._osutil.rename_file(temp_filename, filename) + + def _download_file( + self, bucket, key, filename, object_size, extra_args, callback + ): + if object_size >= self._config.multipart_threshold: + self._ranged_download( + bucket, key, filename, object_size, extra_args, callback + ) + else: + self._get_object(bucket, key, filename, extra_args, callback) + + def _validate_all_known_args(self, actual, allowed): + for kwarg in actual: + if kwarg not in allowed: + raise ValueError( + f"Invalid extra_args key '{kwarg}', " + f"must be one of: {', '.join(allowed)}" + ) + + def _ranged_download( + self, bucket, key, filename, object_size, extra_args, callback + ): + downloader = MultipartDownloader( + self._client, self._config, self._osutil + ) + downloader.download_file( + bucket, key, filename, object_size, extra_args, callback + ) + + def _get_object(self, bucket, key, filename, extra_args, callback): + # precondition: num_download_attempts > 0 + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + return self._do_get_object( + bucket, key, filename, extra_args, callback + ) + except ( + socket.timeout, + OSError, + ReadTimeoutError, + IncompleteReadError, + ResponseStreamingError, + ) as e: + # TODO: we need a way to reset the callback if the + # download failed. + logger.debug( + "Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", + e, + i, + max_attempts, + exc_info=True, + ) + last_exception = e + continue + raise RetriesExceededError(last_exception) + + def _do_get_object(self, bucket, key, filename, extra_args, callback): + response = self._client.get_object( + Bucket=bucket, Key=key, **extra_args + ) + streaming_body = StreamReaderProgress(response['Body'], callback) + with self._osutil.open(filename, 'wb') as f: + for chunk in iter(lambda: streaming_body.read(8192), b''): + f.write(chunk) + + def _object_size(self, bucket, key, extra_args): + return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[ + 'ContentLength' + ] + + def _multipart_upload(self, filename, bucket, key, callback, extra_args): + uploader = MultipartUploader(self._client, self._config, self._osutil) + uploader.upload_file(filename, bucket, key, callback, extra_args) diff --git a/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py new file mode 100644 index 00000000..9301c8e3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py @@ -0,0 +1,437 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import threading +import time + + +class RequestExceededException(Exception): + def __init__(self, requested_amt, retry_time): + """Error when requested amount exceeds what is allowed + + The request that raised this error should be retried after waiting + the time specified by ``retry_time``. + + :type requested_amt: int + :param requested_amt: The originally requested byte amount + + :type retry_time: float + :param retry_time: The length in time to wait to retry for the + requested amount + """ + self.requested_amt = requested_amt + self.retry_time = retry_time + msg = f'Request amount {requested_amt} exceeded the amount available. Retry in {retry_time}' + super().__init__(msg) + + +class RequestToken: + """A token to pass as an identifier when consuming from the LeakyBucket""" + + pass + + +class TimeUtils: + def time(self): + """Get the current time back + + :rtype: float + :returns: The current time in seconds + """ + return time.time() + + def sleep(self, value): + """Sleep for a designated time + + :type value: float + :param value: The time to sleep for in seconds + """ + return time.sleep(value) + + +class BandwidthLimiter: + def __init__(self, leaky_bucket, time_utils=None): + """Limits bandwidth for shared S3 transfers + + :type leaky_bucket: LeakyBucket + :param leaky_bucket: The leaky bucket to use limit bandwidth + + :type time_utils: TimeUtils + :param time_utils: Time utility to use for interacting with time. 
+ """ + self._leaky_bucket = leaky_bucket + self._time_utils = time_utils + if time_utils is None: + self._time_utils = TimeUtils() + + def get_bandwith_limited_stream( + self, fileobj, transfer_coordinator, enabled=True + ): + """Wraps a fileobj in a bandwidth limited stream wrapper + + :type fileobj: file-like obj + :param fileobj: The file-like obj to wrap + + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + param transfer_coordinator: The coordinator for the general transfer + that the wrapped stream is a part of + + :type enabled: boolean + :param enabled: Whether bandwidth limiting should be enabled to start + """ + stream = BandwidthLimitedStream( + fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils + ) + if not enabled: + stream.disable_bandwidth_limiting() + return stream + + +class BandwidthLimitedStream: + def __init__( + self, + fileobj, + leaky_bucket, + transfer_coordinator, + time_utils=None, + bytes_threshold=256 * 1024, + ): + """Limits bandwidth for reads on a wrapped stream + + :type fileobj: file-like object + :param fileobj: The file like object to wrap + + :type leaky_bucket: LeakyBucket + :param leaky_bucket: The leaky bucket to use to throttle reads on + the stream + + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + param transfer_coordinator: The coordinator for the general transfer + that the wrapped stream is a part of + + :type time_utils: TimeUtils + :param time_utils: The time utility to use for interacting with time + """ + self._fileobj = fileobj + self._leaky_bucket = leaky_bucket + self._transfer_coordinator = transfer_coordinator + self._time_utils = time_utils + if time_utils is None: + self._time_utils = TimeUtils() + self._bandwidth_limiting_enabled = True + self._request_token = RequestToken() + self._bytes_seen = 0 + self._bytes_threshold = bytes_threshold + + def enable_bandwidth_limiting(self): + """Enable bandwidth limiting on reads to the stream""" + self._bandwidth_limiting_enabled = True + + def disable_bandwidth_limiting(self): + """Disable bandwidth limiting on reads to the stream""" + self._bandwidth_limiting_enabled = False + + def read(self, amount): + """Read a specified amount + + Reads will only be throttled if bandwidth limiting is enabled. + """ + if not self._bandwidth_limiting_enabled: + return self._fileobj.read(amount) + + # We do not want to be calling consume on every read as the read + # amounts can be small causing the lock of the leaky bucket to + # introduce noticeable overhead. So instead we keep track of + # how many bytes we have seen and only call consume once we pass a + # certain threshold. + self._bytes_seen += amount + if self._bytes_seen < self._bytes_threshold: + return self._fileobj.read(amount) + + self._consume_through_leaky_bucket() + return self._fileobj.read(amount) + + def _consume_through_leaky_bucket(self): + # NOTE: If the read amount on the stream are high, it will result + # in large bursty behavior as there is not an interface for partial + # reads. However given the read's on this abstraction are at most 256KB + # (via downloads), it reduces the burstiness to be small KB bursts at + # worst. 
+ while not self._transfer_coordinator.exception: + try: + self._leaky_bucket.consume( + self._bytes_seen, self._request_token + ) + self._bytes_seen = 0 + return + except RequestExceededException as e: + self._time_utils.sleep(e.retry_time) + else: + raise self._transfer_coordinator.exception + + def signal_transferring(self): + """Signal that data being read is being transferred to S3""" + self.enable_bandwidth_limiting() + + def signal_not_transferring(self): + """Signal that data being read is not being transferred to S3""" + self.disable_bandwidth_limiting() + + def seek(self, where, whence=0): + self._fileobj.seek(where, whence) + + def tell(self): + return self._fileobj.tell() + + def close(self): + if self._bandwidth_limiting_enabled and self._bytes_seen: + # This handles the case where the file is small enough to never + # trigger the threshold and thus is never subjugated to the + # leaky bucket on read(). This specifically happens for small + # uploads. So instead to account for those bytes, have + # it go through the leaky bucket when the file gets closed. + self._consume_through_leaky_bucket() + self._fileobj.close() + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + +class LeakyBucket: + def __init__( + self, + max_rate, + time_utils=None, + rate_tracker=None, + consumption_scheduler=None, + ): + """A leaky bucket abstraction to limit bandwidth consumption + + :type rate: int + :type rate: The maximum rate to allow. This rate is in terms of + bytes per second. + + :type time_utils: TimeUtils + :param time_utils: The time utility to use for interacting with time + + :type rate_tracker: BandwidthRateTracker + :param rate_tracker: Tracks bandwidth consumption + + :type consumption_scheduler: ConsumptionScheduler + :param consumption_scheduler: Schedules consumption retries when + necessary + """ + self._max_rate = float(max_rate) + self._time_utils = time_utils + if time_utils is None: + self._time_utils = TimeUtils() + self._lock = threading.Lock() + self._rate_tracker = rate_tracker + if rate_tracker is None: + self._rate_tracker = BandwidthRateTracker() + self._consumption_scheduler = consumption_scheduler + if consumption_scheduler is None: + self._consumption_scheduler = ConsumptionScheduler() + + def consume(self, amt, request_token): + """Consume an a requested amount + + :type amt: int + :param amt: The amount of bytes to request to consume + + :type request_token: RequestToken + :param request_token: The token associated to the consumption + request that is used to identify the request. So if a + RequestExceededException is raised the token should be used + in subsequent retry consume() request. 
+ + :raises RequestExceededException: If the consumption amount would + exceed the maximum allocated bandwidth + + :rtype: int + :returns: The amount consumed + """ + with self._lock: + time_now = self._time_utils.time() + if self._consumption_scheduler.is_scheduled(request_token): + return self._release_requested_amt_for_scheduled_request( + amt, request_token, time_now + ) + elif self._projected_to_exceed_max_rate(amt, time_now): + self._raise_request_exceeded_exception( + amt, request_token, time_now + ) + else: + return self._release_requested_amt(amt, time_now) + + def _projected_to_exceed_max_rate(self, amt, time_now): + projected_rate = self._rate_tracker.get_projected_rate(amt, time_now) + return projected_rate > self._max_rate + + def _release_requested_amt_for_scheduled_request( + self, amt, request_token, time_now + ): + self._consumption_scheduler.process_scheduled_consumption( + request_token + ) + return self._release_requested_amt(amt, time_now) + + def _raise_request_exceeded_exception(self, amt, request_token, time_now): + allocated_time = amt / float(self._max_rate) + retry_time = self._consumption_scheduler.schedule_consumption( + amt, request_token, allocated_time + ) + raise RequestExceededException( + requested_amt=amt, retry_time=retry_time + ) + + def _release_requested_amt(self, amt, time_now): + self._rate_tracker.record_consumption_rate(amt, time_now) + return amt + + +class ConsumptionScheduler: + def __init__(self): + """Schedules when to consume a desired amount""" + self._tokens_to_scheduled_consumption = {} + self._total_wait = 0 + + def is_scheduled(self, token): + """Indicates if a consumption request has been scheduled + + :type token: RequestToken + :param token: The token associated to the consumption + request that is used to identify the request. + """ + return token in self._tokens_to_scheduled_consumption + + def schedule_consumption(self, amt, token, time_to_consume): + """Schedules a wait time to be able to consume an amount + + :type amt: int + :param amt: The amount of bytes scheduled to be consumed + + :type token: RequestToken + :param token: The token associated to the consumption + request that is used to identify the request. + + :type time_to_consume: float + :param time_to_consume: The desired time it should take for that + specific request amount to be consumed in regardless of previously + scheduled consumption requests + + :rtype: float + :returns: The amount of time to wait for the specific request before + actually consuming the specified amount. + """ + self._total_wait += time_to_consume + self._tokens_to_scheduled_consumption[token] = { + 'wait_duration': self._total_wait, + 'time_to_consume': time_to_consume, + } + return self._total_wait + + def process_scheduled_consumption(self, token): + """Processes a scheduled consumption request that has completed + + :type token: RequestToken + :param token: The token associated to the consumption + request that is used to identify the request. + """ + scheduled_retry = self._tokens_to_scheduled_consumption.pop(token) + self._total_wait = max( + self._total_wait - scheduled_retry['time_to_consume'], 0 + ) + + +class BandwidthRateTracker: + def __init__(self, alpha=0.8): + """Tracks the rate of bandwidth consumption + + :type a: float + :param a: The constant to use in calculating the exponentional moving + average of the bandwidth rate. 
Specifically it is used in the + following calculation: + + current_rate = alpha * new_rate + (1 - alpha) * current_rate + + This value of this constant should be between 0 and 1. + """ + self._alpha = alpha + self._last_time = None + self._current_rate = None + + @property + def current_rate(self): + """The current transfer rate + + :rtype: float + :returns: The current tracked transfer rate + """ + if self._last_time is None: + return 0.0 + return self._current_rate + + def get_projected_rate(self, amt, time_at_consumption): + """Get the projected rate using a provided amount and time + + :type amt: int + :param amt: The proposed amount to consume + + :type time_at_consumption: float + :param time_at_consumption: The proposed time to consume at + + :rtype: float + :returns: The consumption rate if that amt and time were consumed + """ + if self._last_time is None: + return 0.0 + return self._calculate_exponential_moving_average_rate( + amt, time_at_consumption + ) + + def record_consumption_rate(self, amt, time_at_consumption): + """Record the consumption rate based off amount and time point + + :type amt: int + :param amt: The amount that got consumed + + :type time_at_consumption: float + :param time_at_consumption: The time at which the amount was consumed + """ + if self._last_time is None: + self._last_time = time_at_consumption + self._current_rate = 0.0 + return + self._current_rate = self._calculate_exponential_moving_average_rate( + amt, time_at_consumption + ) + self._last_time = time_at_consumption + + def _calculate_rate(self, amt, time_at_consumption): + time_delta = time_at_consumption - self._last_time + if time_delta <= 0: + # While it is really unlikely to see this in an actual transfer, + # we do not want to be returning back a negative rate or try to + # divide the amount by zero. So instead return back an infinite + # rate as the time delta is infinitesimally small. + return float('inf') + return amt / (time_delta) + + def _calculate_exponential_moving_average_rate( + self, amt, time_at_consumption + ): + new_rate = self._calculate_rate(amt, time_at_consumption) + return self._alpha * new_rate + (1 - self._alpha) * self._current_rate diff --git a/.venv/lib/python3.12/site-packages/s3transfer/compat.py b/.venv/lib/python3.12/site-packages/s3transfer/compat.py new file mode 100644 index 00000000..68267ad0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/compat.py @@ -0,0 +1,94 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import errno +import inspect +import os +import socket +import sys + +from botocore.compat import six + +if sys.platform.startswith('win'): + def rename_file(current_filename, new_filename): + try: + os.remove(new_filename) + except OSError as e: + if not e.errno == errno.ENOENT: + # We only want to a ignore trying to remove + # a file that does not exist. If it fails + # for any other reason we should be propagating + # that exception. 
+ raise + os.rename(current_filename, new_filename) +else: + rename_file = os.rename + + +def accepts_kwargs(func): + return inspect.getfullargspec(func)[2] + + +# In python 3, socket.error is OSError, which is too general +# for what we want (i.e FileNotFoundError is a subclass of OSError). +# In python 3, all the socket related errors are in a newly created +# ConnectionError. +SOCKET_ERROR = ConnectionError +MAXINT = None + + +def seekable(fileobj): + """Backwards compat function to determine if a fileobj is seekable + + :param fileobj: The file-like object to determine if seekable + + :returns: True, if seekable. False, otherwise. + """ + # If the fileobj has a seekable attr, try calling the seekable() + # method on it. + if hasattr(fileobj, 'seekable'): + return fileobj.seekable() + # If there is no seekable attr, check if the object can be seeked + # or telled. If it can, try to seek to the current position. + elif hasattr(fileobj, 'seek') and hasattr(fileobj, 'tell'): + try: + fileobj.seek(0, 1) + return True + except OSError: + # If an io related error was thrown then it is not seekable. + return False + # Else, the fileobj is not seekable + return False + + +def readable(fileobj): + """Determines whether or not a file-like object is readable. + + :param fileobj: The file-like object to determine if readable + + :returns: True, if readable. False otherwise. + """ + if hasattr(fileobj, 'readable'): + return fileobj.readable() + + return hasattr(fileobj, 'read') + + +def fallocate(fileobj, size): + if hasattr(os, 'posix_fallocate'): + os.posix_fallocate(fileobj.fileno(), 0, size) + else: + fileobj.truncate(size) + + +# Import at end of file to avoid circular dependencies +from multiprocessing.managers import BaseManager # noqa: F401,E402 diff --git a/.venv/lib/python3.12/site-packages/s3transfer/constants.py b/.venv/lib/python3.12/site-packages/s3transfer/constants.py new file mode 100644 index 00000000..9a2ec9bb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/constants.py @@ -0,0 +1,38 @@ +# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import s3transfer + +KB = 1024 +MB = KB * KB +GB = MB * KB + +ALLOWED_DOWNLOAD_ARGS = [ + 'ChecksumMode', + 'VersionId', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'RequestPayer', + 'ExpectedBucketOwner', +] + +FULL_OBJECT_CHECKSUM_ARGS = [ + 'ChecksumCRC32', + 'ChecksumCRC32C', + 'ChecksumCRC64NVME', + 'ChecksumSHA1', + 'ChecksumSHA256', +] + +USER_AGENT = f's3transfer/{s3transfer.__version__}' +PROCESS_USER_AGENT = f'{USER_AGENT} processpool' diff --git a/.venv/lib/python3.12/site-packages/s3transfer/copies.py b/.venv/lib/python3.12/site-packages/s3transfer/copies.py new file mode 100644 index 00000000..c2ae9ce0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/copies.py @@ -0,0 +1,388 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). 
You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import copy +import math + +from s3transfer.tasks import ( + CompleteMultipartUploadTask, + CreateMultipartUploadTask, + SubmissionTask, + Task, +) +from s3transfer.utils import ( + ChunksizeAdjuster, + calculate_range_parameter, + get_callbacks, + get_filtered_dict, +) + + +class CopySubmissionTask(SubmissionTask): + """Task for submitting tasks to execute a copy""" + + EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = { + 'CopySourceIfMatch': 'IfMatch', + 'CopySourceIfModifiedSince': 'IfModifiedSince', + 'CopySourceIfNoneMatch': 'IfNoneMatch', + 'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince', + 'CopySourceSSECustomerKey': 'SSECustomerKey', + 'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm', + 'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5', + 'RequestPayer': 'RequestPayer', + 'ExpectedBucketOwner': 'ExpectedBucketOwner', + } + + UPLOAD_PART_COPY_ARGS = [ + 'CopySourceIfMatch', + 'CopySourceIfModifiedSince', + 'CopySourceIfNoneMatch', + 'CopySourceIfUnmodifiedSince', + 'CopySourceSSECustomerKey', + 'CopySourceSSECustomerAlgorithm', + 'CopySourceSSECustomerKeyMD5', + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + 'ExpectedBucketOwner', + ] + + CREATE_MULTIPART_ARGS_BLACKLIST = [ + 'CopySourceIfMatch', + 'CopySourceIfModifiedSince', + 'CopySourceIfNoneMatch', + 'CopySourceIfUnmodifiedSince', + 'CopySourceSSECustomerKey', + 'CopySourceSSECustomerAlgorithm', + 'CopySourceSSECustomerKeyMD5', + 'MetadataDirective', + 'TaggingDirective', + ] + + COMPLETE_MULTIPART_ARGS = [ + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + 'ExpectedBucketOwner', + ] + + def _submit( + self, client, config, osutil, request_executor, transfer_future + ): + """ + :param client: The client associated with the transfer manager + + :type config: s3transfer.manager.TransferConfig + :param config: The transfer config associated with the transfer + manager + + :type osutil: s3transfer.utils.OSUtil + :param osutil: The os utility associated to the transfer manager + + :type request_executor: s3transfer.futures.BoundedExecutor + :param request_executor: The request executor associated with the + transfer manager + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + """ + # Determine the size if it was not provided + if transfer_future.meta.size is None: + # If a size was not provided figure out the size for the + # user. Note that we will only use the client provided to + # the TransferManager. If the object is outside of the region + # of the client, they may have to provide the file size themselves + # with a completely new client. 
+ call_args = transfer_future.meta.call_args + head_object_request = ( + self._get_head_object_request_from_copy_source( + call_args.copy_source + ) + ) + extra_args = call_args.extra_args + + # Map any values that may be used in the head object that is + # used in the copy object + for param, value in extra_args.items(): + if param in self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING: + head_object_request[ + self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING[param] + ] = value + + response = call_args.source_client.head_object( + **head_object_request + ) + transfer_future.meta.provide_transfer_size( + response['ContentLength'] + ) + + # If it is greater than threshold do a multipart copy, otherwise + # do a regular copy object. + if transfer_future.meta.size < config.multipart_threshold: + self._submit_copy_request( + client, config, osutil, request_executor, transfer_future + ) + else: + self._submit_multipart_request( + client, config, osutil, request_executor, transfer_future + ) + + def _submit_copy_request( + self, client, config, osutil, request_executor, transfer_future + ): + call_args = transfer_future.meta.call_args + + # Get the needed progress callbacks for the task + progress_callbacks = get_callbacks(transfer_future, 'progress') + + # Submit the request of a single copy. + self._transfer_coordinator.submit( + request_executor, + CopyObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'copy_source': call_args.copy_source, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': call_args.extra_args, + 'callbacks': progress_callbacks, + 'size': transfer_future.meta.size, + }, + is_final=True, + ), + ) + + def _submit_multipart_request( + self, client, config, osutil, request_executor, transfer_future + ): + call_args = transfer_future.meta.call_args + + # Submit the request to create a multipart upload and make sure it + # does not include any of the arguments used for copy part. + create_multipart_extra_args = {} + for param, val in call_args.extra_args.items(): + if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST: + create_multipart_extra_args[param] = val + + create_multipart_future = self._transfer_coordinator.submit( + request_executor, + CreateMultipartUploadTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': create_multipart_extra_args, + }, + ), + ) + + # Determine how many parts are needed based on filesize and + # desired chunksize. + part_size = config.multipart_chunksize + adjuster = ChunksizeAdjuster() + part_size = adjuster.adjust_chunksize( + part_size, transfer_future.meta.size + ) + num_parts = int( + math.ceil(transfer_future.meta.size / float(part_size)) + ) + + # Submit requests to upload the parts of the file. + part_futures = [] + progress_callbacks = get_callbacks(transfer_future, 'progress') + + for part_number in range(1, num_parts + 1): + extra_part_args = self._extra_upload_part_args( + call_args.extra_args + ) + # The part number for upload part starts at 1 while the + # range parameter starts at zero, so just subtract 1 off of + # the part number + extra_part_args['CopySourceRange'] = calculate_range_parameter( + part_size, + part_number - 1, + num_parts, + transfer_future.meta.size, + ) + # Get the size of the part copy as well for the progress + # callbacks. 
+ size = self._get_transfer_size( + part_size, + part_number - 1, + num_parts, + transfer_future.meta.size, + ) + # Get the checksum algorithm of the multipart request. + checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm") + part_futures.append( + self._transfer_coordinator.submit( + request_executor, + CopyPartTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'copy_source': call_args.copy_source, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'part_number': part_number, + 'extra_args': extra_part_args, + 'callbacks': progress_callbacks, + 'size': size, + 'checksum_algorithm': checksum_algorithm, + }, + pending_main_kwargs={ + 'upload_id': create_multipart_future + }, + ), + ) + ) + + complete_multipart_extra_args = self._extra_complete_multipart_args( + call_args.extra_args + ) + # Submit the request to complete the multipart upload. + self._transfer_coordinator.submit( + request_executor, + CompleteMultipartUploadTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': complete_multipart_extra_args, + }, + pending_main_kwargs={ + 'upload_id': create_multipart_future, + 'parts': part_futures, + }, + is_final=True, + ), + ) + + def _get_head_object_request_from_copy_source(self, copy_source): + if isinstance(copy_source, dict): + return copy.copy(copy_source) + else: + raise TypeError( + 'Expecting dictionary formatted: ' + '{"Bucket": bucket_name, "Key": key} ' + f'but got {copy_source} or type {type(copy_source)}.' + ) + + def _extra_upload_part_args(self, extra_args): + # Only the args in COPY_PART_ARGS actually need to be passed + # onto the upload_part_copy calls. + return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS) + + def _extra_complete_multipart_args(self, extra_args): + return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS) + + def _get_transfer_size( + self, part_size, part_index, num_parts, total_transfer_size + ): + if part_index == num_parts - 1: + # The last part may be different in size then the rest of the + # parts. + return total_transfer_size - (part_index * part_size) + return part_size + + +class CopyObjectTask(Task): + """Task to do a nonmultipart copy""" + + def _main( + self, client, copy_source, bucket, key, extra_args, callbacks, size + ): + """ + :param client: The client to use when calling PutObject + :param copy_source: The CopySource parameter to use + :param bucket: The name of the bucket to copy to + :param key: The name of the key to copy to + :param extra_args: A dictionary of any extra arguments that may be + used in the upload. + :param callbacks: List of callbacks to call after copy + :param size: The size of the transfer. 
This value is passed into + the callbacks + + """ + client.copy_object( + CopySource=copy_source, Bucket=bucket, Key=key, **extra_args + ) + for callback in callbacks: + callback(bytes_transferred=size) + + +class CopyPartTask(Task): + """Task to upload a part in a multipart copy""" + + def _main( + self, + client, + copy_source, + bucket, + key, + upload_id, + part_number, + extra_args, + callbacks, + size, + checksum_algorithm=None, + ): + """ + :param client: The client to use when calling PutObject + :param copy_source: The CopySource parameter to use + :param bucket: The name of the bucket to upload to + :param key: The name of the key to upload to + :param upload_id: The id of the upload + :param part_number: The number representing the part of the multipart + upload + :param extra_args: A dictionary of any extra arguments that may be + used in the upload. + :param callbacks: List of callbacks to call after copy part + :param size: The size of the transfer. This value is passed into + the callbacks + :param checksum_algorithm: The algorithm that was used to create the multipart + upload + + :rtype: dict + :returns: A dictionary representing a part:: + + {'Etag': etag_value, 'PartNumber': part_number} + + This value can be appended to a list to be used to complete + the multipart upload. If a checksum is in the response, + it will also be included. + """ + response = client.upload_part_copy( + CopySource=copy_source, + Bucket=bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + **extra_args, + ) + for callback in callbacks: + callback(bytes_transferred=size) + etag = response['CopyPartResult']['ETag'] + part_metadata = {'ETag': etag, 'PartNumber': part_number} + if checksum_algorithm: + checksum_member = f'Checksum{checksum_algorithm.upper()}' + if checksum_member in response['CopyPartResult']: + part_metadata[checksum_member] = response['CopyPartResult'][ + checksum_member + ] + return part_metadata diff --git a/.venv/lib/python3.12/site-packages/s3transfer/crt.py b/.venv/lib/python3.12/site-packages/s3transfer/crt.py new file mode 100644 index 00000000..b11c3682 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/crt.py @@ -0,0 +1,991 @@ +# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+import logging +import re +import threading +from io import BytesIO + +import awscrt.http +import awscrt.s3 +import botocore.awsrequest +import botocore.session +from awscrt.auth import ( + AwsCredentials, + AwsCredentialsProvider, + AwsSigningAlgorithm, + AwsSigningConfig, +) +from awscrt.io import ( + ClientBootstrap, + ClientTlsContext, + DefaultHostResolver, + EventLoopGroup, + TlsContextOptions, +) +from awscrt.s3 import S3Client, S3RequestTlsMode, S3RequestType +from botocore import UNSIGNED +from botocore.compat import urlsplit +from botocore.config import Config +from botocore.exceptions import NoCredentialsError +from botocore.utils import ArnParser, InvalidArnException + +from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS, MB +from s3transfer.exceptions import TransferNotDoneError +from s3transfer.futures import BaseTransferFuture, BaseTransferMeta +from s3transfer.manager import TransferManager +from s3transfer.utils import ( + CallArgs, + OSUtils, + get_callbacks, + is_s3express_bucket, +) + +logger = logging.getLogger(__name__) + +CRT_S3_PROCESS_LOCK = None + + +def acquire_crt_s3_process_lock(name): + # Currently, the CRT S3 client performs best when there is only one + # instance of it running on a host. This lock allows an application to + # signal across processes whether there is another process of the same + # application using the CRT S3 client and prevent spawning more than one + # CRT S3 clients running on the system for that application. + # + # NOTE: When acquiring the CRT process lock, the lock automatically is + # released when the lock object is garbage collected. So, the CRT process + # lock is set as a global so that it is not unintentionally garbage + # collected/released if reference of the lock is lost. + global CRT_S3_PROCESS_LOCK + if CRT_S3_PROCESS_LOCK is None: + crt_lock = awscrt.s3.CrossProcessLock(name) + try: + crt_lock.acquire() + except RuntimeError: + # If there is another process that is holding the lock, the CRT + # returns a RuntimeError. We return None here to signal that our + # current process was not able to acquire the lock. + return None + CRT_S3_PROCESS_LOCK = crt_lock + return CRT_S3_PROCESS_LOCK + + +def create_s3_crt_client( + region, + crt_credentials_provider=None, + num_threads=None, + target_throughput=None, + part_size=8 * MB, + use_ssl=True, + verify=None, +): + """ + :type region: str + :param region: The region used for signing + + :type crt_credentials_provider: + Optional[awscrt.auth.AwsCredentialsProvider] + :param crt_credentials_provider: CRT AWS credentials provider + to use to sign requests. If not set, requests will not be signed. + + :type num_threads: Optional[int] + :param num_threads: Number of worker threads generated. Default + is the number of processors in the machine. + + :type target_throughput: Optional[int] + :param target_throughput: Throughput target in bytes per second. + By default, CRT will automatically attempt to choose a target + throughput that matches the system's maximum network throughput. + Currently, if CRT is unable to determine the maximum network + throughput, a fallback target throughput of ``1_250_000_000`` bytes + per second (which translates to 10 gigabits per second, or 1.16 + gibibytes per second) is used. To set a specific target + throughput, set a value for this parameter. + + :type part_size: Optional[int] + :param part_size: Size, in Bytes, of parts that files will be downloaded + or uploaded in. + + :type use_ssl: boolean + :param use_ssl: Whether or not to use SSL. 
By default, SSL is used. + Note that not all services support non-ssl connections. + + :type verify: Optional[boolean/string] + :param verify: Whether or not to verify SSL certificates. + By default SSL certificates are verified. You can provide the + following values: + + * False - do not validate SSL certificates. SSL will still be + used (unless use_ssl is False), but SSL certificates + will not be verified. + * path/to/cert/bundle.pem - A filename of the CA cert bundle to + use. Specify this argument if you want to use a custom CA cert + bundle instead of the default one on your system. + """ + event_loop_group = EventLoopGroup(num_threads) + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + tls_connection_options = None + + tls_mode = ( + S3RequestTlsMode.ENABLED if use_ssl else S3RequestTlsMode.DISABLED + ) + if verify is not None: + tls_ctx_options = TlsContextOptions() + if verify: + tls_ctx_options.override_default_trust_store_from_path( + ca_filepath=verify + ) + else: + tls_ctx_options.verify_peer = False + client_tls_option = ClientTlsContext(tls_ctx_options) + tls_connection_options = client_tls_option.new_connection_options() + target_gbps = _get_crt_throughput_target_gbps( + provided_throughput_target_bytes=target_throughput + ) + return S3Client( + bootstrap=bootstrap, + region=region, + credential_provider=crt_credentials_provider, + part_size=part_size, + tls_mode=tls_mode, + tls_connection_options=tls_connection_options, + throughput_target_gbps=target_gbps, + enable_s3express=True, + ) + + +def _get_crt_throughput_target_gbps(provided_throughput_target_bytes=None): + if provided_throughput_target_bytes is None: + target_gbps = awscrt.s3.get_recommended_throughput_target_gbps() + logger.debug( + 'Recommended CRT throughput target in gbps: %s', target_gbps + ) + if target_gbps is None: + target_gbps = 10.0 + else: + # NOTE: The GB constant in s3transfer is technically a gibibyte. The + # GB constant is not used here because the CRT interprets gigabits + # for networking as a base power of 10 + # (i.e. 1000 ** 3 instead of 1024 ** 3). + target_gbps = provided_throughput_target_bytes * 8 / 1_000_000_000 + logger.debug('Using CRT throughput target in gbps: %s', target_gbps) + return target_gbps + + +class CRTTransferManager: + ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS + ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS + ALLOWED_DELETE_ARGS = TransferManager.ALLOWED_DELETE_ARGS + + VALIDATE_SUPPORTED_BUCKET_VALUES = True + + _UNSUPPORTED_BUCKET_PATTERNS = TransferManager._UNSUPPORTED_BUCKET_PATTERNS + + def __init__(self, crt_s3_client, crt_request_serializer, osutil=None): + """A transfer manager interface for Amazon S3 on CRT s3 client. + + :type crt_s3_client: awscrt.s3.S3Client + :param crt_s3_client: The CRT s3 client, handling all the + HTTP requests and functions under then hood + + :type crt_request_serializer: s3transfer.crt.BaseCRTRequestSerializer + :param crt_request_serializer: Serializer, generates unsigned crt HTTP + request. + + :type osutil: s3transfer.utils.OSUtils + :param osutil: OSUtils object to use for os-related behavior when + using with transfer manager. 
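        A minimal wiring sketch (illustrative only; the region, bucket,
        key, and file name are placeholders)::

            import botocore.session

            from s3transfer.crt import (
                BotocoreCRTCredentialsWrapper,
                BotocoreCRTRequestSerializer,
                CRTTransferManager,
                create_s3_crt_client,
            )

            session = botocore.session.Session()
            wrapper = BotocoreCRTCredentialsWrapper(session.get_credentials())
            crt_client = create_s3_crt_client(
                'us-west-2',
                crt_credentials_provider=wrapper.to_crt_credentials_provider(),
            )
            serializer = BotocoreCRTRequestSerializer(session)
            with CRTTransferManager(crt_client, serializer) as manager:
                future = manager.upload('/tmp/myfile', 'bucket', 'key')
                future.result()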
+ """ + if osutil is None: + self._osutil = OSUtils() + self._crt_s3_client = crt_s3_client + self._s3_args_creator = S3ClientArgsCreator( + crt_request_serializer, self._osutil + ) + self._crt_exception_translator = ( + crt_request_serializer.translate_crt_exception + ) + self._future_coordinators = [] + self._semaphore = threading.Semaphore(128) # not configurable + # A counter to create unique id's for each transfer submitted. + self._id_counter = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, *args): + cancel = False + if exc_type: + cancel = True + self._shutdown(cancel) + + def download( + self, bucket, key, fileobj, extra_args=None, subscribers=None + ): + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = {} + self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) + self._validate_if_bucket_supported(bucket) + callargs = CallArgs( + bucket=bucket, + key=key, + fileobj=fileobj, + extra_args=extra_args, + subscribers=subscribers, + ) + return self._submit_transfer("get_object", callargs) + + def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = {} + self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) + self._validate_if_bucket_supported(bucket) + self._validate_checksum_algorithm_supported(extra_args) + callargs = CallArgs( + bucket=bucket, + key=key, + fileobj=fileobj, + extra_args=extra_args, + subscribers=subscribers, + ) + return self._submit_transfer("put_object", callargs) + + def delete(self, bucket, key, extra_args=None, subscribers=None): + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = {} + self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS) + self._validate_if_bucket_supported(bucket) + callargs = CallArgs( + bucket=bucket, + key=key, + extra_args=extra_args, + subscribers=subscribers, + ) + return self._submit_transfer("delete_object", callargs) + + def shutdown(self, cancel=False): + self._shutdown(cancel) + + def _validate_if_bucket_supported(self, bucket): + # s3 high level operations don't support some resources + # (eg. S3 Object Lambda) only direct API calls are available + # for such resources + if self.VALIDATE_SUPPORTED_BUCKET_VALUES: + for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items(): + match = pattern.match(bucket) + if match: + raise ValueError( + f'TransferManager methods do not support {resource} ' + 'resource. Use direct client calls instead.' + ) + + def _validate_all_known_args(self, actual, allowed): + for kwarg in actual: + if kwarg not in allowed: + raise ValueError( + f"Invalid extra_args key '{kwarg}', " + f"must be one of: {', '.join(allowed)}" + ) + + def _validate_checksum_algorithm_supported(self, extra_args): + checksum_algorithm = extra_args.get('ChecksumAlgorithm') + if checksum_algorithm is None: + return + supported_algorithms = list(awscrt.s3.S3ChecksumAlgorithm.__members__) + if checksum_algorithm.upper() not in supported_algorithms: + raise ValueError( + f'ChecksumAlgorithm: {checksum_algorithm} not supported. 
' + f'Supported algorithms are: {supported_algorithms}' + ) + + def _cancel_transfers(self): + for coordinator in self._future_coordinators: + if not coordinator.done(): + coordinator.cancel() + + def _finish_transfers(self): + for coordinator in self._future_coordinators: + coordinator.result() + + def _wait_transfers_done(self): + for coordinator in self._future_coordinators: + coordinator.wait_until_on_done_callbacks_complete() + + def _shutdown(self, cancel=False): + if cancel: + self._cancel_transfers() + try: + self._finish_transfers() + + except KeyboardInterrupt: + self._cancel_transfers() + except Exception: + pass + finally: + self._wait_transfers_done() + + def _release_semaphore(self, **kwargs): + self._semaphore.release() + + def _submit_transfer(self, request_type, call_args): + on_done_after_calls = [self._release_semaphore] + coordinator = CRTTransferCoordinator( + transfer_id=self._id_counter, + exception_translator=self._crt_exception_translator, + ) + components = { + 'meta': CRTTransferMeta(self._id_counter, call_args), + 'coordinator': coordinator, + } + future = CRTTransferFuture(**components) + afterdone = AfterDoneHandler(coordinator) + on_done_after_calls.append(afterdone) + + try: + self._semaphore.acquire() + on_queued = self._s3_args_creator.get_crt_callback( + future, 'queued' + ) + on_queued() + crt_callargs = self._s3_args_creator.get_make_request_args( + request_type, + call_args, + coordinator, + future, + on_done_after_calls, + ) + crt_s3_request = self._crt_s3_client.make_request(**crt_callargs) + except Exception as e: + coordinator.set_exception(e, True) + on_done = self._s3_args_creator.get_crt_callback( + future, 'done', after_subscribers=on_done_after_calls + ) + on_done(error=e) + else: + coordinator.set_s3_request(crt_s3_request) + self._future_coordinators.append(coordinator) + + self._id_counter += 1 + return future + + +class CRTTransferMeta(BaseTransferMeta): + """Holds metadata about the CRTTransferFuture""" + + def __init__(self, transfer_id=None, call_args=None): + self._transfer_id = transfer_id + self._call_args = call_args + self._user_context = {} + + @property + def call_args(self): + return self._call_args + + @property + def transfer_id(self): + return self._transfer_id + + @property + def user_context(self): + return self._user_context + + +class CRTTransferFuture(BaseTransferFuture): + def __init__(self, meta=None, coordinator=None): + """The future associated to a submitted transfer request via CRT S3 client + + :type meta: s3transfer.crt.CRTTransferMeta + :param meta: The metadata associated to the transfer future. + + :type coordinator: s3transfer.crt.CRTTransferCoordinator + :param coordinator: The coordinator associated to the transfer future. + """ + self._meta = meta + if meta is None: + self._meta = CRTTransferMeta() + self._coordinator = coordinator + + @property + def meta(self): + return self._meta + + def done(self): + return self._coordinator.done() + + def result(self, timeout=None): + self._coordinator.result(timeout) + + def cancel(self): + self._coordinator.cancel() + + def set_exception(self, exception): + """Sets the exception on the future.""" + if not self.done(): + raise TransferNotDoneError( + 'set_exception can only be called once the transfer is ' + 'complete.' + ) + self._coordinator.set_exception(exception, override=True) + + +class BaseCRTRequestSerializer: + def serialize_http_request(self, transfer_type, future): + """Serialize CRT HTTP requests. 
+ + :type transfer_type: string + :param transfer_type: the type of transfer made, + e.g 'put_object', 'get_object', 'delete_object' + + :type future: s3transfer.crt.CRTTransferFuture + + :rtype: awscrt.http.HttpRequest + :returns: An unsigned HTTP request to be used for the CRT S3 client + """ + raise NotImplementedError('serialize_http_request()') + + def translate_crt_exception(self, exception): + raise NotImplementedError('translate_crt_exception()') + + +class BotocoreCRTRequestSerializer(BaseCRTRequestSerializer): + def __init__(self, session, client_kwargs=None): + """Serialize CRT HTTP request using botocore logic + It also takes into account configuration from both the session + and any keyword arguments that could be passed to + `Session.create_client()` when serializing the request. + + :type session: botocore.session.Session + + :type client_kwargs: Optional[Dict[str, str]]) + :param client_kwargs: The kwargs for the botocore + s3 client initialization. + """ + self._session = session + if client_kwargs is None: + client_kwargs = {} + self._resolve_client_config(session, client_kwargs) + self._client = session.create_client(**client_kwargs) + self._client.meta.events.register( + 'request-created.s3.*', self._capture_http_request + ) + self._client.meta.events.register( + 'after-call.s3.*', self._change_response_to_serialized_http_request + ) + self._client.meta.events.register( + 'before-send.s3.*', self._make_fake_http_response + ) + self._client.meta.events.register( + 'before-call.s3.*', self._remove_checksum_context + ) + + def _resolve_client_config(self, session, client_kwargs): + user_provided_config = None + if session.get_default_client_config(): + user_provided_config = session.get_default_client_config() + if 'config' in client_kwargs: + user_provided_config = client_kwargs['config'] + + client_config = Config(signature_version=UNSIGNED) + if user_provided_config: + client_config = user_provided_config.merge(client_config) + client_kwargs['config'] = client_config + client_kwargs["service_name"] = "s3" + + def _crt_request_from_aws_request(self, aws_request): + url_parts = urlsplit(aws_request.url) + crt_path = url_parts.path + if url_parts.query: + crt_path = f'{crt_path}?{url_parts.query}' + headers_list = [] + for name, value in aws_request.headers.items(): + if isinstance(value, str): + headers_list.append((name, value)) + else: + headers_list.append((name, str(value, 'utf-8'))) + + crt_headers = awscrt.http.HttpHeaders(headers_list) + + crt_request = awscrt.http.HttpRequest( + method=aws_request.method, + path=crt_path, + headers=crt_headers, + body_stream=aws_request.body, + ) + return crt_request + + def _convert_to_crt_http_request(self, botocore_http_request): + # Logic that does CRTUtils.crt_request_from_aws_request + crt_request = self._crt_request_from_aws_request(botocore_http_request) + if crt_request.headers.get("host") is None: + # If host is not set, set it for the request before using CRT s3 + url_parts = urlsplit(botocore_http_request.url) + crt_request.headers.set("host", url_parts.netloc) + if crt_request.headers.get('Content-MD5') is not None: + crt_request.headers.remove("Content-MD5") + + # In general, the CRT S3 client expects a content length header. It + # only expects a missing content length header if the body is not + # seekable. 
However, botocore does not set the content length header + # for GetObject API requests and so we set the content length to zero + # to meet the CRT S3 client's expectation that the content length + # header is set even if there is no body. + if crt_request.headers.get('Content-Length') is None: + if botocore_http_request.body is None: + crt_request.headers.add('Content-Length', "0") + + # Botocore sets the Transfer-Encoding header when it cannot determine + # the content length of the request body (e.g. it's not seekable). + # However, CRT does not support this header, but it supports + # non-seekable bodies. So we remove this header to not cause issues + # in the downstream CRT S3 request. + if crt_request.headers.get('Transfer-Encoding') is not None: + crt_request.headers.remove('Transfer-Encoding') + + return crt_request + + def _capture_http_request(self, request, **kwargs): + request.context['http_request'] = request + + def _change_response_to_serialized_http_request( + self, context, parsed, **kwargs + ): + request = context['http_request'] + parsed['HTTPRequest'] = request.prepare() + + def _make_fake_http_response(self, request, **kwargs): + return botocore.awsrequest.AWSResponse( + None, + 200, + {}, + FakeRawResponse(b""), + ) + + def _get_botocore_http_request(self, client_method, call_args): + return getattr(self._client, client_method)( + Bucket=call_args.bucket, Key=call_args.key, **call_args.extra_args + )['HTTPRequest'] + + def serialize_http_request(self, transfer_type, future): + botocore_http_request = self._get_botocore_http_request( + transfer_type, future.meta.call_args + ) + crt_request = self._convert_to_crt_http_request(botocore_http_request) + return crt_request + + def translate_crt_exception(self, exception): + if isinstance(exception, awscrt.s3.S3ResponseError): + return self._translate_crt_s3_response_error(exception) + else: + return None + + def _translate_crt_s3_response_error(self, s3_response_error): + status_code = s3_response_error.status_code + if status_code < 301: + # Botocore's exception parsing only + # runs on status codes >= 301 + return None + + headers = {k: v for k, v in s3_response_error.headers} + operation_name = s3_response_error.operation_name + if operation_name is not None: + service_model = self._client.meta.service_model + shape = service_model.operation_model(operation_name).output_shape + else: + shape = None + + response_dict = { + 'headers': botocore.awsrequest.HeadersDict(headers), + 'status_code': status_code, + 'body': s3_response_error.body, + } + parsed_response = self._client._response_parser.parse( + response_dict, shape=shape + ) + + error_code = parsed_response.get("Error", {}).get("Code") + error_class = self._client.exceptions.from_code(error_code) + return error_class(parsed_response, operation_name=operation_name) + + def _remove_checksum_context(self, params, **kwargs): + request_context = params.get("context", {}) + if "checksum" in request_context: + del request_context["checksum"] + + +class FakeRawResponse(BytesIO): + def stream(self, amt=1024, decode_content=None): + while True: + chunk = self.read(amt) + if not chunk: + break + yield chunk + + +class BotocoreCRTCredentialsWrapper: + def __init__(self, resolved_botocore_credentials): + self._resolved_credentials = resolved_botocore_credentials + + def __call__(self): + credentials = self._get_credentials().get_frozen_credentials() + return AwsCredentials( + credentials.access_key, credentials.secret_key, credentials.token + ) + + def 
to_crt_credentials_provider(self): + return AwsCredentialsProvider.new_delegate(self) + + def _get_credentials(self): + if self._resolved_credentials is None: + raise NoCredentialsError() + return self._resolved_credentials + + +class CRTTransferCoordinator: + """A helper class for managing CRTTransferFuture""" + + def __init__( + self, transfer_id=None, s3_request=None, exception_translator=None + ): + self.transfer_id = transfer_id + self._exception_translator = exception_translator + self._s3_request = s3_request + self._lock = threading.Lock() + self._exception = None + self._crt_future = None + self._done_event = threading.Event() + + @property + def s3_request(self): + return self._s3_request + + def set_done_callbacks_complete(self): + self._done_event.set() + + def wait_until_on_done_callbacks_complete(self, timeout=None): + self._done_event.wait(timeout) + + def set_exception(self, exception, override=False): + with self._lock: + if not self.done() or override: + self._exception = exception + + def cancel(self): + if self._s3_request: + self._s3_request.cancel() + + def result(self, timeout=None): + if self._exception: + raise self._exception + try: + self._crt_future.result(timeout) + except KeyboardInterrupt: + self.cancel() + self._crt_future.result(timeout) + raise + except Exception as e: + self.handle_exception(e) + finally: + if self._s3_request: + self._s3_request = None + + def handle_exception(self, exc): + translated_exc = None + if self._exception_translator: + try: + translated_exc = self._exception_translator(exc) + except Exception as e: + # Bail out if we hit an issue translating + # and raise the original error. + logger.debug("Unable to translate exception.", exc_info=e) + pass + if translated_exc is not None: + raise translated_exc from exc + else: + raise exc + + def done(self): + if self._crt_future is None: + return False + return self._crt_future.done() + + def set_s3_request(self, s3_request): + self._s3_request = s3_request + self._crt_future = self._s3_request.finished_future + + +class S3ClientArgsCreator: + def __init__(self, crt_request_serializer, os_utils): + self._request_serializer = crt_request_serializer + self._os_utils = os_utils + + def get_make_request_args( + self, request_type, call_args, coordinator, future, on_done_after_calls + ): + request_args_handler = getattr( + self, + f'_get_make_request_args_{request_type}', + self._default_get_make_request_args, + ) + return request_args_handler( + request_type=request_type, + call_args=call_args, + coordinator=coordinator, + future=future, + on_done_before_calls=[], + on_done_after_calls=on_done_after_calls, + ) + + def get_crt_callback( + self, + future, + callback_type, + before_subscribers=None, + after_subscribers=None, + ): + def invoke_all_callbacks(*args, **kwargs): + callbacks_list = [] + if before_subscribers is not None: + callbacks_list += before_subscribers + callbacks_list += get_callbacks(future, callback_type) + if after_subscribers is not None: + callbacks_list += after_subscribers + for callback in callbacks_list: + # The get_callbacks helper will set the first augment + # by keyword, the other augments need to be set by keyword + # as well + if callback_type == "progress": + callback(bytes_transferred=args[0]) + else: + callback(*args, **kwargs) + + return invoke_all_callbacks + + def _get_make_request_args_put_object( + self, + request_type, + call_args, + coordinator, + future, + on_done_before_calls, + on_done_after_calls, + ): + send_filepath = None + if 
isinstance(call_args.fileobj, str): + send_filepath = call_args.fileobj + data_len = self._os_utils.get_file_size(send_filepath) + call_args.extra_args["ContentLength"] = data_len + else: + call_args.extra_args["Body"] = call_args.fileobj + + checksum_config = None + if not any( + checksum_arg in call_args.extra_args + for checksum_arg in FULL_OBJECT_CHECKSUM_ARGS + ): + checksum_algorithm = call_args.extra_args.pop( + 'ChecksumAlgorithm', 'CRC32' + ).upper() + checksum_config = awscrt.s3.S3ChecksumConfig( + algorithm=awscrt.s3.S3ChecksumAlgorithm[checksum_algorithm], + location=awscrt.s3.S3ChecksumLocation.TRAILER, + ) + # Suppress botocore's automatic MD5 calculation by setting an override + # value that will get deleted in the BotocoreCRTRequestSerializer. + # As part of the CRT S3 request, we request the CRT S3 client to + # automatically add trailing checksums to its uploads. + call_args.extra_args["ContentMD5"] = "override-to-be-removed" + + make_request_args = self._default_get_make_request_args( + request_type=request_type, + call_args=call_args, + coordinator=coordinator, + future=future, + on_done_before_calls=on_done_before_calls, + on_done_after_calls=on_done_after_calls, + ) + make_request_args['send_filepath'] = send_filepath + make_request_args['checksum_config'] = checksum_config + return make_request_args + + def _get_make_request_args_get_object( + self, + request_type, + call_args, + coordinator, + future, + on_done_before_calls, + on_done_after_calls, + ): + recv_filepath = None + on_body = None + checksum_config = awscrt.s3.S3ChecksumConfig(validate_response=True) + if isinstance(call_args.fileobj, str): + final_filepath = call_args.fileobj + recv_filepath = self._os_utils.get_temp_filename(final_filepath) + on_done_before_calls.append( + RenameTempFileHandler( + coordinator, final_filepath, recv_filepath, self._os_utils + ) + ) + else: + on_body = OnBodyFileObjWriter(call_args.fileobj) + + make_request_args = self._default_get_make_request_args( + request_type=request_type, + call_args=call_args, + coordinator=coordinator, + future=future, + on_done_before_calls=on_done_before_calls, + on_done_after_calls=on_done_after_calls, + ) + make_request_args['recv_filepath'] = recv_filepath + make_request_args['on_body'] = on_body + make_request_args['checksum_config'] = checksum_config + return make_request_args + + def _default_get_make_request_args( + self, + request_type, + call_args, + coordinator, + future, + on_done_before_calls, + on_done_after_calls, + ): + make_request_args = { + 'request': self._request_serializer.serialize_http_request( + request_type, future + ), + 'type': getattr( + S3RequestType, request_type.upper(), S3RequestType.DEFAULT + ), + 'on_done': self.get_crt_callback( + future, 'done', on_done_before_calls, on_done_after_calls + ), + 'on_progress': self.get_crt_callback(future, 'progress'), + } + + # For DEFAULT requests, CRT requires the official S3 operation name. + # So transform string like "delete_object" -> "DeleteObject". + if make_request_args['type'] == S3RequestType.DEFAULT: + make_request_args['operation_name'] = ''.join( + x.title() for x in request_type.split('_') + ) + + arn_handler = _S3ArnParamHandler() + if ( + accesspoint_arn_details := arn_handler.handle_arn(call_args.bucket) + ) and accesspoint_arn_details['region'] == "": + # Configure our region to `*` to propogate in `x-amz-region-set` + # for multi-region support in MRAP accesspoints. 
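            # (AwsSigningAlgorithm.V4_ASYMMETRIC corresponds to SigV4A
            # signing, which is what permits the wildcard region above.)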
+ # use_double_uri_encode and should_normalize_uri_path are defaulted to be True + # But SDK already encoded the URI, and it's for S3, so set both to False + make_request_args['signing_config'] = AwsSigningConfig( + algorithm=AwsSigningAlgorithm.V4_ASYMMETRIC, + region="*", + use_double_uri_encode=False, + should_normalize_uri_path=False, + ) + call_args.bucket = accesspoint_arn_details['resource_name'] + elif is_s3express_bucket(call_args.bucket): + # use_double_uri_encode and should_normalize_uri_path are defaulted to be True + # But SDK already encoded the URI, and it's for S3, so set both to False + make_request_args['signing_config'] = AwsSigningConfig( + algorithm=AwsSigningAlgorithm.V4_S3EXPRESS, + use_double_uri_encode=False, + should_normalize_uri_path=False, + ) + return make_request_args + + +class RenameTempFileHandler: + def __init__(self, coordinator, final_filename, temp_filename, osutil): + self._coordinator = coordinator + self._final_filename = final_filename + self._temp_filename = temp_filename + self._osutil = osutil + + def __call__(self, **kwargs): + error = kwargs['error'] + if error: + self._osutil.remove_file(self._temp_filename) + else: + try: + self._osutil.rename_file( + self._temp_filename, self._final_filename + ) + except Exception as e: + self._osutil.remove_file(self._temp_filename) + # the CRT future has done already at this point + self._coordinator.set_exception(e) + + +class AfterDoneHandler: + def __init__(self, coordinator): + self._coordinator = coordinator + + def __call__(self, **kwargs): + self._coordinator.set_done_callbacks_complete() + + +class OnBodyFileObjWriter: + def __init__(self, fileobj): + self._fileobj = fileobj + + def __call__(self, chunk, **kwargs): + self._fileobj.write(chunk) + + +class _S3ArnParamHandler: + """Partial port of S3ArnParamHandler from botocore. + + This is used to make a determination on MRAP accesspoints for signing + purposes. This should be safe to remove once we properly integrate auth + resolution from Botocore into the CRT transfer integration. + """ + + _RESOURCE_REGEX = re.compile( + r'^(?P<resource_type>accesspoint|outpost)[/:](?P<resource_name>.+)$' + ) + + def __init__(self): + self._arn_parser = ArnParser() + + def handle_arn(self, bucket): + arn_details = self._get_arn_details_from_bucket(bucket) + if arn_details is None: + return + if arn_details['resource_type'] == 'accesspoint': + return arn_details + + def _get_arn_details_from_bucket(self, bucket): + try: + arn_details = self._arn_parser.parse_arn(bucket) + self._add_resource_type_and_name(arn_details) + return arn_details + except InvalidArnException: + pass + return None + + def _add_resource_type_and_name(self, arn_details): + match = self._RESOURCE_REGEX.match(arn_details['resource']) + if match: + arn_details['resource_type'] = match.group('resource_type') + arn_details['resource_name'] = match.group('resource_name') diff --git a/.venv/lib/python3.12/site-packages/s3transfer/delete.py b/.venv/lib/python3.12/site-packages/s3transfer/delete.py new file mode 100644 index 00000000..74084d31 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/delete.py @@ -0,0 +1,71 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. 
This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from s3transfer.tasks import SubmissionTask, Task + + +class DeleteSubmissionTask(SubmissionTask): + """Task for submitting tasks to execute an object deletion.""" + + def _submit(self, client, request_executor, transfer_future, **kwargs): + """ + :param client: The client associated with the transfer manager + + :type config: s3transfer.manager.TransferConfig + :param config: The transfer config associated with the transfer + manager + + :type osutil: s3transfer.utils.OSUtil + :param osutil: The os utility associated to the transfer manager + + :type request_executor: s3transfer.futures.BoundedExecutor + :param request_executor: The request executor associated with the + transfer manager + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + """ + call_args = transfer_future.meta.call_args + + self._transfer_coordinator.submit( + request_executor, + DeleteObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': call_args.extra_args, + }, + is_final=True, + ), + ) + + +class DeleteObjectTask(Task): + def _main(self, client, bucket, key, extra_args): + """ + + :param client: The S3 client to use when calling DeleteObject + + :type bucket: str + :param bucket: The name of the bucket. + + :type key: str + :param key: The name of the object to delete. + + :type extra_args: dict + :param extra_args: Extra arguments to pass to the DeleteObject call. + + """ + client.delete_object(Bucket=bucket, Key=key, **extra_args) diff --git a/.venv/lib/python3.12/site-packages/s3transfer/download.py b/.venv/lib/python3.12/site-packages/s3transfer/download.py new file mode 100644 index 00000000..e1a9cf0e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/download.py @@ -0,0 +1,788 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
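# Illustrative sketch (not part of the upstream sources): the delete tasks
# above are driven by the high level TransferManager, so a rough public-API
# equivalent looks like the snippet below. The bucket, key, and VersionId
# extra argument are placeholders for the example.
import boto3

from s3transfer.manager import TransferManager

client = boto3.client('s3')
with TransferManager(client) as manager:
    future = manager.delete(
        'bucket', 'key', extra_args={'VersionId': 'example-version-id'}
    )
    future.result()  # raises if the underlying DeleteObject call failed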
+import heapq +import logging +import threading + +from s3transfer.compat import seekable +from s3transfer.exceptions import RetriesExceededError +from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG +from s3transfer.tasks import SubmissionTask, Task +from s3transfer.utils import ( + S3_RETRYABLE_DOWNLOAD_ERRORS, + CountCallbackInvoker, + DeferredOpenFile, + FunctionContainer, + StreamReaderProgress, + calculate_num_parts, + calculate_range_parameter, + get_callbacks, + invoke_progress_callbacks, +) + +logger = logging.getLogger(__name__) + + +class DownloadOutputManager: + """Base manager class for handling various types of files for downloads + + This class is typically used for the DownloadSubmissionTask class to help + determine the following: + + * Provides the fileobj to write to downloads to + * Get a task to complete once everything downloaded has been written + + The answers/implementations differ for the various types of file outputs + that may be accepted. All implementations must subclass and override + public methods from this class. + """ + + def __init__(self, osutil, transfer_coordinator, io_executor): + self._osutil = osutil + self._transfer_coordinator = transfer_coordinator + self._io_executor = io_executor + + @classmethod + def is_compatible(cls, download_target, osutil): + """Determines if the target for the download is compatible with manager + + :param download_target: The target for which the upload will write + data to. + + :param osutil: The os utility to be used for the transfer + + :returns: True if the manager can handle the type of target specified + otherwise returns False. + """ + raise NotImplementedError('must implement is_compatible()') + + def get_download_task_tag(self): + """Get the tag (if any) to associate all GetObjectTasks + + :rtype: s3transfer.futures.TaskTag + :returns: The tag to associate all GetObjectTasks with + """ + return None + + def get_fileobj_for_io_writes(self, transfer_future): + """Get file-like object to use for io writes in the io executor + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The future associated with upload request + + returns: A file-like object to write to + """ + raise NotImplementedError('must implement get_fileobj_for_io_writes()') + + def queue_file_io_task(self, fileobj, data, offset): + """Queue IO write for submission to the IO executor. + + This method accepts an IO executor and information about the + downloaded data, and handles submitting this to the IO executor. + + This method may defer submission to the IO executor if necessary. + + """ + self._transfer_coordinator.submit( + self._io_executor, self.get_io_write_task(fileobj, data, offset) + ) + + def get_io_write_task(self, fileobj, data, offset): + """Get an IO write task for the requested set of data + + This task can be ran immediately or be submitted to the IO executor + for it to run. 
+ + :type fileobj: file-like object + :param fileobj: The file-like object to write to + + :type data: bytes + :param data: The data to write out + + :type offset: integer + :param offset: The offset to write the data to in the file-like object + + :returns: An IO task to be used to write data to a file-like object + """ + return IOWriteTask( + self._transfer_coordinator, + main_kwargs={ + 'fileobj': fileobj, + 'data': data, + 'offset': offset, + }, + ) + + def get_final_io_task(self): + """Get the final io task to complete the download + + This is needed because based on the architecture of the TransferManager + the final tasks will be sent to the IO executor, but the executor + needs a final task for it to signal that the transfer is done and + all done callbacks can be run. + + :rtype: s3transfer.tasks.Task + :returns: A final task to completed in the io executor + """ + raise NotImplementedError('must implement get_final_io_task()') + + def _get_fileobj_from_filename(self, filename): + f = DeferredOpenFile( + filename, mode='wb', open_function=self._osutil.open + ) + # Make sure the file gets closed and we remove the temporary file + # if anything goes wrong during the process. + self._transfer_coordinator.add_failure_cleanup(f.close) + return f + + +class DownloadFilenameOutputManager(DownloadOutputManager): + def __init__(self, osutil, transfer_coordinator, io_executor): + super().__init__(osutil, transfer_coordinator, io_executor) + self._final_filename = None + self._temp_filename = None + self._temp_fileobj = None + + @classmethod + def is_compatible(cls, download_target, osutil): + return isinstance(download_target, str) + + def get_fileobj_for_io_writes(self, transfer_future): + fileobj = transfer_future.meta.call_args.fileobj + self._final_filename = fileobj + self._temp_filename = self._osutil.get_temp_filename(fileobj) + self._temp_fileobj = self._get_temp_fileobj() + return self._temp_fileobj + + def get_final_io_task(self): + # A task to rename the file from the temporary file to its final + # location is needed. This should be the last task needed to complete + # the download. + return IORenameFileTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'fileobj': self._temp_fileobj, + 'final_filename': self._final_filename, + 'osutil': self._osutil, + }, + is_final=True, + ) + + def _get_temp_fileobj(self): + f = self._get_fileobj_from_filename(self._temp_filename) + self._transfer_coordinator.add_failure_cleanup( + self._osutil.remove_file, self._temp_filename + ) + return f + + +class DownloadSeekableOutputManager(DownloadOutputManager): + @classmethod + def is_compatible(cls, download_target, osutil): + return seekable(download_target) + + def get_fileobj_for_io_writes(self, transfer_future): + # Return the fileobj provided to the future. + return transfer_future.meta.call_args.fileobj + + def get_final_io_task(self): + # This task will serve the purpose of signaling when all of the io + # writes have finished so done callbacks can be called. 
+ return CompleteDownloadNOOPTask( + transfer_coordinator=self._transfer_coordinator + ) + + +class DownloadNonSeekableOutputManager(DownloadOutputManager): + def __init__( + self, osutil, transfer_coordinator, io_executor, defer_queue=None + ): + super().__init__(osutil, transfer_coordinator, io_executor) + if defer_queue is None: + defer_queue = DeferQueue() + self._defer_queue = defer_queue + self._io_submit_lock = threading.Lock() + + @classmethod + def is_compatible(cls, download_target, osutil): + return hasattr(download_target, 'write') + + def get_download_task_tag(self): + return IN_MEMORY_DOWNLOAD_TAG + + def get_fileobj_for_io_writes(self, transfer_future): + return transfer_future.meta.call_args.fileobj + + def get_final_io_task(self): + return CompleteDownloadNOOPTask( + transfer_coordinator=self._transfer_coordinator + ) + + def queue_file_io_task(self, fileobj, data, offset): + with self._io_submit_lock: + writes = self._defer_queue.request_writes(offset, data) + for write in writes: + data = write['data'] + logger.debug( + "Queueing IO offset %s for fileobj: %s", + write['offset'], + fileobj, + ) + super().queue_file_io_task(fileobj, data, offset) + + def get_io_write_task(self, fileobj, data, offset): + return IOStreamingWriteTask( + self._transfer_coordinator, + main_kwargs={ + 'fileobj': fileobj, + 'data': data, + }, + ) + + +class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager): + def __init__( + self, osutil, transfer_coordinator, io_executor, defer_queue=None + ): + super().__init__( + osutil, transfer_coordinator, io_executor, defer_queue + ) + self._fileobj = None + + @classmethod + def is_compatible(cls, download_target, osutil): + return isinstance(download_target, str) and osutil.is_special_file( + download_target + ) + + def get_fileobj_for_io_writes(self, transfer_future): + filename = transfer_future.meta.call_args.fileobj + self._fileobj = self._get_fileobj_from_filename(filename) + return self._fileobj + + def get_final_io_task(self): + # Make sure the file gets closed once the transfer is done. + return IOCloseTask( + transfer_coordinator=self._transfer_coordinator, + is_final=True, + main_kwargs={'fileobj': self._fileobj}, + ) + + +class DownloadSubmissionTask(SubmissionTask): + """Task for submitting tasks to execute a download""" + + def _get_download_output_manager_cls(self, transfer_future, osutil): + """Retrieves a class for managing output for a download + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future for the request + + :type osutil: s3transfer.utils.OSUtils + :param osutil: The os utility associated to the transfer + + :rtype: class of DownloadOutputManager + :returns: The appropriate class to use for managing a specific type of + input for downloads. + """ + download_manager_resolver_chain = [ + DownloadSpecialFilenameOutputManager, + DownloadFilenameOutputManager, + DownloadSeekableOutputManager, + DownloadNonSeekableOutputManager, + ] + + fileobj = transfer_future.meta.call_args.fileobj + for download_manager_cls in download_manager_resolver_chain: + if download_manager_cls.is_compatible(fileobj, osutil): + return download_manager_cls + raise RuntimeError( + f'Output {fileobj} of type: {type(fileobj)} is not supported.' 
+ ) + + def _submit( + self, + client, + config, + osutil, + request_executor, + io_executor, + transfer_future, + bandwidth_limiter=None, + ): + """ + :param client: The client associated with the transfer manager + + :type config: s3transfer.manager.TransferConfig + :param config: The transfer config associated with the transfer + manager + + :type osutil: s3transfer.utils.OSUtil + :param osutil: The os utility associated to the transfer manager + + :type request_executor: s3transfer.futures.BoundedExecutor + :param request_executor: The request executor associated with the + transfer manager + + :type io_executor: s3transfer.futures.BoundedExecutor + :param io_executor: The io executor associated with the + transfer manager + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + + :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter + :param bandwidth_limiter: The bandwidth limiter to use when + downloading streams + """ + if transfer_future.meta.size is None: + # If a size was not provided figure out the size for the + # user. + response = client.head_object( + Bucket=transfer_future.meta.call_args.bucket, + Key=transfer_future.meta.call_args.key, + **transfer_future.meta.call_args.extra_args, + ) + transfer_future.meta.provide_transfer_size( + response['ContentLength'] + ) + + download_output_manager = self._get_download_output_manager_cls( + transfer_future, osutil + )(osutil, self._transfer_coordinator, io_executor) + + # If it is greater than threshold do a ranged download, otherwise + # do a regular GetObject download. + if transfer_future.meta.size < config.multipart_threshold: + self._submit_download_request( + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ) + else: + self._submit_ranged_download_request( + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ) + + def _submit_download_request( + self, + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ): + call_args = transfer_future.meta.call_args + + # Get a handle to the file that will be used for writing downloaded + # contents + fileobj = download_output_manager.get_fileobj_for_io_writes( + transfer_future + ) + + # Get the needed callbacks for the task + progress_callbacks = get_callbacks(transfer_future, 'progress') + + # Get any associated tags for the get object task. + get_object_tag = download_output_manager.get_download_task_tag() + + # Get the final io task to run once the download is complete. + final_task = download_output_manager.get_final_io_task() + + # Submit the task to download the object. 
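        # The GetObject task below runs on the request executor; the final
        # IO task passed via done_callbacks is what ultimately signals that
        # all downloaded bytes have been written out.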
+ self._transfer_coordinator.submit( + request_executor, + ImmediatelyWriteIOGetObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'fileobj': fileobj, + 'extra_args': call_args.extra_args, + 'callbacks': progress_callbacks, + 'max_attempts': config.num_download_attempts, + 'download_output_manager': download_output_manager, + 'io_chunksize': config.io_chunksize, + 'bandwidth_limiter': bandwidth_limiter, + }, + done_callbacks=[final_task], + ), + tag=get_object_tag, + ) + + def _submit_ranged_download_request( + self, + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ): + call_args = transfer_future.meta.call_args + + # Get the needed progress callbacks for the task + progress_callbacks = get_callbacks(transfer_future, 'progress') + + # Get a handle to the file that will be used for writing downloaded + # contents + fileobj = download_output_manager.get_fileobj_for_io_writes( + transfer_future + ) + + # Determine the number of parts + part_size = config.multipart_chunksize + num_parts = calculate_num_parts(transfer_future.meta.size, part_size) + + # Get any associated tags for the get object task. + get_object_tag = download_output_manager.get_download_task_tag() + + # Callback invoker to submit the final io task once all downloads + # are complete. + finalize_download_invoker = CountCallbackInvoker( + self._get_final_io_task_submission_callback( + download_output_manager, io_executor + ) + ) + for i in range(num_parts): + # Calculate the range parameter + range_parameter = calculate_range_parameter( + part_size, i, num_parts + ) + + # Inject the Range parameter to the parameters to be passed in + # as extra args + extra_args = {'Range': range_parameter} + extra_args.update(call_args.extra_args) + finalize_download_invoker.increment() + # Submit the ranged downloads + self._transfer_coordinator.submit( + request_executor, + GetObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'fileobj': fileobj, + 'extra_args': extra_args, + 'callbacks': progress_callbacks, + 'max_attempts': config.num_download_attempts, + 'start_index': i * part_size, + 'download_output_manager': download_output_manager, + 'io_chunksize': config.io_chunksize, + 'bandwidth_limiter': bandwidth_limiter, + }, + done_callbacks=[finalize_download_invoker.decrement], + ), + tag=get_object_tag, + ) + finalize_download_invoker.finalize() + + def _get_final_io_task_submission_callback( + self, download_manager, io_executor + ): + final_task = download_manager.get_final_io_task() + return FunctionContainer( + self._transfer_coordinator.submit, io_executor, final_task + ) + + def _calculate_range_param(self, part_size, part_index, num_parts): + # Used to calculate the Range parameter + start_range = part_index * part_size + if part_index == num_parts - 1: + end_range = '' + else: + end_range = start_range + part_size - 1 + range_param = f'bytes={start_range}-{end_range}' + return range_param + + +class GetObjectTask(Task): + def _main( + self, + client, + bucket, + key, + fileobj, + extra_args, + callbacks, + max_attempts, + download_output_manager, + io_chunksize, + start_index=0, + bandwidth_limiter=None, + ): + """Downloads an object and places content into io queue + + :param client: The client to use when calling GetObject + :param bucket: 
The bucket to download from + :param key: The key to download from + :param fileobj: The file handle to write content to + :param exta_args: Any extra arguments to include in GetObject request + :param callbacks: List of progress callbacks to invoke on download + :param max_attempts: The number of retries to do when downloading + :param download_output_manager: The download output manager associated + with the current download. + :param io_chunksize: The size of each io chunk to read from the + download stream and queue in the io queue. + :param start_index: The location in the file to start writing the + content of the key to. + :param bandwidth_limiter: The bandwidth limiter to use when throttling + the downloading of data in streams. + """ + last_exception = None + for i in range(max_attempts): + try: + current_index = start_index + response = client.get_object( + Bucket=bucket, Key=key, **extra_args + ) + streaming_body = StreamReaderProgress( + response['Body'], callbacks + ) + if bandwidth_limiter: + streaming_body = ( + bandwidth_limiter.get_bandwith_limited_stream( + streaming_body, self._transfer_coordinator + ) + ) + + chunks = DownloadChunkIterator(streaming_body, io_chunksize) + for chunk in chunks: + # If the transfer is done because of a cancellation + # or error somewhere else, stop trying to submit more + # data to be written and break out of the download. + if not self._transfer_coordinator.done(): + self._handle_io( + download_output_manager, + fileobj, + chunk, + current_index, + ) + current_index += len(chunk) + else: + return + return + except S3_RETRYABLE_DOWNLOAD_ERRORS as e: + logger.debug( + "Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", + e, + i, + max_attempts, + exc_info=True, + ) + last_exception = e + # Also invoke the progress callbacks to indicate that we + # are trying to download the stream again and all progress + # for this GetObject has been lost. + invoke_progress_callbacks( + callbacks, start_index - current_index + ) + continue + raise RetriesExceededError(last_exception) + + def _handle_io(self, download_output_manager, fileobj, chunk, index): + download_output_manager.queue_file_io_task(fileobj, chunk, index) + + +class ImmediatelyWriteIOGetObjectTask(GetObjectTask): + """GetObjectTask that immediately writes to the provided file object + + This is useful for downloads where it is known only one thread is + downloading the object so there is no reason to go through the + overhead of using an IO queue and executor. + """ + + def _handle_io(self, download_output_manager, fileobj, chunk, index): + task = download_output_manager.get_io_write_task(fileobj, chunk, index) + task() + + +class IOWriteTask(Task): + def _main(self, fileobj, data, offset): + """Pulls off an io queue to write contents to a file + + :param fileobj: The file handle to write content to + :param data: The data to write + :param offset: The offset to write the data to. + """ + fileobj.seek(offset) + fileobj.write(data) + + +class IOStreamingWriteTask(Task): + """Task for writing data to a non-seekable stream.""" + + def _main(self, fileobj, data): + """Write data to a fileobj. + + Data will be written directly to the fileobj without + any prior seeking. + + :param fileobj: The fileobj to write content to + :param data: The data to write + + """ + fileobj.write(data) + + +class IORenameFileTask(Task): + """A task to rename a temporary file to its final filename + + :param fileobj: The file handle that content was written to. 
+ :param final_filename: The final name of the file to rename to + upon completion of writing the contents. + :param osutil: OS utility + """ + + def _main(self, fileobj, final_filename, osutil): + fileobj.close() + osutil.rename_file(fileobj.name, final_filename) + + +class IOCloseTask(Task): + """A task to close out a file once the download is complete. + + :param fileobj: The fileobj to close. + """ + + def _main(self, fileobj): + fileobj.close() + + +class CompleteDownloadNOOPTask(Task): + """A NOOP task to serve as an indicator that the download is complete + + Note that the default for is_final is set to True because this should + always be the last task. + """ + + def __init__( + self, + transfer_coordinator, + main_kwargs=None, + pending_main_kwargs=None, + done_callbacks=None, + is_final=True, + ): + super().__init__( + transfer_coordinator=transfer_coordinator, + main_kwargs=main_kwargs, + pending_main_kwargs=pending_main_kwargs, + done_callbacks=done_callbacks, + is_final=is_final, + ) + + def _main(self): + pass + + +class DownloadChunkIterator: + def __init__(self, body, chunksize): + """Iterator to chunk out a downloaded S3 stream + + :param body: A readable file-like object + :param chunksize: The amount to read each time + """ + self._body = body + self._chunksize = chunksize + self._num_reads = 0 + + def __iter__(self): + return self + + def __next__(self): + chunk = self._body.read(self._chunksize) + self._num_reads += 1 + if chunk: + return chunk + elif self._num_reads == 1: + # Even though the response may have not had any + # content, we still want to account for an empty object's + # existence so return the empty chunk for that initial + # read. + return chunk + raise StopIteration() + + next = __next__ + + +class DeferQueue: + """IO queue that defers write requests until they are queued sequentially. + + This class is used to track IO data for a *single* fileobj. + + You can send data to this queue, and it will defer any IO write requests + until it has the next contiguous block available (starting at 0). + + """ + + def __init__(self): + self._writes = [] + self._pending_offsets = set() + self._next_offset = 0 + + def request_writes(self, offset, data): + """Request any available writes given new incoming data. + + You call this method by providing new data along with the + offset associated with the data. If that new data unlocks + any contiguous writes that can now be submitted, this + method will return all applicable writes. + + This is done with 1 method call so you don't have to + make two method calls (put(), get()) which acquires a lock + each method call. + + """ + if offset < self._next_offset: + # This is a request for a write that we've already + # seen. This can happen in the event of a retry + # where if we retry at at offset N/2, we'll requeue + # offsets 0-N/2 again. + return [] + writes = [] + if offset in self._pending_offsets: + # We've already queued this offset so this request is + # a duplicate. In this case we should ignore + # this request and prefer what's already queued. 
+ return [] + heapq.heappush(self._writes, (offset, data)) + self._pending_offsets.add(offset) + while self._writes and self._writes[0][0] == self._next_offset: + next_write = heapq.heappop(self._writes) + writes.append({'offset': next_write[0], 'data': next_write[1]}) + self._pending_offsets.remove(next_write[0]) + self._next_offset += len(next_write[1]) + return writes diff --git a/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py b/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py new file mode 100644 index 00000000..6150fe65 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/exceptions.py @@ -0,0 +1,37 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from concurrent.futures import CancelledError + + +class RetriesExceededError(Exception): + def __init__(self, last_exception, msg='Max Retries Exceeded'): + super().__init__(msg) + self.last_exception = last_exception + + +class S3UploadFailedError(Exception): + pass + + +class InvalidSubscriberMethodError(Exception): + pass + + +class TransferNotDoneError(Exception): + pass + + +class FatalError(CancelledError): + """A CancelledError raised from an error in the TransferManager""" + + pass diff --git a/.venv/lib/python3.12/site-packages/s3transfer/futures.py b/.venv/lib/python3.12/site-packages/s3transfer/futures.py new file mode 100644 index 00000000..9ab30a7a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/futures.py @@ -0,0 +1,613 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import copy +import logging +import sys +import threading +from collections import namedtuple +from concurrent import futures + +from s3transfer.compat import MAXINT +from s3transfer.exceptions import CancelledError, TransferNotDoneError +from s3transfer.utils import FunctionContainer, TaskSemaphore + +try: + from botocore.context import get_context +except ImportError: + + def get_context(): + return None + + +logger = logging.getLogger(__name__) + + +class BaseTransferFuture: + @property + def meta(self): + """The metadata associated to the TransferFuture""" + raise NotImplementedError('meta') + + def done(self): + """Determines if a TransferFuture has completed + + :returns: True if completed. False, otherwise. + """ + raise NotImplementedError('done()') + + def result(self): + """Waits until TransferFuture is done and returns the result + + If the TransferFuture succeeded, it will return the result. 
If the + TransferFuture failed, it will raise the exception associated to the + failure. + """ + raise NotImplementedError('result()') + + def cancel(self): + """Cancels the request associated with the TransferFuture""" + raise NotImplementedError('cancel()') + + +class BaseTransferMeta: + @property + def call_args(self): + """The call args used in the transfer request""" + raise NotImplementedError('call_args') + + @property + def transfer_id(self): + """The unique id of the transfer""" + raise NotImplementedError('transfer_id') + + @property + def user_context(self): + """A dictionary that requesters can store data in""" + raise NotImplementedError('user_context') + + +class TransferFuture(BaseTransferFuture): + def __init__(self, meta=None, coordinator=None): + """The future associated to a submitted transfer request + + :type meta: TransferMeta + :param meta: The metadata associated to the request. This object + is visible to the requester. + + :type coordinator: TransferCoordinator + :param coordinator: The coordinator associated to the request. This + object is not visible to the requester. + """ + self._meta = meta + if meta is None: + self._meta = TransferMeta() + + self._coordinator = coordinator + if coordinator is None: + self._coordinator = TransferCoordinator() + + @property + def meta(self): + return self._meta + + def done(self): + return self._coordinator.done() + + def result(self): + try: + # Usually the result() method blocks until the transfer is done, + # however if a KeyboardInterrupt is raised we want want to exit + # out of this and propagate the exception. + return self._coordinator.result() + except KeyboardInterrupt as e: + self.cancel() + raise e + + def cancel(self): + self._coordinator.cancel() + + def set_exception(self, exception): + """Sets the exception on the future.""" + if not self.done(): + raise TransferNotDoneError( + 'set_exception can only be called once the transfer is ' + 'complete.' + ) + self._coordinator.set_exception(exception, override=True) + + +class TransferMeta(BaseTransferMeta): + """Holds metadata about the TransferFuture""" + + def __init__(self, call_args=None, transfer_id=None): + self._call_args = call_args + self._transfer_id = transfer_id + self._size = None + self._user_context = {} + + @property + def call_args(self): + """The call args used in the transfer request""" + return self._call_args + + @property + def transfer_id(self): + """The unique id of the transfer""" + return self._transfer_id + + @property + def size(self): + """The size of the transfer request if known""" + return self._size + + @property + def user_context(self): + """A dictionary that requesters can store data in""" + return self._user_context + + def provide_transfer_size(self, size): + """A method to provide the size of a transfer request + + By providing this value, the TransferManager will not try to + call HeadObject or use the use OS to determine the size of the + transfer. 
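        For example, ``DownloadSubmissionTask`` calls::

            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )

        after an explicit ``HeadObject`` request when no size was supplied.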
+        """
+        self._size = size
+
+
+class TransferCoordinator:
+    """A helper class for managing TransferFuture"""
+
+    def __init__(self, transfer_id=None):
+        self.transfer_id = transfer_id
+        self._status = 'not-started'
+        self._result = None
+        self._exception = None
+        self._associated_futures = set()
+        self._failure_cleanups = []
+        self._done_callbacks = []
+        self._done_event = threading.Event()
+        self._lock = threading.Lock()
+        self._associated_futures_lock = threading.Lock()
+        self._done_callbacks_lock = threading.Lock()
+        self._failure_cleanups_lock = threading.Lock()
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'
+
+    @property
+    def exception(self):
+        return self._exception
+
+    @property
+    def associated_futures(self):
+        """The list of futures associated to the inprogress TransferFuture
+
+        Once the transfer finishes this list becomes empty as the transfer
+        is considered done and there should be no running futures left.
+        """
+        with self._associated_futures_lock:
+            # We return a copy of the list because we do not want the caller
+            # processing the returned list while another thread is adding
+            # more futures to the actual list.
+            return copy.copy(self._associated_futures)
+
+    @property
+    def failure_cleanups(self):
+        """The list of callbacks to call when the TransferFuture fails"""
+        return self._failure_cleanups
+
+    @property
+    def status(self):
+        """The status of the TransferFuture
+
+        The currently supported states are:
+            * not-started - Has yet to start. If in this state, a transfer
+              can be canceled immediately and nothing will happen.
+            * queued - SubmissionTask is about to submit tasks
+            * running - Is inprogress. In-progress as of now means that
+              the SubmissionTask that runs the transfer is being executed. So
+              there is no guarantee any transfer requests had been made to
+              S3 if this state is reached.
+            * cancelled - Was cancelled
+            * failed - An exception other than CancelledError was thrown
+            * success - No exceptions were thrown and is done.
+        """
+        return self._status
+
+    def set_result(self, result):
+        """Set a result for the TransferFuture
+
+        Implies that the TransferFuture succeeded. This will always set a
+        result because it is invoked on the final task where there is only
+        ever one final task and it is run at the very end of a transfer
+        process. So if a result is being set for this final task, the transfer
+        succeeded even if something came along and canceled the transfer
+        on the final task.
+        """
+        with self._lock:
+            self._exception = None
+            self._result = result
+            self._status = 'success'
+
+    def set_exception(self, exception, override=False):
+        """Set an exception for the TransferFuture
+
+        Implies the TransferFuture failed.
+
+        :param exception: The exception that caused the transfer to fail.
+        :param override: If True, override any existing state.
+        """
+        with self._lock:
+            if not self.done() or override:
+                self._exception = exception
+                self._status = 'failed'
+
+    def result(self):
+        """Waits until TransferFuture is done and returns the result
+
+        If the TransferFuture succeeded, it will return the result. If the
+        TransferFuture failed, it will raise the exception associated to the
+        failure.
+        """
+        # Doing a wait() with no timeout cannot be interrupted in python2 but
+        # can be interrupted in python3 so we just wait with the largest
+        # possible integer value, which is on the scale of billions of
+        # years...
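+        # (MAXINT here is just a very large timeout imported from
+        # s3transfer.compat; the event is normally set long before it
+        # expires via announce_done().)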
+        self._done_event.wait(MAXINT)
+
+        # Once done waiting, raise an exception if present or return the
+        # final result.
+        if self._exception:
+            raise self._exception
+        return self._result
+
+    def cancel(self, msg='', exc_type=CancelledError):
+        """Cancels the TransferFuture
+
+        :param msg: The message to attach to the cancellation
+        :param exc_type: The type of exception to set for the cancellation
+        """
+        with self._lock:
+            if not self.done():
+                should_announce_done = False
+                logger.debug('%s cancel(%s) called', self, msg)
+                self._exception = exc_type(msg)
+                if self._status == 'not-started':
+                    should_announce_done = True
+                self._status = 'cancelled'
+                if should_announce_done:
+                    self.announce_done()
+
+    def set_status_to_queued(self):
+        """Sets the TransferFuture's status to queued"""
+        self._transition_to_non_done_state('queued')
+
+    def set_status_to_running(self):
+        """Sets the TransferFuture's status to running"""
+        self._transition_to_non_done_state('running')
+
+    def _transition_to_non_done_state(self, desired_state):
+        with self._lock:
+            if self.done():
+                raise RuntimeError(
+                    f'Unable to transition from done state {self.status} to non-done '
+                    f'state {desired_state}.'
+                )
+            self._status = desired_state
+
+    def submit(self, executor, task, tag=None):
+        """Submits a task to a provided executor
+
+        :type executor: s3transfer.futures.BoundedExecutor
+        :param executor: The executor to submit the callable to
+
+        :type task: s3transfer.tasks.Task
+        :param task: The task to submit to the executor
+
+        :type tag: s3transfer.futures.TaskTag
+        :param tag: A tag to associate to the submitted task
+
+        :rtype: concurrent.futures.Future
+        :returns: A future representing the submitted task
+        """
+        logger.debug(
+            f"Submitting task {task} to executor {executor} for transfer request: {self.transfer_id}."
+        )
+        future = executor.submit(task, tag=tag)
+        # Add this created future to the list of associated futures just
+        # in case it is needed during cleanups.
+        self.add_associated_future(future)
+        future.add_done_callback(
+            FunctionContainer(self.remove_associated_future, future)
+        )
+        return future
+
+    def done(self):
+        """Determines if a TransferFuture has completed
+
+        :returns: True if status is equal to 'failed', 'cancelled', or
+            'success'. False, otherwise
+        """
+        return self.status in ['failed', 'cancelled', 'success']
+
+    def add_associated_future(self, future):
+        """Adds a future to be associated with the TransferFuture"""
+        with self._associated_futures_lock:
+            self._associated_futures.add(future)
+
+    def remove_associated_future(self, future):
+        """Removes a future's association to the TransferFuture"""
+        with self._associated_futures_lock:
+            self._associated_futures.remove(future)
+
+    def add_done_callback(self, function, *args, **kwargs):
+        """Add a done callback to be invoked when transfer is done"""
+        with self._done_callbacks_lock:
+            self._done_callbacks.append(
+                FunctionContainer(function, *args, **kwargs)
+            )
+
+    def add_failure_cleanup(self, function, *args, **kwargs):
+        """Adds a callback to call upon failure"""
+        with self._failure_cleanups_lock:
+            self._failure_cleanups.append(
+                FunctionContainer(function, *args, **kwargs)
+            )
+
+    def announce_done(self):
+        """Announce that future is done running and run associated callbacks
+
+        This will run any failure cleanups if the transfer failed and they
+        have not already been run, allows the result() to be unblocked, and
+        will run any done callbacks associated to the TransferFuture if they
+        have not already been run.
+        """
+        if self.status != 'success':
+            self._run_failure_cleanups()
+        self._done_event.set()
+        self._run_done_callbacks()
+
+    def _run_done_callbacks(self):
+        # Run the callbacks and remove the callbacks from the internal
+        # list so they do not get run again if done is announced more than
+        # once.
+        with self._done_callbacks_lock:
+            self._run_callbacks(self._done_callbacks)
+            self._done_callbacks = []
+
+    def _run_failure_cleanups(self):
+        # Run the cleanup callbacks and remove the callbacks from the internal
+        # list so they do not get run again if done is announced more than
+        # once.
+        with self._failure_cleanups_lock:
+            self._run_callbacks(self.failure_cleanups)
+            self._failure_cleanups = []
+
+    def _run_callbacks(self, callbacks):
+        for callback in callbacks:
+            self._run_callback(callback)
+
+    def _run_callback(self, callback):
+        try:
+            callback()
+        # We do not want a callback interrupting the process, especially
+        # in the failure cleanups. So we log and catch the exception.
+        except Exception:
+            logger.debug(f"Exception raised in {callback}.", exc_info=True)
+
+
+class BoundedExecutor:
+    EXECUTOR_CLS = futures.ThreadPoolExecutor
+
+    def __init__(
+        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
+    ):
+        """An executor implementation that has a maximum number of queued-up tasks
+
+        The executor will block if the number of tasks that have been
+        submitted and are currently being worked on is past its maximum.
+
+        :params max_size: The maximum number of inflight futures. An inflight
+            future means that the task is either queued up or is currently
+            being executed. A size of None or 0 means that the executor will
+            have no bound in terms of the number of inflight futures.
+
+        :params max_num_threads: The maximum number of threads the executor
+            uses.
+
+        :type tag_semaphores: dict
+        :params tag_semaphores: A dictionary where the key is the name of the
+            tag and the value is the semaphore to use when limiting the
+            number of tasks the executor is processing at a time.
+
+        :type executor_cls: BaseExecutor
+        :param executor_cls: The executor class that
+            gets bounded by this executor. If None is provided, the
+            concurrent.futures.ThreadPoolExecutor class is used.
+        """
+        self._max_num_threads = max_num_threads
+        if executor_cls is None:
+            executor_cls = self.EXECUTOR_CLS
+        self._executor = executor_cls(max_workers=self._max_num_threads)
+        self._semaphore = TaskSemaphore(max_size)
+        self._tag_semaphores = tag_semaphores
+
+    def submit(self, task, tag=None, block=True):
+        """Submit a task to complete
+
+        :type task: s3transfer.tasks.Task
+        :param task: The task to run __call__ on
+
+
+        :type tag: s3transfer.futures.TaskTag
+        :param tag: An optional tag to associate to the task. This
+            is used to override which semaphore to use.
+
+        :type block: boolean
+        :param block: If True, wait until it is possible to submit a task.
+            If False, do not wait and raise an error if a task cannot be
+            submitted.
+
+        :returns: The future associated to the submitted task
+        """
+        semaphore = self._semaphore
+        # If a tag was provided, use the semaphore associated to that
+        # tag.
+        if tag:
+            semaphore = self._tag_semaphores[tag]
+
+        # Call acquire on the semaphore.
+        acquire_token = semaphore.acquire(task.transfer_id, block)
+        # Create a callback to invoke when task is done in order to call
+        # release on the semaphore.
+        release_callback = FunctionContainer(
+            semaphore.release, task.transfer_id, acquire_token
+        )
+        # Submit the task to the underlying executor.
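+        # (The release callback is attached as a done callback on the future
+        # below, so the semaphore is released even if the task raises.)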
+ # Pass the current context to ensure child threads persist the + # parent thread's context. + future = ExecutorFuture(self._executor.submit(task, get_context())) + # Add the Semaphore.release() callback to the future such that + # it is invoked once the future completes. + future.add_done_callback(release_callback) + return future + + def shutdown(self, wait=True): + self._executor.shutdown(wait) + + +class ExecutorFuture: + def __init__(self, future): + """A future returned from the executor + + Currently, it is just a wrapper around a concurrent.futures.Future. + However, this can eventually grow to implement the needed functionality + of concurrent.futures.Future if we move off of the library and not + affect the rest of the codebase. + + :type future: concurrent.futures.Future + :param future: The underlying future + """ + self._future = future + + def result(self): + return self._future.result() + + def add_done_callback(self, fn): + """Adds a callback to be completed once future is done + + :param fn: A callable that takes no arguments. Note that is different + than concurrent.futures.Future.add_done_callback that requires + a single argument for the future. + """ + + # The done callback for concurrent.futures.Future will always pass a + # the future in as the only argument. So we need to create the + # proper signature wrapper that will invoke the callback provided. + def done_callback(future_passed_to_callback): + return fn() + + self._future.add_done_callback(done_callback) + + def done(self): + return self._future.done() + + +class BaseExecutor: + """Base Executor class implementation needed to work with s3transfer""" + + def __init__(self, max_workers=None): + pass + + def submit(self, fn, *args, **kwargs): + raise NotImplementedError('submit()') + + def shutdown(self, wait=True): + raise NotImplementedError('shutdown()') + + +class NonThreadedExecutor(BaseExecutor): + """A drop-in replacement non-threaded version of ThreadPoolExecutor""" + + def submit(self, fn, *args, **kwargs): + future = NonThreadedExecutorFuture() + try: + result = fn(*args, **kwargs) + future.set_result(result) + except Exception: + e, tb = sys.exc_info()[1:] + logger.debug( + 'Setting exception for %s to %s with traceback %s', + future, + e, + tb, + ) + future.set_exception_info(e, tb) + return future + + def shutdown(self, wait=True): + pass + + +class NonThreadedExecutorFuture: + """The Future returned from NonThreadedExecutor + + Note that this future is **not** thread-safe as it is being used + from the context of a non-threaded environment. 
+ """ + + def __init__(self): + self._result = None + self._exception = None + self._traceback = None + self._done = False + self._done_callbacks = [] + + def set_result(self, result): + self._result = result + self._set_done() + + def set_exception_info(self, exception, traceback): + self._exception = exception + self._traceback = traceback + self._set_done() + + def result(self, timeout=None): + if self._exception: + raise self._exception.with_traceback(self._traceback) + return self._result + + def _set_done(self): + self._done = True + for done_callback in self._done_callbacks: + self._invoke_done_callback(done_callback) + self._done_callbacks = [] + + def _invoke_done_callback(self, done_callback): + return done_callback(self) + + def done(self): + return self._done + + def add_done_callback(self, fn): + if self._done: + self._invoke_done_callback(fn) + else: + self._done_callbacks.append(fn) + + +TaskTag = namedtuple('TaskTag', ['name']) + +IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload') +IN_MEMORY_DOWNLOAD_TAG = TaskTag('in_memory_download') diff --git a/.venv/lib/python3.12/site-packages/s3transfer/manager.py b/.venv/lib/python3.12/site-packages/s3transfer/manager.py new file mode 100644 index 00000000..8e1cdf06 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/manager.py @@ -0,0 +1,754 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import copy +import logging +import re +import threading + +from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket +from s3transfer.constants import ( + ALLOWED_DOWNLOAD_ARGS, + FULL_OBJECT_CHECKSUM_ARGS, + KB, + MB, +) +from s3transfer.copies import CopySubmissionTask +from s3transfer.delete import DeleteSubmissionTask +from s3transfer.download import DownloadSubmissionTask +from s3transfer.exceptions import CancelledError, FatalError +from s3transfer.futures import ( + IN_MEMORY_DOWNLOAD_TAG, + IN_MEMORY_UPLOAD_TAG, + BoundedExecutor, + TransferCoordinator, + TransferFuture, + TransferMeta, +) +from s3transfer.upload import UploadSubmissionTask +from s3transfer.utils import ( + CallArgs, + OSUtils, + SlidingWindowSemaphore, + TaskSemaphore, + get_callbacks, + set_default_checksum_algorithm, + signal_not_transferring, + signal_transferring, +) + +logger = logging.getLogger(__name__) + + +class TransferConfig: + def __init__( + self, + multipart_threshold=8 * MB, + multipart_chunksize=8 * MB, + max_request_concurrency=10, + max_submission_concurrency=5, + max_request_queue_size=1000, + max_submission_queue_size=1000, + max_io_queue_size=1000, + io_chunksize=256 * KB, + num_download_attempts=5, + max_in_memory_upload_chunks=10, + max_in_memory_download_chunks=10, + max_bandwidth=None, + ): + """Configurations for the transfer manager + + :param multipart_threshold: The threshold for which multipart + transfers occur. + + :param max_request_concurrency: The maximum number of S3 API + transfer-related requests that can happen at a time. 
+
+        :param max_submission_concurrency: The maximum number of threads
+            processing a call to a TransferManager method. Processing a
+            call usually entails determining which S3 API requests need
+            to be enqueued, but does **not** entail making any of the
+            S3 API data transferring requests needed to perform the transfer.
+            The threads controlled by ``max_request_concurrency`` are
+            responsible for that.
+
+        :param multipart_chunksize: The size of each transfer if a request
+            becomes a multipart transfer.
+
+        :param max_request_queue_size: The maximum amount of S3 API requests
+            that can be queued at a time.
+
+        :param max_submission_queue_size: The maximum amount of
+            TransferManager method calls that can be queued at a time.
+
+        :param max_io_queue_size: The maximum amount of read parts that
+            can be queued to be written to disk per download. The default
+            size for each element in this queue is 8 KB.
+
+        :param io_chunksize: The max size of each chunk in the io queue.
+            Currently, this is the size used when reading from the downloaded
+            stream as well.
+
+        :param num_download_attempts: The number of download attempts that
+            will be tried upon errors with downloading an object in S3. Note
+            that these retries account for errors that occur when streaming
+            down the data from s3 (i.e. socket errors and read timeouts that
+            occur after receiving an OK response from s3).
+            Other retryable exceptions such as throttling errors and 5xx errors
+            are already retried by botocore (this default is 5). The
+            ``num_download_attempts`` does not take into account the
+            number of exceptions retried by botocore.
+
+        :param max_in_memory_upload_chunks: The number of chunks that can
+            be stored in memory at a time for all ongoing upload requests.
+            This pertains to chunks of data that need to be stored in memory
+            during an upload if the data is sourced from a file-like object.
+            The total maximum memory footprint due to in-memory upload
+            chunks is roughly equal to:
+
+                max_in_memory_upload_chunks * multipart_chunksize
+                + max_submission_concurrency * multipart_chunksize
+
+            ``max_submission_concurrency`` has an effect on this value because
+            each thread pulling data off of a file-like object may be waiting
+            with a single read chunk to be submitted for upload because the
+            ``max_in_memory_upload_chunks`` value has been reached by the
+            threads making the upload request.
+
+        :param max_in_memory_download_chunks: The number of chunks that can
+            be buffered in memory and **not** in the io queue at a time for all
+            ongoing download requests. This pertains specifically to file-like
+            objects that are not seekable. The total maximum memory footprint
+            due to in-memory download chunks is roughly equal to:
+
+                max_in_memory_download_chunks * multipart_chunksize
+
+        :param max_bandwidth: The maximum bandwidth that will be consumed
+            in uploading and downloading file content. The value is in terms of
+            bytes per second.
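+
+        As a rough sketch only (the values below are arbitrary, not
+        recommendations), a config that raises the multipart threshold and
+        request concurrency might look like:
+
+        .. code-block:: python
+
+            from s3transfer.manager import TransferConfig
+
+            config = TransferConfig(
+                multipart_threshold=16 * 1024 * 1024,  # switch to multipart at 16 MB
+                max_request_concurrency=20,
+            )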
+ """ + self.multipart_threshold = multipart_threshold + self.multipart_chunksize = multipart_chunksize + self.max_request_concurrency = max_request_concurrency + self.max_submission_concurrency = max_submission_concurrency + self.max_request_queue_size = max_request_queue_size + self.max_submission_queue_size = max_submission_queue_size + self.max_io_queue_size = max_io_queue_size + self.io_chunksize = io_chunksize + self.num_download_attempts = num_download_attempts + self.max_in_memory_upload_chunks = max_in_memory_upload_chunks + self.max_in_memory_download_chunks = max_in_memory_download_chunks + self.max_bandwidth = max_bandwidth + self._validate_attrs_are_nonzero() + + def _validate_attrs_are_nonzero(self): + for attr, attr_val in self.__dict__.items(): + if attr_val is not None and attr_val <= 0: + raise ValueError( + f'Provided parameter {attr} of value {attr_val} must ' + 'be greater than 0.' + ) + + +class TransferManager: + ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS + + _ALLOWED_SHARED_ARGS = [ + 'ACL', + 'CacheControl', + 'ChecksumAlgorithm', + 'ContentDisposition', + 'ContentEncoding', + 'ContentLanguage', + 'ContentType', + 'ExpectedBucketOwner', + 'Expires', + 'GrantFullControl', + 'GrantRead', + 'GrantReadACP', + 'GrantWriteACP', + 'Metadata', + 'ObjectLockLegalHoldStatus', + 'ObjectLockMode', + 'ObjectLockRetainUntilDate', + 'RequestPayer', + 'ServerSideEncryption', + 'StorageClass', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'SSEKMSKeyId', + 'SSEKMSEncryptionContext', + 'Tagging', + 'WebsiteRedirectLocation', + ] + + ALLOWED_UPLOAD_ARGS = ( + _ALLOWED_SHARED_ARGS + + [ + 'ChecksumType', + 'MpuObjectSize', + ] + + FULL_OBJECT_CHECKSUM_ARGS + ) + + ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [ + 'CopySourceIfMatch', + 'CopySourceIfModifiedSince', + 'CopySourceIfNoneMatch', + 'CopySourceIfUnmodifiedSince', + 'CopySourceSSECustomerAlgorithm', + 'CopySourceSSECustomerKey', + 'CopySourceSSECustomerKeyMD5', + 'MetadataDirective', + 'TaggingDirective', + ] + + ALLOWED_DELETE_ARGS = [ + 'MFA', + 'VersionId', + 'RequestPayer', + 'ExpectedBucketOwner', + ] + + VALIDATE_SUPPORTED_BUCKET_VALUES = True + + _UNSUPPORTED_BUCKET_PATTERNS = { + 'S3 Object Lambda': re.compile( + r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:' + r'accesspoint[/:][a-zA-Z0-9\-]{1,63}' + ), + } + + def __init__(self, client, config=None, osutil=None, executor_cls=None): + """A transfer manager interface for Amazon S3 + + :param client: Client to be used by the manager + :param config: TransferConfig to associate specific configurations + :param osutil: OSUtils object to use for os-related behavior when + using with transfer manager. + + :type executor_cls: s3transfer.futures.BaseExecutor + :param executor_cls: The class of executor to use with the transfer + manager. By default, concurrent.futures.ThreadPoolExecutor is used. + """ + self._client = client + self._config = config + if config is None: + self._config = TransferConfig() + self._osutil = osutil + if osutil is None: + self._osutil = OSUtils() + self._coordinator_controller = TransferCoordinatorController() + # A counter to create unique id's for each transfer submitted. 
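+        # (The counter's current value becomes the transfer_id of the next
+        # submitted transfer; it is incremented in _submit_transfer().)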
+ self._id_counter = 0 + + # The executor responsible for making S3 API transfer requests + self._request_executor = BoundedExecutor( + max_size=self._config.max_request_queue_size, + max_num_threads=self._config.max_request_concurrency, + tag_semaphores={ + IN_MEMORY_UPLOAD_TAG: TaskSemaphore( + self._config.max_in_memory_upload_chunks + ), + IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore( + self._config.max_in_memory_download_chunks + ), + }, + executor_cls=executor_cls, + ) + + # The executor responsible for submitting the necessary tasks to + # perform the desired transfer + self._submission_executor = BoundedExecutor( + max_size=self._config.max_submission_queue_size, + max_num_threads=self._config.max_submission_concurrency, + executor_cls=executor_cls, + ) + + # There is one thread available for writing to disk. It will handle + # downloads for all files. + self._io_executor = BoundedExecutor( + max_size=self._config.max_io_queue_size, + max_num_threads=1, + executor_cls=executor_cls, + ) + + # The component responsible for limiting bandwidth usage if it + # is configured. + self._bandwidth_limiter = None + if self._config.max_bandwidth is not None: + logger.debug( + 'Setting max_bandwidth to %s', self._config.max_bandwidth + ) + leaky_bucket = LeakyBucket(self._config.max_bandwidth) + self._bandwidth_limiter = BandwidthLimiter(leaky_bucket) + + self._register_handlers() + + @property + def client(self): + return self._client + + @property + def config(self): + return self._config + + def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): + """Uploads a file to S3 + + :type fileobj: str or seekable file-like object + :param fileobj: The name of a file to upload or a seekable file-like + object to upload. It is recommended to use a filename because + file-like objects may result in higher memory usage. + + :type bucket: str + :param bucket: The name of the bucket to upload to + + :type key: str + :param key: The name of the key to upload to + + :type extra_args: dict + :param extra_args: Extra arguments that may be passed to the + client operation + + :type subscribers: list(s3transfer.subscribers.BaseSubscriber) + :param subscribers: The list of subscribers to be invoked in the + order provided based on the event emit during the process of + the transfer request. + + :rtype: s3transfer.futures.TransferFuture + :returns: Transfer future representing the upload + """ + + extra_args = extra_args.copy() if extra_args else {} + if subscribers is None: + subscribers = [] + self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) + self._validate_if_bucket_supported(bucket) + self._add_operation_defaults(extra_args) + call_args = CallArgs( + fileobj=fileobj, + bucket=bucket, + key=key, + extra_args=extra_args, + subscribers=subscribers, + ) + extra_main_kwargs = {} + if self._bandwidth_limiter: + extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter + return self._submit_transfer( + call_args, UploadSubmissionTask, extra_main_kwargs + ) + + def download( + self, bucket, key, fileobj, extra_args=None, subscribers=None + ): + """Downloads a file from S3 + + :type bucket: str + :param bucket: The name of the bucket to download from + + :type key: str + :param key: The name of the key to download from + + :type fileobj: str or seekable file-like object + :param fileobj: The name of a file to download or a seekable file-like + object to download. It is recommended to use a filename because + file-like objects may result in higher memory usage. 
+ + :type extra_args: dict + :param extra_args: Extra arguments that may be passed to the + client operation + + :type subscribers: list(s3transfer.subscribers.BaseSubscriber) + :param subscribers: The list of subscribers to be invoked in the + order provided based on the event emit during the process of + the transfer request. + + :rtype: s3transfer.futures.TransferFuture + :returns: Transfer future representing the download + """ + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = [] + self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) + self._validate_if_bucket_supported(bucket) + call_args = CallArgs( + bucket=bucket, + key=key, + fileobj=fileobj, + extra_args=extra_args, + subscribers=subscribers, + ) + extra_main_kwargs = {'io_executor': self._io_executor} + if self._bandwidth_limiter: + extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter + return self._submit_transfer( + call_args, DownloadSubmissionTask, extra_main_kwargs + ) + + def copy( + self, + copy_source, + bucket, + key, + extra_args=None, + subscribers=None, + source_client=None, + ): + """Copies a file in S3 + + :type copy_source: dict + :param copy_source: The name of the source bucket, key name of the + source object, and optional version ID of the source object. The + dictionary format is: + ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note + that the ``VersionId`` key is optional and may be omitted. + + :type bucket: str + :param bucket: The name of the bucket to copy to + + :type key: str + :param key: The name of the key to copy to + + :type extra_args: dict + :param extra_args: Extra arguments that may be passed to the + client operation + + :type subscribers: a list of subscribers + :param subscribers: The list of subscribers to be invoked in the + order provided based on the event emit during the process of + the transfer request. + + :type source_client: botocore or boto3 Client + :param source_client: The client to be used for operation that + may happen at the source object. For example, this client is + used for the head_object that determines the size of the copy. + If no client is provided, the transfer manager's client is used + as the client for the source object. + + :rtype: s3transfer.futures.TransferFuture + :returns: Transfer future representing the copy + """ + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = [] + if source_client is None: + source_client = self._client + self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS) + if isinstance(copy_source, dict): + self._validate_if_bucket_supported(copy_source.get('Bucket')) + self._validate_if_bucket_supported(bucket) + call_args = CallArgs( + copy_source=copy_source, + bucket=bucket, + key=key, + extra_args=extra_args, + subscribers=subscribers, + source_client=source_client, + ) + return self._submit_transfer(call_args, CopySubmissionTask) + + def delete(self, bucket, key, extra_args=None, subscribers=None): + """Delete an S3 object. + + :type bucket: str + :param bucket: The name of the bucket. + + :type key: str + :param key: The name of the S3 object to delete. + + :type extra_args: dict + :param extra_args: Extra arguments that may be passed to the + DeleteObject call. + + :type subscribers: list + :param subscribers: A list of subscribers to be invoked during the + process of the transfer request. Note that the ``on_progress`` + callback is not invoked during object deletion. 
+ + :rtype: s3transfer.futures.TransferFuture + :return: Transfer future representing the deletion. + + """ + if extra_args is None: + extra_args = {} + if subscribers is None: + subscribers = [] + self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS) + self._validate_if_bucket_supported(bucket) + call_args = CallArgs( + bucket=bucket, + key=key, + extra_args=extra_args, + subscribers=subscribers, + ) + return self._submit_transfer(call_args, DeleteSubmissionTask) + + def _validate_if_bucket_supported(self, bucket): + # s3 high level operations don't support some resources + # (eg. S3 Object Lambda) only direct API calls are available + # for such resources + if self.VALIDATE_SUPPORTED_BUCKET_VALUES: + for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items(): + match = pattern.match(bucket) + if match: + raise ValueError( + f'TransferManager methods do not support {resource} ' + 'resource. Use direct client calls instead.' + ) + + def _validate_all_known_args(self, actual, allowed): + for kwarg in actual: + if kwarg not in allowed: + raise ValueError( + "Invalid extra_args key '{}', " + "must be one of: {}".format(kwarg, ', '.join(allowed)) + ) + + def _add_operation_defaults(self, extra_args): + if ( + self.client.meta.config.request_checksum_calculation + == "when_supported" + ): + set_default_checksum_algorithm(extra_args) + + def _submit_transfer( + self, call_args, submission_task_cls, extra_main_kwargs=None + ): + if not extra_main_kwargs: + extra_main_kwargs = {} + + # Create a TransferFuture to return back to the user + transfer_future, components = self._get_future_with_components( + call_args + ) + + # Add any provided done callbacks to the created transfer future + # to be invoked on the transfer future being complete. + for callback in get_callbacks(transfer_future, 'done'): + components['coordinator'].add_done_callback(callback) + + # Get the main kwargs needed to instantiate the submission task + main_kwargs = self._get_submission_task_main_kwargs( + transfer_future, extra_main_kwargs + ) + + # Submit a SubmissionTask that will submit all of the necessary + # tasks needed to complete the S3 transfer. + self._submission_executor.submit( + submission_task_cls( + transfer_coordinator=components['coordinator'], + main_kwargs=main_kwargs, + ) + ) + + # Increment the unique id counter for future transfer requests + self._id_counter += 1 + + return transfer_future + + def _get_future_with_components(self, call_args): + transfer_id = self._id_counter + # Creates a new transfer future along with its components + transfer_coordinator = TransferCoordinator(transfer_id=transfer_id) + # Track the transfer coordinator for transfers to manage. + self._coordinator_controller.add_transfer_coordinator( + transfer_coordinator + ) + # Also make sure that the transfer coordinator is removed once + # the transfer completes so it does not stick around in memory. 
+        transfer_coordinator.add_done_callback(
+            self._coordinator_controller.remove_transfer_coordinator,
+            transfer_coordinator,
+        )
+        components = {
+            'meta': TransferMeta(call_args, transfer_id=transfer_id),
+            'coordinator': transfer_coordinator,
+        }
+        transfer_future = TransferFuture(**components)
+        return transfer_future, components
+
+    def _get_submission_task_main_kwargs(
+        self, transfer_future, extra_main_kwargs
+    ):
+        main_kwargs = {
+            'client': self._client,
+            'config': self._config,
+            'osutil': self._osutil,
+            'request_executor': self._request_executor,
+            'transfer_future': transfer_future,
+        }
+        main_kwargs.update(extra_main_kwargs)
+        return main_kwargs
+
+    def _register_handlers(self):
+        # Register handlers to enable/disable callbacks on uploads.
+        event_name = 'request-created.s3'
+        self._client.meta.events.register_first(
+            event_name,
+            signal_not_transferring,
+            unique_id='s3upload-not-transferring',
+        )
+        self._client.meta.events.register_last(
+            event_name, signal_transferring, unique_id='s3upload-transferring'
+        )
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, *args):
+        cancel = False
+        cancel_msg = ''
+        cancel_exc_type = FatalError
+        # If an exception was raised in the context handler, signal to cancel
+        # all of the inprogress futures in the shutdown.
+        if exc_type:
+            cancel = True
+            cancel_msg = str(exc_value)
+            if not cancel_msg:
+                cancel_msg = repr(exc_value)
+            # If it was a KeyboardInterrupt, the cancellation was initiated
+            # by the user.
+            if isinstance(exc_value, KeyboardInterrupt):
+                cancel_exc_type = CancelledError
+        self._shutdown(cancel, cancel_msg, cancel_exc_type)
+
+    def shutdown(self, cancel=False, cancel_msg=''):
+        """Shutdown the TransferManager
+
+        It will wait till all transfers complete before it completely shuts
+        down.
+
+        :type cancel: boolean
+        :param cancel: If True, calls TransferFuture.cancel() for
+            all in-progress transfers. This is useful if you want the
+            shutdown to happen quicker.
+
+        :type cancel_msg: str
+        :param cancel_msg: The message to specify if canceling all in-progress
+            transfers.
+        """
+        self._shutdown(cancel, cancel_msg)
+
+    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
+        if cancel:
+            # Cancel all in-flight transfers if requested, before waiting
+            # for them to complete.
+            self._coordinator_controller.cancel(cancel_msg, exc_type)
+        try:
+            # Wait until there are no more in-progress transfers. This is
+            # wrapped in a try statement because this can be interrupted
+            # with a KeyboardInterrupt that needs to be caught.
+            self._coordinator_controller.wait()
+        except KeyboardInterrupt:
+            # If no errors were raised in the try block, the cancel should
+            # have no coordinators it needs to run cancel on. If there was
+            # an error raised in the try statement we want to cancel all of
+            # the inflight transfers before shutting down to speed that
+            # process up.
+            self._coordinator_controller.cancel('KeyboardInterrupt()')
+            raise
+        finally:
+            # Shutdown all of the executors.
+            self._submission_executor.shutdown()
+            self._request_executor.shutdown()
+            self._io_executor.shutdown()
+
+
+class TransferCoordinatorController:
+    def __init__(self):
+        """Abstraction to control all transfer coordinators
+
+        This abstraction allows the manager to wait for inprogress transfers
+        to complete and cancel all inprogress transfers.
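+
+        As a rough, simplified sketch of how the manager drives this class
+        (``coordinator`` is assumed to be a TransferCoordinator for a
+        submitted transfer):
+
+        .. code-block:: python
+
+            controller = TransferCoordinatorController()
+            controller.add_transfer_coordinator(coordinator)
+            try:
+                controller.wait()
+            except KeyboardInterrupt:
+                controller.cancel('KeyboardInterrupt()')
+                raise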
+ """ + self._lock = threading.Lock() + self._tracked_transfer_coordinators = set() + + @property + def tracked_transfer_coordinators(self): + """The set of transfer coordinators being tracked""" + with self._lock: + # We return a copy because the set is mutable and if you were to + # iterate over the set, it may be changing in length due to + # additions and removals of transfer coordinators. + return copy.copy(self._tracked_transfer_coordinators) + + def add_transfer_coordinator(self, transfer_coordinator): + """Adds a transfer coordinator of a transfer to be canceled if needed + + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + :param transfer_coordinator: The transfer coordinator for the + particular transfer + """ + with self._lock: + self._tracked_transfer_coordinators.add(transfer_coordinator) + + def remove_transfer_coordinator(self, transfer_coordinator): + """Remove a transfer coordinator from cancellation consideration + + Typically, this method is invoked by the transfer coordinator itself + to remove its self when it completes its transfer. + + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + :param transfer_coordinator: The transfer coordinator for the + particular transfer + """ + with self._lock: + self._tracked_transfer_coordinators.remove(transfer_coordinator) + + def cancel(self, msg='', exc_type=CancelledError): + """Cancels all inprogress transfers + + This cancels the inprogress transfers by calling cancel() on all + tracked transfer coordinators. + + :param msg: The message to pass on to each transfer coordinator that + gets cancelled. + + :param exc_type: The type of exception to set for the cancellation + """ + for transfer_coordinator in self.tracked_transfer_coordinators: + transfer_coordinator.cancel(msg, exc_type) + + def wait(self): + """Wait until there are no more inprogress transfers + + This will not stop when failures are encountered and not propagate any + of these errors from failed transfers, but it can be interrupted with + a KeyboardInterrupt. + """ + try: + transfer_coordinator = None + for transfer_coordinator in self.tracked_transfer_coordinators: + transfer_coordinator.result() + except KeyboardInterrupt: + logger.debug('Received KeyboardInterrupt in wait()') + # If Keyboard interrupt is raised while waiting for + # the result, then exit out of the wait and raise the + # exception + if transfer_coordinator: + logger.debug( + 'On KeyboardInterrupt was waiting for %s', + transfer_coordinator, + ) + raise + except Exception: + # A general exception could have been thrown because + # of result(). We just want to ignore this and continue + # because we at least know that the transfer coordinator + # has completed. + pass diff --git a/.venv/lib/python3.12/site-packages/s3transfer/processpool.py b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py new file mode 100644 index 00000000..318f1e7f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py @@ -0,0 +1,1009 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. 
See the License for the specific +# language governing permissions and limitations under the License. +"""Speeds up S3 throughput by using processes + +Getting Started +=============== + +The :class:`ProcessPoolDownloader` can be used to download a single file by +calling :meth:`ProcessPoolDownloader.download_file`: + +.. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + with ProcessPoolDownloader() as downloader: + downloader.download_file('mybucket', 'mykey', 'myfile') + + +This snippet downloads the S3 object located in the bucket ``mybucket`` at the +key ``mykey`` to the local file ``myfile``. Any errors encountered during the +transfer are not propagated. To determine if a transfer succeeded or +failed, use the `Futures`_ interface. + + +The :class:`ProcessPoolDownloader` can be used to download multiple files as +well: + +.. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + with ProcessPoolDownloader() as downloader: + downloader.download_file('mybucket', 'mykey', 'myfile') + downloader.download_file('mybucket', 'myotherkey', 'myotherfile') + + +When running this snippet, the downloading of ``mykey`` and ``myotherkey`` +happen in parallel. The first ``download_file`` call does not block the +second ``download_file`` call. The snippet blocks when exiting +the context manager and blocks until both downloads are complete. + +Alternatively, the ``ProcessPoolDownloader`` can be instantiated +and explicitly be shutdown using :meth:`ProcessPoolDownloader.shutdown`: + +.. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + downloader = ProcessPoolDownloader() + downloader.download_file('mybucket', 'mykey', 'myfile') + downloader.download_file('mybucket', 'myotherkey', 'myotherfile') + downloader.shutdown() + + +For this code snippet, the call to ``shutdown`` blocks until both +downloads are complete. + + +Additional Parameters +===================== + +Additional parameters can be provided to the ``download_file`` method: + +* ``extra_args``: A dictionary containing any additional client arguments + to include in the + `GetObject <https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object>`_ + API request. For example: + + .. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + with ProcessPoolDownloader() as downloader: + downloader.download_file( + 'mybucket', 'mykey', 'myfile', + extra_args={'VersionId': 'myversion'}) + + +* ``expected_size``: By default, the downloader will make a HeadObject + call to determine the size of the object. To opt-out of this additional + API call, you can provide the size of the object in bytes: + + .. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + MB = 1024 * 1024 + with ProcessPoolDownloader() as downloader: + downloader.download_file( + 'mybucket', 'mykey', 'myfile', expected_size=2 * MB) + + +Futures +======= + +When ``download_file`` is called, it immediately returns a +:class:`ProcessPoolTransferFuture`. The future can be used to poll the state +of a particular transfer. To get the result of the download, +call :meth:`ProcessPoolTransferFuture.result`. The method blocks +until the transfer completes, whether it succeeds or fails. For example: + +.. 
code:: python + + from s3transfer.processpool import ProcessPoolDownloader + + with ProcessPoolDownloader() as downloader: + future = downloader.download_file('mybucket', 'mykey', 'myfile') + print(future.result()) + + +If the download succeeds, the future returns ``None``: + +.. code:: python + + None + + +If the download fails, the exception causing the failure is raised. For +example, if ``mykey`` did not exist, the following error would be raised + + +.. code:: python + + botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found + + +.. note:: + + :meth:`ProcessPoolTransferFuture.result` can only be called while the + ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or + inside the context manager). + + +Process Pool Configuration +========================== + +By default, the downloader has the following configuration options: + +* ``multipart_threshold``: The threshold size for performing ranged downloads + in bytes. By default, ranged downloads happen for S3 objects that are + greater than or equal to 8 MB in size. + +* ``multipart_chunksize``: The size of each ranged download in bytes. By + default, the size of each ranged download is 8 MB. + +* ``max_request_processes``: The maximum number of processes used to download + S3 objects. By default, the maximum is 10 processes. + + +To change the default configuration, use the :class:`ProcessTransferConfig`: + +.. code:: python + + from s3transfer.processpool import ProcessPoolDownloader + from s3transfer.processpool import ProcessTransferConfig + + config = ProcessTransferConfig( + multipart_threshold=64 * 1024 * 1024, # 64 MB + max_request_processes=50 + ) + downloader = ProcessPoolDownloader(config=config) + + +Client Configuration +==================== + +The process pool downloader creates ``botocore`` clients on your behalf. In +order to affect how the client is created, pass the keyword arguments +that would have been used in the :meth:`botocore.Session.create_client` call: + +.. code:: python + + + from s3transfer.processpool import ProcessPoolDownloader + from s3transfer.processpool import ProcessTransferConfig + + downloader = ProcessPoolDownloader( + client_kwargs={'region_name': 'us-west-2'}) + + +This snippet ensures that all clients created by the ``ProcessPoolDownloader`` +are using ``us-west-2`` as their region. + +""" + +import collections +import contextlib +import logging +import multiprocessing +import signal +import threading +from copy import deepcopy + +import botocore.session +from botocore.config import Config + +from s3transfer.compat import MAXINT, BaseManager +from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, MB, PROCESS_USER_AGENT +from s3transfer.exceptions import CancelledError, RetriesExceededError +from s3transfer.futures import BaseTransferFuture, BaseTransferMeta +from s3transfer.utils import ( + S3_RETRYABLE_DOWNLOAD_ERRORS, + CallArgs, + OSUtils, + calculate_num_parts, + calculate_range_parameter, +) + +logger = logging.getLogger(__name__) + +SHUTDOWN_SIGNAL = 'SHUTDOWN' + +# The DownloadFileRequest tuple is submitted from the ProcessPoolDownloader +# to the GetObjectSubmitter in order for the submitter to begin submitting +# GetObjectJobs to the GetObjectWorkers. 
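+#
+# In other words, the overall flow is: the main process enqueues
+# DownloadFileRequests, a single GetObjectSubmitter process expands each
+# request into one or more GetObjectJobs, and the pool of GetObjectWorker
+# processes performs the GetObject calls and writes the results to disk.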
+DownloadFileRequest = collections.namedtuple( + 'DownloadFileRequest', + [ + 'transfer_id', # The unique id for the transfer + 'bucket', # The bucket to download the object from + 'key', # The key to download the object from + 'filename', # The user-requested download location + 'extra_args', # Extra arguments to provide to client calls + 'expected_size', # The user-provided expected size of the download + ], +) + +# The GetObjectJob tuple is submitted from the GetObjectSubmitter +# to the GetObjectWorkers to download the file or parts of the file. +GetObjectJob = collections.namedtuple( + 'GetObjectJob', + [ + 'transfer_id', # The unique id for the transfer + 'bucket', # The bucket to download the object from + 'key', # The key to download the object from + 'temp_filename', # The temporary file to write the content to via + # completed GetObject calls. + 'extra_args', # Extra arguments to provide to the GetObject call + 'offset', # The offset to write the content for the temp file. + 'filename', # The user-requested download location. The worker + # of final GetObjectJob will move the file located at + # temp_filename to the location of filename. + ], +) + + +@contextlib.contextmanager +def ignore_ctrl_c(): + original_handler = _add_ignore_handler_for_interrupts() + yield + signal.signal(signal.SIGINT, original_handler) + + +def _add_ignore_handler_for_interrupts(): + # Windows is unable to pickle signal.signal directly so it needs to + # be wrapped in a function defined at the module level + return signal.signal(signal.SIGINT, signal.SIG_IGN) + + +class ProcessTransferConfig: + def __init__( + self, + multipart_threshold=8 * MB, + multipart_chunksize=8 * MB, + max_request_processes=10, + ): + """Configuration for the ProcessPoolDownloader + + :param multipart_threshold: The threshold for which ranged downloads + occur. + + :param multipart_chunksize: The chunk size of each ranged download. + + :param max_request_processes: The maximum number of processes that + will be making S3 API transfer-related requests at a time. + """ + self.multipart_threshold = multipart_threshold + self.multipart_chunksize = multipart_chunksize + self.max_request_processes = max_request_processes + + +class ProcessPoolDownloader: + def __init__(self, client_kwargs=None, config=None): + """Downloads S3 objects using process pools + + :type client_kwargs: dict + :param client_kwargs: The keyword arguments to provide when + instantiating S3 clients. The arguments must match the keyword + arguments provided to the + `botocore.session.Session.create_client()` method. 
+ + :type config: ProcessTransferConfig + :param config: Configuration for the downloader + """ + if client_kwargs is None: + client_kwargs = {} + self._client_factory = ClientFactory(client_kwargs) + + self._transfer_config = config + if config is None: + self._transfer_config = ProcessTransferConfig() + + self._download_request_queue = multiprocessing.Queue(1000) + self._worker_queue = multiprocessing.Queue(1000) + self._osutil = OSUtils() + + self._started = False + self._start_lock = threading.Lock() + + # These below are initialized in the start() method + self._manager = None + self._transfer_monitor = None + self._submitter = None + self._workers = [] + + def download_file( + self, bucket, key, filename, extra_args=None, expected_size=None + ): + """Downloads the object's contents to a file + + :type bucket: str + :param bucket: The name of the bucket to download from + + :type key: str + :param key: The name of the key to download from + + :type filename: str + :param filename: The name of a file to download to. + + :type extra_args: dict + :param extra_args: Extra arguments that may be passed to the + client operation + + :type expected_size: int + :param expected_size: The expected size in bytes of the download. If + provided, the downloader will not call HeadObject to determine the + object's size and use the provided value instead. The size is + needed to determine whether to do a multipart download. + + :rtype: s3transfer.futures.TransferFuture + :returns: Transfer future representing the download + """ + self._start_if_needed() + if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args) + transfer_id = self._transfer_monitor.notify_new_transfer() + download_file_request = DownloadFileRequest( + transfer_id=transfer_id, + bucket=bucket, + key=key, + filename=filename, + extra_args=extra_args, + expected_size=expected_size, + ) + logger.debug( + 'Submitting download file request: %s.', download_file_request + ) + self._download_request_queue.put(download_file_request) + call_args = CallArgs( + bucket=bucket, + key=key, + filename=filename, + extra_args=extra_args, + expected_size=expected_size, + ) + future = self._get_transfer_future(transfer_id, call_args) + return future + + def shutdown(self): + """Shutdown the downloader + + It will wait till all downloads are complete before returning. 
+ """ + self._shutdown_if_needed() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, *args): + if isinstance(exc_value, KeyboardInterrupt): + if self._transfer_monitor is not None: + self._transfer_monitor.notify_cancel_all_in_progress() + self.shutdown() + + def _start_if_needed(self): + with self._start_lock: + if not self._started: + self._start() + + def _start(self): + self._start_transfer_monitor_manager() + self._start_submitter() + self._start_get_object_workers() + self._started = True + + def _validate_all_known_args(self, provided): + for kwarg in provided: + if kwarg not in ALLOWED_DOWNLOAD_ARGS: + download_args = ', '.join(ALLOWED_DOWNLOAD_ARGS) + raise ValueError( + f"Invalid extra_args key '{kwarg}', " + f"must be one of: {download_args}" + ) + + def _get_transfer_future(self, transfer_id, call_args): + meta = ProcessPoolTransferMeta( + call_args=call_args, transfer_id=transfer_id + ) + future = ProcessPoolTransferFuture( + monitor=self._transfer_monitor, meta=meta + ) + return future + + def _start_transfer_monitor_manager(self): + logger.debug('Starting the TransferMonitorManager.') + self._manager = TransferMonitorManager() + # We do not want Ctrl-C's to cause the manager to shutdown immediately + # as worker processes will still need to communicate with it when they + # are shutting down. So instead we ignore Ctrl-C and let the manager + # be explicitly shutdown when shutting down the downloader. + self._manager.start(_add_ignore_handler_for_interrupts) + self._transfer_monitor = self._manager.TransferMonitor() + + def _start_submitter(self): + logger.debug('Starting the GetObjectSubmitter.') + self._submitter = GetObjectSubmitter( + transfer_config=self._transfer_config, + client_factory=self._client_factory, + transfer_monitor=self._transfer_monitor, + osutil=self._osutil, + download_request_queue=self._download_request_queue, + worker_queue=self._worker_queue, + ) + self._submitter.start() + + def _start_get_object_workers(self): + logger.debug( + 'Starting %s GetObjectWorkers.', + self._transfer_config.max_request_processes, + ) + for _ in range(self._transfer_config.max_request_processes): + worker = GetObjectWorker( + queue=self._worker_queue, + client_factory=self._client_factory, + transfer_monitor=self._transfer_monitor, + osutil=self._osutil, + ) + worker.start() + self._workers.append(worker) + + def _shutdown_if_needed(self): + with self._start_lock: + if self._started: + self._shutdown() + + def _shutdown(self): + self._shutdown_submitter() + self._shutdown_get_object_workers() + self._shutdown_transfer_monitor_manager() + self._started = False + + def _shutdown_transfer_monitor_manager(self): + logger.debug('Shutting down the TransferMonitorManager.') + self._manager.shutdown() + + def _shutdown_submitter(self): + logger.debug('Shutting down the GetObjectSubmitter.') + self._download_request_queue.put(SHUTDOWN_SIGNAL) + self._submitter.join() + + def _shutdown_get_object_workers(self): + logger.debug('Shutting down the GetObjectWorkers.') + for _ in self._workers: + self._worker_queue.put(SHUTDOWN_SIGNAL) + for worker in self._workers: + worker.join() + + +class ProcessPoolTransferFuture(BaseTransferFuture): + def __init__(self, monitor, meta): + """The future associated to a submitted process pool transfer request + + :type monitor: TransferMonitor + :param monitor: The monitor associated to the process pool downloader + + :type meta: ProcessPoolTransferMeta + :param meta: The metadata associated to the request. 
This object + is visible to the requester. + """ + self._monitor = monitor + self._meta = meta + + @property + def meta(self): + return self._meta + + def done(self): + return self._monitor.is_done(self._meta.transfer_id) + + def result(self): + try: + return self._monitor.poll_for_result(self._meta.transfer_id) + except KeyboardInterrupt: + # For the multiprocessing Manager, a thread is given a single + # connection to reuse in communicating between the thread in the + # main process and the Manager's process. If a Ctrl-C happens when + # polling for the result, it will make the main thread stop trying + # to receive from the connection, but the Manager process will not + # know that the main process has stopped trying to receive and + # will not close the connection. As a result if another message is + # sent to the Manager process, the listener in the Manager + # processes will not process the new message as it is still trying + # trying to process the previous message (that was Ctrl-C'd) and + # thus cause the thread in the main process to hang on its send. + # The only way around this is to create a new connection and send + # messages from that new connection instead. + self._monitor._connect() + self.cancel() + raise + + def cancel(self): + self._monitor.notify_exception( + self._meta.transfer_id, CancelledError() + ) + + +class ProcessPoolTransferMeta(BaseTransferMeta): + """Holds metadata about the ProcessPoolTransferFuture""" + + def __init__(self, transfer_id, call_args): + self._transfer_id = transfer_id + self._call_args = call_args + self._user_context = {} + + @property + def call_args(self): + return self._call_args + + @property + def transfer_id(self): + return self._transfer_id + + @property + def user_context(self): + return self._user_context + + +class ClientFactory: + def __init__(self, client_kwargs=None): + """Creates S3 clients for processes + + Botocore sessions and clients are not pickleable so they cannot be + inherited across Process boundaries. Instead, they must be instantiated + once a process is running. + """ + self._client_kwargs = client_kwargs + if self._client_kwargs is None: + self._client_kwargs = {} + + client_config = deepcopy(self._client_kwargs.get('config', Config())) + if not client_config.user_agent_extra: + client_config.user_agent_extra = PROCESS_USER_AGENT + else: + client_config.user_agent_extra += " " + PROCESS_USER_AGENT + self._client_kwargs['config'] = client_config + + def create_client(self): + """Create a botocore S3 client""" + return botocore.session.Session().create_client( + 's3', **self._client_kwargs + ) + + +class TransferMonitor: + def __init__(self): + """Monitors transfers for cross-process communication + + Notifications can be sent to the monitor and information can be + retrieved from the monitor for a particular transfer. This abstraction + is ran in a ``multiprocessing.managers.BaseManager`` in order to be + shared across processes. + """ + # TODO: Add logic that removes the TransferState if the transfer is + # marked as done and the reference to the future is no longer being + # held onto. Without this logic, this dictionary will continue to + # grow in size with no limit. 
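+        # Maps transfer_id -> TransferState; entries are added in
+        # notify_new_transfer() and currently never removed (see TODO above).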
+ self._transfer_states = {} + self._id_count = 0 + self._init_lock = threading.Lock() + + def notify_new_transfer(self): + with self._init_lock: + transfer_id = self._id_count + self._transfer_states[transfer_id] = TransferState() + self._id_count += 1 + return transfer_id + + def is_done(self, transfer_id): + """Determine a particular transfer is complete + + :param transfer_id: Unique identifier for the transfer + :return: True, if done. False, otherwise. + """ + return self._transfer_states[transfer_id].done + + def notify_done(self, transfer_id): + """Notify a particular transfer is complete + + :param transfer_id: Unique identifier for the transfer + """ + self._transfer_states[transfer_id].set_done() + + def poll_for_result(self, transfer_id): + """Poll for the result of a transfer + + :param transfer_id: Unique identifier for the transfer + :return: If the transfer succeeded, it will return the result. If the + transfer failed, it will raise the exception associated to the + failure. + """ + self._transfer_states[transfer_id].wait_till_done() + exception = self._transfer_states[transfer_id].exception + if exception: + raise exception + return None + + def notify_exception(self, transfer_id, exception): + """Notify an exception was encountered for a transfer + + :param transfer_id: Unique identifier for the transfer + :param exception: The exception encountered for that transfer + """ + # TODO: Not all exceptions are pickleable so if we are running + # this in a multiprocessing.BaseManager we will want to + # make sure to update this signature to ensure pickleability of the + # arguments or have the ProxyObject do the serialization. + self._transfer_states[transfer_id].exception = exception + + def notify_cancel_all_in_progress(self): + for transfer_state in self._transfer_states.values(): + if not transfer_state.done: + transfer_state.exception = CancelledError() + + def get_exception(self, transfer_id): + """Retrieve the exception encountered for the transfer + + :param transfer_id: Unique identifier for the transfer + :return: The exception encountered for that transfer. Otherwise + if there were no exceptions, returns None. + """ + return self._transfer_states[transfer_id].exception + + def notify_expected_jobs_to_complete(self, transfer_id, num_jobs): + """Notify the amount of jobs expected for a transfer + + :param transfer_id: Unique identifier for the transfer + :param num_jobs: The number of jobs to complete the transfer + """ + self._transfer_states[transfer_id].jobs_to_complete = num_jobs + + def notify_job_complete(self, transfer_id): + """Notify that a single job is completed for a transfer + + :param transfer_id: Unique identifier for the transfer + :return: The number of jobs remaining to complete the transfer + """ + return self._transfer_states[transfer_id].decrement_jobs_to_complete() + + +class TransferState: + """Represents the current state of an individual transfer""" + + # NOTE: Ideally the TransferState object would be used directly by the + # various different abstractions in the ProcessPoolDownloader and remove + # the need for the TransferMonitor. However, it would then impose the + # constraint that two hops are required to make or get any changes in the + # state of a transfer across processes: one hop to get a proxy object for + # the TransferState and then a second hop to communicate calling the + # specific TransferState method. 
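+ # In practice (see __init__ just below) the state is only three pieces: a
+ # threading.Event signalling completion, an optional exception, and a
+ # lock-protected count of jobs remaining before the transfer is finalized.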
+ def __init__(self): + self._exception = None + self._done_event = threading.Event() + self._job_lock = threading.Lock() + self._jobs_to_complete = 0 + + @property + def done(self): + return self._done_event.is_set() + + def set_done(self): + self._done_event.set() + + def wait_till_done(self): + self._done_event.wait(MAXINT) + + @property + def exception(self): + return self._exception + + @exception.setter + def exception(self, val): + self._exception = val + + @property + def jobs_to_complete(self): + return self._jobs_to_complete + + @jobs_to_complete.setter + def jobs_to_complete(self, val): + self._jobs_to_complete = val + + def decrement_jobs_to_complete(self): + with self._job_lock: + self._jobs_to_complete -= 1 + return self._jobs_to_complete + + +class TransferMonitorManager(BaseManager): + pass + + +TransferMonitorManager.register('TransferMonitor', TransferMonitor) + + +class BaseS3TransferProcess(multiprocessing.Process): + def __init__(self, client_factory): + super().__init__() + self._client_factory = client_factory + self._client = None + + def run(self): + # Clients are not pickleable so their instantiation cannot happen + # in the __init__ for processes that are created under the + # spawn method. + self._client = self._client_factory.create_client() + with ignore_ctrl_c(): + # By default these processes are ran as child processes to the + # main process. Any Ctrl-c encountered in the main process is + # propagated to the child process and interrupt it at any time. + # To avoid any potentially bad states caused from an interrupt + # (i.e. a transfer failing to notify its done or making the + # communication protocol become out of sync with the + # TransferMonitor), we ignore all Ctrl-C's and allow the main + # process to notify these child processes when to stop processing + # jobs. + self._do_run() + + def _do_run(self): + raise NotImplementedError('_do_run()') + + +class GetObjectSubmitter(BaseS3TransferProcess): + def __init__( + self, + transfer_config, + client_factory, + transfer_monitor, + osutil, + download_request_queue, + worker_queue, + ): + """Submit GetObjectJobs to fulfill a download file request + + :param transfer_config: Configuration for transfers. + :param client_factory: ClientFactory for creating S3 clients. + :param transfer_monitor: Monitor for notifying and retrieving state + of transfer. + :param osutil: OSUtils object to use for os-related behavior when + performing the transfer. + :param download_request_queue: Queue to retrieve download file + requests. + :param worker_queue: Queue to submit GetObjectJobs for workers + to perform. 
+ """ + super().__init__(client_factory) + self._transfer_config = transfer_config + self._transfer_monitor = transfer_monitor + self._osutil = osutil + self._download_request_queue = download_request_queue + self._worker_queue = worker_queue + + def _do_run(self): + while True: + download_file_request = self._download_request_queue.get() + if download_file_request == SHUTDOWN_SIGNAL: + logger.debug('Submitter shutdown signal received.') + return + try: + self._submit_get_object_jobs(download_file_request) + except Exception as e: + logger.debug( + 'Exception caught when submitting jobs for ' + 'download file request %s: %s', + download_file_request, + e, + exc_info=True, + ) + self._transfer_monitor.notify_exception( + download_file_request.transfer_id, e + ) + self._transfer_monitor.notify_done( + download_file_request.transfer_id + ) + + def _submit_get_object_jobs(self, download_file_request): + size = self._get_size(download_file_request) + temp_filename = self._allocate_temp_file(download_file_request, size) + if size < self._transfer_config.multipart_threshold: + self._submit_single_get_object_job( + download_file_request, temp_filename + ) + else: + self._submit_ranged_get_object_jobs( + download_file_request, temp_filename, size + ) + + def _get_size(self, download_file_request): + expected_size = download_file_request.expected_size + if expected_size is None: + expected_size = self._client.head_object( + Bucket=download_file_request.bucket, + Key=download_file_request.key, + **download_file_request.extra_args, + )['ContentLength'] + return expected_size + + def _allocate_temp_file(self, download_file_request, size): + temp_filename = self._osutil.get_temp_filename( + download_file_request.filename + ) + self._osutil.allocate(temp_filename, size) + return temp_filename + + def _submit_single_get_object_job( + self, download_file_request, temp_filename + ): + self._notify_jobs_to_complete(download_file_request.transfer_id, 1) + self._submit_get_object_job( + transfer_id=download_file_request.transfer_id, + bucket=download_file_request.bucket, + key=download_file_request.key, + temp_filename=temp_filename, + offset=0, + extra_args=download_file_request.extra_args, + filename=download_file_request.filename, + ) + + def _submit_ranged_get_object_jobs( + self, download_file_request, temp_filename, size + ): + part_size = self._transfer_config.multipart_chunksize + num_parts = calculate_num_parts(size, part_size) + self._notify_jobs_to_complete( + download_file_request.transfer_id, num_parts + ) + for i in range(num_parts): + offset = i * part_size + range_parameter = calculate_range_parameter( + part_size, i, num_parts + ) + get_object_kwargs = {'Range': range_parameter} + get_object_kwargs.update(download_file_request.extra_args) + self._submit_get_object_job( + transfer_id=download_file_request.transfer_id, + bucket=download_file_request.bucket, + key=download_file_request.key, + temp_filename=temp_filename, + offset=offset, + extra_args=get_object_kwargs, + filename=download_file_request.filename, + ) + + def _submit_get_object_job(self, **get_object_job_kwargs): + self._worker_queue.put(GetObjectJob(**get_object_job_kwargs)) + + def _notify_jobs_to_complete(self, transfer_id, jobs_to_complete): + logger.debug( + 'Notifying %s job(s) to complete for transfer_id %s.', + jobs_to_complete, + transfer_id, + ) + self._transfer_monitor.notify_expected_jobs_to_complete( + transfer_id, jobs_to_complete + ) + + +class GetObjectWorker(BaseS3TransferProcess): + # TODO: It may make sense to 
expose these class variables as configuration + # options if users want to tweak them. + _MAX_ATTEMPTS = 5 + _IO_CHUNKSIZE = 2 * MB + + def __init__(self, queue, client_factory, transfer_monitor, osutil): + """Fulfills GetObjectJobs + + Downloads the S3 object, writes it to the specified file, and + renames the file to its final location if it completes the final + job for a particular transfer. + + :param queue: Queue for retrieving GetObjectJob's + :param client_factory: ClientFactory for creating S3 clients + :param transfer_monitor: Monitor for notifying + :param osutil: OSUtils object to use for os-related behavior when + performing the transfer. + """ + super().__init__(client_factory) + self._queue = queue + self._client_factory = client_factory + self._transfer_monitor = transfer_monitor + self._osutil = osutil + + def _do_run(self): + while True: + job = self._queue.get() + if job == SHUTDOWN_SIGNAL: + logger.debug('Worker shutdown signal received.') + return + if not self._transfer_monitor.get_exception(job.transfer_id): + self._run_get_object_job(job) + else: + logger.debug( + 'Skipping get object job %s because there was a previous ' + 'exception.', + job, + ) + remaining = self._transfer_monitor.notify_job_complete( + job.transfer_id + ) + logger.debug( + '%s jobs remaining for transfer_id %s.', + remaining, + job.transfer_id, + ) + if not remaining: + self._finalize_download( + job.transfer_id, job.temp_filename, job.filename + ) + + def _run_get_object_job(self, job): + try: + self._do_get_object( + bucket=job.bucket, + key=job.key, + temp_filename=job.temp_filename, + extra_args=job.extra_args, + offset=job.offset, + ) + except Exception as e: + logger.debug( + 'Exception caught when downloading object for ' + 'get object job %s: %s', + job, + e, + exc_info=True, + ) + self._transfer_monitor.notify_exception(job.transfer_id, e) + + def _do_get_object(self, bucket, key, extra_args, temp_filename, offset): + last_exception = None + for i in range(self._MAX_ATTEMPTS): + try: + response = self._client.get_object( + Bucket=bucket, Key=key, **extra_args + ) + self._write_to_file(temp_filename, offset, response['Body']) + return + except S3_RETRYABLE_DOWNLOAD_ERRORS as e: + logger.debug( + 'Retrying exception caught (%s), ' + 'retrying request, (attempt %s / %s)', + e, + i + 1, + self._MAX_ATTEMPTS, + exc_info=True, + ) + last_exception = e + raise RetriesExceededError(last_exception) + + def _write_to_file(self, filename, offset, body): + with open(filename, 'rb+') as f: + f.seek(offset) + chunks = iter(lambda: body.read(self._IO_CHUNKSIZE), b'') + for chunk in chunks: + f.write(chunk) + + def _finalize_download(self, transfer_id, temp_filename, filename): + if self._transfer_monitor.get_exception(transfer_id): + self._osutil.remove_file(temp_filename) + else: + self._do_file_rename(transfer_id, temp_filename, filename) + self._transfer_monitor.notify_done(transfer_id) + + def _do_file_rename(self, transfer_id, temp_filename, filename): + try: + self._osutil.rename_file(temp_filename, filename) + except Exception as e: + self._transfer_monitor.notify_exception(transfer_id, e) + self._osutil.remove_file(temp_filename) diff --git a/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py b/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py new file mode 100644 index 00000000..fe773233 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/subscribers.py @@ -0,0 +1,94 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from functools import lru_cache + +from s3transfer.compat import accepts_kwargs +from s3transfer.exceptions import InvalidSubscriberMethodError + + +class BaseSubscriber: + """The base subscriber class + + It is recommended that all subscriber implementations subclass and then + override the subscription methods (i.e. on_{subsribe_type}() methods). + """ + + VALID_SUBSCRIBER_TYPES = ['queued', 'progress', 'done'] + + def __new__(cls, *args, **kwargs): + cls._validate_subscriber_methods() + return super().__new__(cls) + + @classmethod + @lru_cache + def _validate_subscriber_methods(cls): + for subscriber_type in cls.VALID_SUBSCRIBER_TYPES: + subscriber_method = getattr(cls, 'on_' + subscriber_type) + if not callable(subscriber_method): + raise InvalidSubscriberMethodError( + f'Subscriber method {subscriber_method} must be callable.' + ) + + if not accepts_kwargs(subscriber_method): + raise InvalidSubscriberMethodError( + f'Subscriber method {subscriber_method} must accept keyword ' + 'arguments (**kwargs)' + ) + + def on_queued(self, future, **kwargs): + """Callback to be invoked when transfer request gets queued + + This callback can be useful for: + + * Keeping track of how many transfers have been requested + * Providing the expected transfer size through + future.meta.provide_transfer_size() so a HeadObject would not + need to be made for copies and downloads. + + :type future: s3transfer.futures.TransferFuture + :param future: The TransferFuture representing the requested transfer. + """ + pass + + def on_progress(self, future, bytes_transferred, **kwargs): + """Callback to be invoked when progress is made on transfer + + This callback can be useful for: + + * Recording and displaying progress + + :type future: s3transfer.futures.TransferFuture + :param future: The TransferFuture representing the requested transfer. + + :type bytes_transferred: int + :param bytes_transferred: The number of bytes transferred for that + invocation of the callback. Note that a negative amount can be + provided, which usually indicates that an in-progress request + needed to be retried and thus progress was rewound. + """ + pass + + def on_done(self, future, **kwargs): + """Callback to be invoked once a transfer is done + + This callback can be useful for: + + * Recording and displaying whether the transfer succeeded or + failed using future.result() + * Running some task after the transfer completed like changing + the last modified time of a downloaded file. + + :type future: s3transfer.futures.TransferFuture + :param future: The TransferFuture representing the requested transfer. + """ + pass diff --git a/.venv/lib/python3.12/site-packages/s3transfer/tasks.py b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py new file mode 100644 index 00000000..4ed3b88c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py @@ -0,0 +1,390 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). 
You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import copy +import logging + +from s3transfer.utils import get_callbacks + +try: + from botocore.context import start_as_current_context +except ImportError: + from contextlib import nullcontext as start_as_current_context + + +logger = logging.getLogger(__name__) + + +class Task: + """A task associated to a TransferFuture request + + This is a base class for other classes to subclass from. All subclassed + classes must implement the main() method. + """ + + def __init__( + self, + transfer_coordinator, + main_kwargs=None, + pending_main_kwargs=None, + done_callbacks=None, + is_final=False, + ): + """ + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + :param transfer_coordinator: The context associated to the + TransferFuture for which this Task is associated with. + + :type main_kwargs: dict + :param main_kwargs: The keyword args that can be immediately supplied + to the _main() method of the task + + :type pending_main_kwargs: dict + :param pending_main_kwargs: The keyword args that are depended upon + by the result from a dependent future(s). The result returned by + the future(s) will be used as the value for the keyword argument + when _main() is called. The values for each key can be: + * a single future - Once completed, its value will be the + result of that single future + * a list of futures - Once all of the futures complete, the + value used will be a list of each completed future result + value in order of when they were originally supplied. + + :type done_callbacks: list of callbacks + :param done_callbacks: A list of callbacks to call once the task is + done completing. Each callback will be called with no arguments + and will be called no matter if the task succeeds or an exception + is raised. + + :type is_final: boolean + :param is_final: True, to indicate that this task is the final task + for the TransferFuture request. By setting this value to True, it + will set the result of the entire TransferFuture to the result + returned by this task's main() method. + """ + self._transfer_coordinator = transfer_coordinator + + self._main_kwargs = main_kwargs + if self._main_kwargs is None: + self._main_kwargs = {} + + self._pending_main_kwargs = pending_main_kwargs + if pending_main_kwargs is None: + self._pending_main_kwargs = {} + + self._done_callbacks = done_callbacks + if self._done_callbacks is None: + self._done_callbacks = [] + + self._is_final = is_final + + def __repr__(self): + # These are the general main_kwarg parameters that we want to + # display in the repr. 
+ params_to_display = [ + 'bucket', + 'key', + 'part_number', + 'final_filename', + 'transfer_future', + 'offset', + 'extra_args', + ] + main_kwargs_to_display = self._get_kwargs_with_params_to_include( + self._main_kwargs, params_to_display + ) + return f'{self.__class__.__name__}(transfer_id={self._transfer_coordinator.transfer_id}, {main_kwargs_to_display})' + + @property + def transfer_id(self): + """The id for the transfer request that the task belongs to""" + return self._transfer_coordinator.transfer_id + + def _get_kwargs_with_params_to_include(self, kwargs, include): + filtered_kwargs = {} + for param in include: + if param in kwargs: + filtered_kwargs[param] = kwargs[param] + return filtered_kwargs + + def _get_kwargs_with_params_to_exclude(self, kwargs, exclude): + filtered_kwargs = {} + for param, value in kwargs.items(): + if param in exclude: + continue + filtered_kwargs[param] = value + return filtered_kwargs + + def __call__(self, ctx=None): + """The callable to use when submitting a Task to an executor""" + with start_as_current_context(ctx): + try: + # Wait for all of futures this task depends on. + self._wait_on_dependent_futures() + # Gather up all of the main keyword arguments for main(). + # This includes the immediately provided main_kwargs and + # the values for pending_main_kwargs that source from the return + # values from the task's dependent futures. + kwargs = self._get_all_main_kwargs() + # If the task is not done (really only if some other related + # task to the TransferFuture had failed) then execute the task's + # main() method. + if not self._transfer_coordinator.done(): + return self._execute_main(kwargs) + except Exception as e: + self._log_and_set_exception(e) + finally: + # Run any done callbacks associated to the task no matter what. + for done_callback in self._done_callbacks: + done_callback() + + if self._is_final: + # If this is the final task announce that it is done if results + # are waiting on its completion. + self._transfer_coordinator.announce_done() + + def _execute_main(self, kwargs): + # Do not display keyword args that should not be printed, especially + # if they are going to make the logs hard to follow. + params_to_exclude = ['data'] + kwargs_to_display = self._get_kwargs_with_params_to_exclude( + kwargs, params_to_exclude + ) + # Log what is about to be executed. + logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}") + + return_value = self._main(**kwargs) + # If the task is the final task, then set the TransferFuture's + # value to the return value from main(). + if self._is_final: + self._transfer_coordinator.set_result(return_value) + return return_value + + def _log_and_set_exception(self, exception): + # If an exception is ever thrown than set the exception for the + # entire TransferFuture. + logger.debug("Exception raised.", exc_info=True) + self._transfer_coordinator.set_exception(exception) + + def _main(self, **kwargs): + """The method that will be ran in the executor + + This method must be implemented by subclasses from Task. main() can + be implemented with any arguments decided upon by the subclass. + """ + raise NotImplementedError('_main() must be implemented') + + def _wait_on_dependent_futures(self): + # Gather all of the futures into that main() depends on. + futures_to_wait_on = [] + for _, future in self._pending_main_kwargs.items(): + # If the pending main keyword arg is a list then extend the list. 
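+ # For example (illustrative only), pending_main_kwargs may look like
+ # {'upload_id': create_multipart_future} or
+ # {'parts': [part_future_1, part_future_2, ...]}, as wired up by
+ # UploadSubmissionTask in upload.py.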
+ if isinstance(future, list): + futures_to_wait_on.extend(future) + # If the pending main keyword arg is a future append it to the list. + else: + futures_to_wait_on.append(future) + # Now wait for all of the futures to complete. + self._wait_until_all_complete(futures_to_wait_on) + + def _wait_until_all_complete(self, futures): + # This is a basic implementation of the concurrent.futures.wait() + # + # concurrent.futures.wait() is not used instead because of this + # reported issue: https://bugs.python.org/issue20319. + # The issue would occasionally cause multipart uploads to hang + # when wait() was called. With this approach, it avoids the + # concurrency bug by removing any association with concurrent.futures + # implementation of waiters. + logger.debug( + '%s about to wait for the following futures %s', self, futures + ) + for future in futures: + try: + logger.debug('%s about to wait for %s', self, future) + future.result() + except Exception: + # result() can also produce exceptions. We want to ignore + # these to be deferred to error handling down the road. + pass + logger.debug('%s done waiting for dependent futures', self) + + def _get_all_main_kwargs(self): + # Copy over all of the kwargs that we know is available. + kwargs = copy.copy(self._main_kwargs) + + # Iterate through the kwargs whose values are pending on the result + # of a future. + for key, pending_value in self._pending_main_kwargs.items(): + # If the value is a list of futures, iterate though the list + # appending on the result from each future. + if isinstance(pending_value, list): + result = [] + for future in pending_value: + result.append(future.result()) + # Otherwise if the pending_value is a future, just wait for it. + else: + result = pending_value.result() + # Add the retrieved value to the kwargs to be sent to the + # main() call. + kwargs[key] = result + return kwargs + + +class SubmissionTask(Task): + """A base class for any submission task + + Submission tasks are the top-level task used to submit a series of tasks + to execute a particular transfer. + """ + + def _main(self, transfer_future, **kwargs): + """ + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + + :param kwargs: Any additional kwargs that you may want to pass + to the _submit() method + """ + try: + self._transfer_coordinator.set_status_to_queued() + + # Before submitting any tasks, run all of the on_queued callbacks + on_queued_callbacks = get_callbacks(transfer_future, 'queued') + for on_queued_callback in on_queued_callbacks: + on_queued_callback() + + # Once callbacks have been ran set the status to running. + self._transfer_coordinator.set_status_to_running() + + # Call the submit method to start submitting tasks to execute the + # transfer. + self._submit(transfer_future=transfer_future, **kwargs) + except BaseException as e: + # If there was an exception raised during the submission of task + # there is a chance that the final task that signals if a transfer + # is done and too run the cleanup may never have been submitted in + # the first place so we need to account accordingly. + # + # Note that BaseException is caught, instead of Exception, because + # for some implementations of executors, specifically the serial + # implementation, the SubmissionTask is directly exposed to + # KeyboardInterupts and so needs to cleanup and signal done + # for those as well. 
+ + # Set the exception, that caused the process to fail. + self._log_and_set_exception(e) + + # Wait for all possibly associated futures that may have spawned + # from this submission task have finished before we announce the + # transfer done. + self._wait_for_all_submitted_futures_to_complete() + + # Announce the transfer as done, which will run any cleanups + # and done callbacks as well. + self._transfer_coordinator.announce_done() + + def _submit(self, transfer_future, **kwargs): + """The submission method to be implemented + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + + :param kwargs: Any additional keyword arguments you want to be passed + in + """ + raise NotImplementedError('_submit() must be implemented') + + def _wait_for_all_submitted_futures_to_complete(self): + # We want to wait for all futures that were submitted to + # complete as we do not want the cleanup callbacks or done callbacks + # to be called to early. The main problem is any task that was + # submitted may have submitted even more during its process and so + # we need to account accordingly. + + # First get all of the futures that were submitted up to this point. + submitted_futures = self._transfer_coordinator.associated_futures + while submitted_futures: + # Wait for those futures to complete. + self._wait_until_all_complete(submitted_futures) + # However, more futures may have been submitted as we waited so + # we need to check again for any more associated futures. + possibly_more_submitted_futures = ( + self._transfer_coordinator.associated_futures + ) + # If the current list of submitted futures is equal to the + # the list of associated futures for when after the wait completes, + # we can ensure no more futures were submitted in waiting on + # the current list of futures to complete ultimately meaning all + # futures that may have spawned from the original submission task + # have completed. + if submitted_futures == possibly_more_submitted_futures: + break + submitted_futures = possibly_more_submitted_futures + + +class CreateMultipartUploadTask(Task): + """Task to initiate a multipart upload""" + + def _main(self, client, bucket, key, extra_args): + """ + :param client: The client to use when calling CreateMultipartUpload + :param bucket: The name of the bucket to upload to + :param key: The name of the key to upload to + :param extra_args: A dictionary of any extra arguments that may be + used in the initialization. + + :returns: The upload id of the multipart upload + """ + # Create the multipart upload. + response = client.create_multipart_upload( + Bucket=bucket, Key=key, **extra_args + ) + upload_id = response['UploadId'] + + # Add a cleanup if the multipart upload fails at any point. + self._transfer_coordinator.add_failure_cleanup( + client.abort_multipart_upload, + Bucket=bucket, + Key=key, + UploadId=upload_id, + ) + return upload_id + + +class CompleteMultipartUploadTask(Task): + """Task to complete a multipart upload""" + + def _main(self, client, bucket, key, upload_id, parts, extra_args): + """ + :param client: The client to use when calling CompleteMultipartUpload + :param bucket: The name of the bucket to upload to + :param key: The name of the key to upload to + :param upload_id: The id of the upload + :param parts: A list of parts to use to complete the multipart upload:: + + [{'Etag': etag_value, 'PartNumber': part_number}, ...] 
+ + Each element in the list consists of a return value from + ``UploadPartTask.main()``. + :param extra_args: A dictionary of any extra arguments that may be + used in completing the multipart transfer. + """ + client.complete_multipart_upload( + Bucket=bucket, + Key=key, + UploadId=upload_id, + MultipartUpload={'Parts': parts}, + **extra_args, + ) diff --git a/.venv/lib/python3.12/site-packages/s3transfer/upload.py b/.venv/lib/python3.12/site-packages/s3transfer/upload.py new file mode 100644 index 00000000..a3db8f89 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/s3transfer/upload.py @@ -0,0 +1,840 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import math +from io import BytesIO + +from s3transfer.compat import readable, seekable +from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS +from s3transfer.futures import IN_MEMORY_UPLOAD_TAG +from s3transfer.tasks import ( + CompleteMultipartUploadTask, + CreateMultipartUploadTask, + SubmissionTask, + Task, +) +from s3transfer.utils import ( + ChunksizeAdjuster, + DeferredOpenFile, + get_callbacks, + get_filtered_dict, +) + + +class AggregatedProgressCallback: + def __init__(self, callbacks, threshold=1024 * 256): + """Aggregates progress updates for every provided progress callback + + :type callbacks: A list of functions that accepts bytes_transferred + as a single argument + :param callbacks: The callbacks to invoke when threshold is reached + + :type threshold: int + :param threshold: The progress threshold in which to take the + aggregated progress and invoke the progress callback with that + aggregated progress total + """ + self._callbacks = callbacks + self._threshold = threshold + self._bytes_seen = 0 + + def __call__(self, bytes_transferred): + self._bytes_seen += bytes_transferred + if self._bytes_seen >= self._threshold: + self._trigger_callbacks() + + def flush(self): + """Flushes out any progress that has not been sent to its callbacks""" + if self._bytes_seen > 0: + self._trigger_callbacks() + + def _trigger_callbacks(self): + for callback in self._callbacks: + callback(bytes_transferred=self._bytes_seen) + self._bytes_seen = 0 + + +class InterruptReader: + """Wrapper that can interrupt reading using an error + + It uses a transfer coordinator to propagate an error if it notices + that a read is being made while the file is being read from. + + :type fileobj: file-like obj + :param fileobj: The file-like object to read from + + :type transfer_coordinator: s3transfer.futures.TransferCoordinator + :param transfer_coordinator: The transfer coordinator to use if the + reader needs to be interrupted. + """ + + def __init__(self, fileobj, transfer_coordinator): + self._fileobj = fileobj + self._transfer_coordinator = transfer_coordinator + + def read(self, amount=None): + # If there is an exception, then raise the exception. 
+ # We raise an error instead of returning no bytes because for + # requests where the content length and md5 was sent, it will + # cause md5 mismatches and retries as there was no indication that + # the stream being read from encountered any issues. + if self._transfer_coordinator.exception: + raise self._transfer_coordinator.exception + return self._fileobj.read(amount) + + def seek(self, where, whence=0): + self._fileobj.seek(where, whence) + + def tell(self): + return self._fileobj.tell() + + def close(self): + self._fileobj.close() + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + +class UploadInputManager: + """Base manager class for handling various types of files for uploads + + This class is typically used for the UploadSubmissionTask class to help + determine the following: + + * How to determine the size of the file + * How to determine if a multipart upload is required + * How to retrieve the body for a PutObject + * How to retrieve the bodies for a set of UploadParts + + The answers/implementations differ for the various types of file inputs + that may be accepted. All implementations must subclass and override + public methods from this class. + """ + + def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None): + self._osutil = osutil + self._transfer_coordinator = transfer_coordinator + self._bandwidth_limiter = bandwidth_limiter + + @classmethod + def is_compatible(cls, upload_source): + """Determines if the source for the upload is compatible with manager + + :param upload_source: The source for which the upload will pull data + from. + + :returns: True if the manager can handle the type of source specified + otherwise returns False. + """ + raise NotImplementedError('must implement _is_compatible()') + + def stores_body_in_memory(self, operation_name): + """Whether the body it provides are stored in-memory + + :type operation_name: str + :param operation_name: The name of the client operation that the body + is being used for. Valid operation_names are ``put_object`` and + ``upload_part``. + + :rtype: boolean + :returns: True if the body returned by the manager will be stored in + memory. False if the manager will not directly store the body in + memory. + """ + raise NotImplementedError('must implement store_body_in_memory()') + + def provide_transfer_size(self, transfer_future): + """Provides the transfer size of an upload + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The future associated with upload request + """ + raise NotImplementedError('must implement provide_transfer_size()') + + def requires_multipart_upload(self, transfer_future, config): + """Determines where a multipart upload is required + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The future associated with upload request + + :type config: s3transfer.manager.TransferConfig + :param config: The config associated to the transfer manager + + :rtype: boolean + :returns: True, if the upload should be multipart based on + configuration and size. False, otherwise. 
+ """ + raise NotImplementedError('must implement requires_multipart_upload()') + + def get_put_object_body(self, transfer_future): + """Returns the body to use for PutObject + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The future associated with upload request + + :type config: s3transfer.manager.TransferConfig + :param config: The config associated to the transfer manager + + :rtype: s3transfer.utils.ReadFileChunk + :returns: A ReadFileChunk including all progress callbacks + associated with the transfer future. + """ + raise NotImplementedError('must implement get_put_object_body()') + + def yield_upload_part_bodies(self, transfer_future, chunksize): + """Yields the part number and body to use for each UploadPart + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The future associated with upload request + + :type chunksize: int + :param chunksize: The chunksize to use for this upload. + + :rtype: int, s3transfer.utils.ReadFileChunk + :returns: Yields the part number and the ReadFileChunk including all + progress callbacks associated with the transfer future for that + specific yielded part. + """ + raise NotImplementedError('must implement yield_upload_part_bodies()') + + def _wrap_fileobj(self, fileobj): + fileobj = InterruptReader(fileobj, self._transfer_coordinator) + if self._bandwidth_limiter: + fileobj = self._bandwidth_limiter.get_bandwith_limited_stream( + fileobj, self._transfer_coordinator, enabled=False + ) + return fileobj + + def _get_progress_callbacks(self, transfer_future): + callbacks = get_callbacks(transfer_future, 'progress') + # We only want to be wrapping the callbacks if there are callbacks to + # invoke because we do not want to be doing any unnecessary work if + # there are no callbacks to invoke. + if callbacks: + return [AggregatedProgressCallback(callbacks)] + return [] + + def _get_close_callbacks(self, aggregated_progress_callbacks): + return [callback.flush for callback in aggregated_progress_callbacks] + + +class UploadFilenameInputManager(UploadInputManager): + """Upload utility for filenames""" + + @classmethod + def is_compatible(cls, upload_source): + return isinstance(upload_source, str) + + def stores_body_in_memory(self, operation_name): + return False + + def provide_transfer_size(self, transfer_future): + transfer_future.meta.provide_transfer_size( + self._osutil.get_file_size(transfer_future.meta.call_args.fileobj) + ) + + def requires_multipart_upload(self, transfer_future, config): + return transfer_future.meta.size >= config.multipart_threshold + + def get_put_object_body(self, transfer_future): + # Get a file-like object for the given input + fileobj, full_size = self._get_put_object_fileobj_with_full_size( + transfer_future + ) + + # Wrap fileobj with interrupt reader that will quickly cancel + # uploads if needed instead of having to wait for the socket + # to completely read all of the data. + fileobj = self._wrap_fileobj(fileobj) + + callbacks = self._get_progress_callbacks(transfer_future) + close_callbacks = self._get_close_callbacks(callbacks) + size = transfer_future.meta.size + # Return the file-like object wrapped into a ReadFileChunk to get + # progress. 
+ return self._osutil.open_file_chunk_reader_from_fileobj( + fileobj=fileobj, + chunk_size=size, + full_file_size=full_size, + callbacks=callbacks, + close_callbacks=close_callbacks, + ) + + def yield_upload_part_bodies(self, transfer_future, chunksize): + full_file_size = transfer_future.meta.size + num_parts = self._get_num_parts(transfer_future, chunksize) + for part_number in range(1, num_parts + 1): + callbacks = self._get_progress_callbacks(transfer_future) + close_callbacks = self._get_close_callbacks(callbacks) + start_byte = chunksize * (part_number - 1) + # Get a file-like object for that part and the size of the full + # file size for the associated file-like object for that part. + fileobj, full_size = self._get_upload_part_fileobj_with_full_size( + transfer_future.meta.call_args.fileobj, + start_byte=start_byte, + part_size=chunksize, + full_file_size=full_file_size, + ) + + # Wrap fileobj with interrupt reader that will quickly cancel + # uploads if needed instead of having to wait for the socket + # to completely read all of the data. + fileobj = self._wrap_fileobj(fileobj) + + # Wrap the file-like object into a ReadFileChunk to get progress. + read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj( + fileobj=fileobj, + chunk_size=chunksize, + full_file_size=full_size, + callbacks=callbacks, + close_callbacks=close_callbacks, + ) + yield part_number, read_file_chunk + + def _get_deferred_open_file(self, fileobj, start_byte): + fileobj = DeferredOpenFile( + fileobj, start_byte, open_function=self._osutil.open + ) + return fileobj + + def _get_put_object_fileobj_with_full_size(self, transfer_future): + fileobj = transfer_future.meta.call_args.fileobj + size = transfer_future.meta.size + return self._get_deferred_open_file(fileobj, 0), size + + def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs): + start_byte = kwargs['start_byte'] + full_size = kwargs['full_file_size'] + return self._get_deferred_open_file(fileobj, start_byte), full_size + + def _get_num_parts(self, transfer_future, part_size): + return int(math.ceil(transfer_future.meta.size / float(part_size))) + + +class UploadSeekableInputManager(UploadFilenameInputManager): + """Upload utility for an open file object""" + + @classmethod + def is_compatible(cls, upload_source): + return readable(upload_source) and seekable(upload_source) + + def stores_body_in_memory(self, operation_name): + if operation_name == 'put_object': + return False + else: + return True + + def provide_transfer_size(self, transfer_future): + fileobj = transfer_future.meta.call_args.fileobj + # To determine size, first determine the starting position + # Seek to the end and then find the difference in the length + # between the end and start positions. + start_position = fileobj.tell() + fileobj.seek(0, 2) + end_position = fileobj.tell() + fileobj.seek(start_position) + transfer_future.meta.provide_transfer_size( + end_position - start_position + ) + + def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs): + # Note: It is unfortunate that in order to do a multithreaded + # multipart upload we cannot simply copy the filelike object + # since there is not really a mechanism in python (i.e. os.dup + # points to the same OS filehandle which causes concurrency + # issues). So instead we need to read from the fileobj and + # chunk the data out to separate file-like objects in memory. 
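+ # e.g. with an 8 MiB part size, each call below consumes the next 8 MiB
+ # from the shared fileobj and hands the worker an independent in-memory
+ # copy it can upload without any further seeking.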
+ data = fileobj.read(kwargs['part_size']) + # We return the length of the data instead of the full_file_size + # because we partitioned the data into separate BytesIO objects + # meaning the BytesIO object has no knowledge of its start position + # relative the input source nor access to the rest of the input + # source. So we must treat it as its own standalone file. + return BytesIO(data), len(data) + + def _get_put_object_fileobj_with_full_size(self, transfer_future): + fileobj = transfer_future.meta.call_args.fileobj + # The current position needs to be taken into account when retrieving + # the full size of the file. + size = fileobj.tell() + transfer_future.meta.size + return fileobj, size + + +class UploadNonSeekableInputManager(UploadInputManager): + """Upload utility for a file-like object that cannot seek.""" + + def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None): + super().__init__(osutil, transfer_coordinator, bandwidth_limiter) + self._initial_data = b'' + + @classmethod + def is_compatible(cls, upload_source): + return readable(upload_source) + + def stores_body_in_memory(self, operation_name): + return True + + def provide_transfer_size(self, transfer_future): + # No-op because there is no way to do this short of reading the entire + # body into memory. + return + + def requires_multipart_upload(self, transfer_future, config): + # If the user has set the size, we can use that. + if transfer_future.meta.size is not None: + return transfer_future.meta.size >= config.multipart_threshold + + # This is tricky to determine in this case because we can't know how + # large the input is. So to figure it out, we read data into memory + # up until the threshold and compare how much data was actually read + # against the threshold. + fileobj = transfer_future.meta.call_args.fileobj + threshold = config.multipart_threshold + self._initial_data = self._read(fileobj, threshold, False) + if len(self._initial_data) < threshold: + return False + else: + return True + + def get_put_object_body(self, transfer_future): + callbacks = self._get_progress_callbacks(transfer_future) + close_callbacks = self._get_close_callbacks(callbacks) + fileobj = transfer_future.meta.call_args.fileobj + + body = self._wrap_data( + self._initial_data + fileobj.read(), callbacks, close_callbacks + ) + + # Zero out the stored data so we don't have additional copies + # hanging around in memory. + self._initial_data = None + return body + + def yield_upload_part_bodies(self, transfer_future, chunksize): + file_object = transfer_future.meta.call_args.fileobj + part_number = 0 + + # Continue reading parts from the file-like object until it is empty. + while True: + callbacks = self._get_progress_callbacks(transfer_future) + close_callbacks = self._get_close_callbacks(callbacks) + part_number += 1 + part_content = self._read(file_object, chunksize) + if not part_content: + break + part_object = self._wrap_data( + part_content, callbacks, close_callbacks + ) + + # Zero out part_content to avoid hanging on to additional data. + part_content = None + yield part_number, part_object + + def _read(self, fileobj, amount, truncate=True): + """ + Reads a specific amount of data from a stream and returns it. If there + is any data in initial_data, that will be popped out first. + + :type fileobj: A file-like object that implements read + :param fileobj: The stream to read from. + + :type amount: int + :param amount: The number of bytes to read from the stream. 
+ + :type truncate: bool + :param truncate: Whether or not to truncate initial_data after + reading from it. + + :return: Generator which generates part bodies from the initial data. + """ + # If the the initial data is empty, we simply read from the fileobj + if len(self._initial_data) == 0: + return fileobj.read(amount) + + # If the requested number of bytes is less than the amount of + # initial data, pull entirely from initial data. + if amount <= len(self._initial_data): + data = self._initial_data[:amount] + # Truncate initial data so we don't hang onto the data longer + # than we need. + if truncate: + self._initial_data = self._initial_data[amount:] + return data + + # At this point there is some initial data left, but not enough to + # satisfy the number of bytes requested. Pull out the remaining + # initial data and read the rest from the fileobj. + amount_to_read = amount - len(self._initial_data) + data = self._initial_data + fileobj.read(amount_to_read) + + # Zero out initial data so we don't hang onto the data any more. + if truncate: + self._initial_data = b'' + return data + + def _wrap_data(self, data, callbacks, close_callbacks): + """ + Wraps data with the interrupt reader and the file chunk reader. + + :type data: bytes + :param data: The data to wrap. + + :type callbacks: list + :param callbacks: The callbacks associated with the transfer future. + + :type close_callbacks: list + :param close_callbacks: The callbacks to be called when closing the + wrapper for the data. + + :return: Fully wrapped data. + """ + fileobj = self._wrap_fileobj(BytesIO(data)) + return self._osutil.open_file_chunk_reader_from_fileobj( + fileobj=fileobj, + chunk_size=len(data), + full_file_size=len(data), + callbacks=callbacks, + close_callbacks=close_callbacks, + ) + + +class UploadSubmissionTask(SubmissionTask): + """Task for submitting tasks to execute an upload""" + + PUT_OBJECT_BLOCKLIST = ["ChecksumType", "MpuObjectSize"] + + CREATE_MULTIPART_BLOCKLIST = FULL_OBJECT_CHECKSUM_ARGS + ["MpuObjectSize"] + + UPLOAD_PART_ARGS = [ + 'ChecksumAlgorithm', + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + 'ExpectedBucketOwner', + ] + + COMPLETE_MULTIPART_ARGS = [ + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + 'ExpectedBucketOwner', + 'ChecksumType', + 'MpuObjectSize', + ] + FULL_OBJECT_CHECKSUM_ARGS + + def _get_upload_input_manager_cls(self, transfer_future): + """Retrieves a class for managing input for an upload based on file type + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future for the request + + :rtype: class of UploadInputManager + :returns: The appropriate class to use for managing a specific type of + input for uploads. + """ + upload_manager_resolver_chain = [ + UploadFilenameInputManager, + UploadSeekableInputManager, + UploadNonSeekableInputManager, + ] + + fileobj = transfer_future.meta.call_args.fileobj + for upload_manager_cls in upload_manager_resolver_chain: + if upload_manager_cls.is_compatible(fileobj): + return upload_manager_cls + raise RuntimeError( + f'Input {fileobj} of type: {type(fileobj)} is not supported.' 
+ ) + + def _submit( + self, + client, + config, + osutil, + request_executor, + transfer_future, + bandwidth_limiter=None, + ): + """ + :param client: The client associated with the transfer manager + + :type config: s3transfer.manager.TransferConfig + :param config: The transfer config associated with the transfer + manager + + :type osutil: s3transfer.utils.OSUtil + :param osutil: The os utility associated to the transfer manager + + :type request_executor: s3transfer.futures.BoundedExecutor + :param request_executor: The request executor associated with the + transfer manager + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future associated with the + transfer request that tasks are being submitted for + """ + upload_input_manager = self._get_upload_input_manager_cls( + transfer_future + )(osutil, self._transfer_coordinator, bandwidth_limiter) + + # Determine the size if it was not provided + if transfer_future.meta.size is None: + upload_input_manager.provide_transfer_size(transfer_future) + + # Do a multipart upload if needed, otherwise do a regular put object. + if not upload_input_manager.requires_multipart_upload( + transfer_future, config + ): + self._submit_upload_request( + client, + config, + osutil, + request_executor, + transfer_future, + upload_input_manager, + ) + else: + self._submit_multipart_request( + client, + config, + osutil, + request_executor, + transfer_future, + upload_input_manager, + ) + + def _submit_upload_request( + self, + client, + config, + osutil, + request_executor, + transfer_future, + upload_input_manager, + ): + call_args = transfer_future.meta.call_args + + put_object_extra_args = self._extra_put_object_args( + call_args.extra_args + ) + + # Get any tags that need to be associated to the put object task + put_object_tag = self._get_upload_task_tag( + upload_input_manager, 'put_object' + ) + + # Submit the request of a single upload. + self._transfer_coordinator.submit( + request_executor, + PutObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'fileobj': upload_input_manager.get_put_object_body( + transfer_future + ), + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': put_object_extra_args, + }, + is_final=True, + ), + tag=put_object_tag, + ) + + def _submit_multipart_request( + self, + client, + config, + osutil, + request_executor, + transfer_future, + upload_input_manager, + ): + call_args = transfer_future.meta.call_args + + # When a user provided checksum is passed, set "ChecksumType" to "FULL_OBJECT" + # and "ChecksumAlgorithm" to the related algorithm. + for checksum in FULL_OBJECT_CHECKSUM_ARGS: + if checksum in call_args.extra_args: + call_args.extra_args["ChecksumType"] = "FULL_OBJECT" + call_args.extra_args["ChecksumAlgorithm"] = checksum.replace( + "Checksum", "" + ) + + create_multipart_extra_args = self._extra_create_multipart_args( + call_args.extra_args + ) + + # Submit the request to create a multipart upload. + create_multipart_future = self._transfer_coordinator.submit( + request_executor, + CreateMultipartUploadTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': create_multipart_extra_args, + }, + ), + ) + + # Submit requests to upload the parts of the file. 
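+ # Rough shape of the resulting task graph (for orientation only):
+ #   CreateMultipartUploadTask -> N x UploadPartTask -> CompleteMultipartUploadTask
+ # Each UploadPartTask waits on the create-multipart future for its
+ # upload_id, and the completion task waits on every part future.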
+ part_futures = [] + extra_part_args = self._extra_upload_part_args(call_args.extra_args) + + # Get any tags that need to be associated to the submitted task + # for upload the data + upload_part_tag = self._get_upload_task_tag( + upload_input_manager, 'upload_part' + ) + + size = transfer_future.meta.size + adjuster = ChunksizeAdjuster() + chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size) + part_iterator = upload_input_manager.yield_upload_part_bodies( + transfer_future, chunksize + ) + + for part_number, fileobj in part_iterator: + part_futures.append( + self._transfer_coordinator.submit( + request_executor, + UploadPartTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'fileobj': fileobj, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'part_number': part_number, + 'extra_args': extra_part_args, + }, + pending_main_kwargs={ + 'upload_id': create_multipart_future + }, + ), + tag=upload_part_tag, + ) + ) + + complete_multipart_extra_args = self._extra_complete_multipart_args( + call_args.extra_args + ) + # Submit the request to complete the multipart upload. + self._transfer_coordinator.submit( + request_executor, + CompleteMultipartUploadTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'extra_args': complete_multipart_extra_args, + }, + pending_main_kwargs={ + 'upload_id': create_multipart_future, + 'parts': part_futures, + }, + is_final=True, + ), + ) + + def _extra_upload_part_args(self, extra_args): + # Only the args in UPLOAD_PART_ARGS actually need to be passed + # onto the upload_part calls. + return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS) + + def _extra_complete_multipart_args(self, extra_args): + return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS) + + def _extra_create_multipart_args(self, extra_args): + return get_filtered_dict( + extra_args, blocklisted_keys=self.CREATE_MULTIPART_BLOCKLIST + ) + + def _extra_put_object_args(self, extra_args): + return get_filtered_dict( + extra_args, blocklisted_keys=self.PUT_OBJECT_BLOCKLIST + ) + + def _get_upload_task_tag(self, upload_input_manager, operation_name): + tag = None + if upload_input_manager.stores_body_in_memory(operation_name): + tag = IN_MEMORY_UPLOAD_TAG + return tag + + +class PutObjectTask(Task): + """Task to do a nonmultipart upload""" + + def _main(self, client, fileobj, bucket, key, extra_args): + """ + :param client: The client to use when calling PutObject + :param fileobj: The file to upload. + :param bucket: The name of the bucket to upload to + :param key: The name of the key to upload to + :param extra_args: A dictionary of any extra arguments that may be + used in the upload. + """ + with fileobj as body: + client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args) + + +class UploadPartTask(Task): + """Task to upload a part in a multipart upload""" + + def _main( + self, client, fileobj, bucket, key, upload_id, part_number, extra_args + ): + """ + :param client: The client to use when calling PutObject + :param fileobj: The file to upload. + :param bucket: The name of the bucket to upload to + :param key: The name of the key to upload to + :param upload_id: The id of the upload + :param part_number: The number representing the part of the multipart + upload + :param extra_args: A dictionary of any extra arguments that may be + used in the upload. 
+
+        :rtype: dict
+        :returns: A dictionary representing a part::
+
+            {'ETag': etag_value, 'PartNumber': part_number}
+
+            This value can be appended to a list to be used to complete
+            the multipart upload.
+        """
+        with fileobj as body:
+            response = client.upload_part(
+                Bucket=bucket,
+                Key=key,
+                UploadId=upload_id,
+                PartNumber=part_number,
+                Body=body,
+                **extra_args,
+            )
+        etag = response['ETag']
+        part_metadata = {'ETag': etag, 'PartNumber': part_number}
+        if 'ChecksumAlgorithm' in extra_args:
+            algorithm_name = extra_args['ChecksumAlgorithm'].upper()
+            checksum_member = f'Checksum{algorithm_name}'
+            if checksum_member in response:
+                part_metadata[checksum_member] = response[checksum_member]
+        return part_metadata
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/utils.py b/.venv/lib/python3.12/site-packages/s3transfer/utils.py
new file mode 100644
index 00000000..f2c4e06c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/utils.py
@@ -0,0 +1,833 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import functools
+import logging
+import math
+import os
+import random
+import socket
+import stat
+import string
+import threading
+from collections import defaultdict
+
+from botocore.exceptions import (
+    IncompleteReadError,
+    ReadTimeoutError,
+    ResponseStreamingError,
+)
+from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper
+from botocore.utils import is_s3express_bucket
+
+from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
+from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
+
+MAX_PARTS = 10000
+# The maximum file size you can upload via S3 per request.
+# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html +# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html +MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3) +MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2) +logger = logging.getLogger(__name__) + + +S3_RETRYABLE_DOWNLOAD_ERRORS = ( + socket.timeout, + SOCKET_ERROR, + ReadTimeoutError, + IncompleteReadError, + ResponseStreamingError, +) + + +def random_file_extension(num_digits=8): + return ''.join(random.choice(string.hexdigits) for _ in range(num_digits)) + + +def signal_not_transferring(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and hasattr( + request.body, 'signal_not_transferring' + ): + request.body.signal_not_transferring() + + +def signal_transferring(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart']: + body = request.body + if isinstance(body, AwsChunkedWrapper): + body = getattr(body, '_raw', None) + if hasattr(body, 'signal_transferring'): + body.signal_transferring() + + +def calculate_num_parts(size, part_size): + return int(math.ceil(size / float(part_size))) + + +def calculate_range_parameter( + part_size, part_index, num_parts, total_size=None +): + """Calculate the range parameter for multipart downloads/copies + + :type part_size: int + :param part_size: The size of the part + + :type part_index: int + :param part_index: The index for which this parts starts. This index starts + at zero + + :type num_parts: int + :param num_parts: The total number of parts in the transfer + + :returns: The value to use for Range parameter on downloads or + the CopySourceRange parameter for copies + """ + # Used to calculate the Range parameter + start_range = part_index * part_size + if part_index == num_parts - 1: + end_range = '' + if total_size is not None: + end_range = str(total_size - 1) + else: + end_range = start_range + part_size - 1 + range_param = f'bytes={start_range}-{end_range}' + return range_param + + +def get_callbacks(transfer_future, callback_type): + """Retrieves callbacks from a subscriber + + :type transfer_future: s3transfer.futures.TransferFuture + :param transfer_future: The transfer future the subscriber is associated + to. + + :type callback_type: str + :param callback_type: The type of callback to retrieve from the subscriber. + Valid types include: + * 'queued' + * 'progress' + * 'done' + + :returns: A list of callbacks for the type specified. All callbacks are + preinjected with the transfer future. + """ + callbacks = [] + for subscriber in transfer_future.meta.call_args.subscribers: + callback_name = 'on_' + callback_type + if hasattr(subscriber, callback_name): + callbacks.append( + functools.partial( + getattr(subscriber, callback_name), future=transfer_future + ) + ) + return callbacks + + +def invoke_progress_callbacks(callbacks, bytes_transferred): + """Calls all progress callbacks + + :param callbacks: A list of progress callbacks to invoke + :param bytes_transferred: The number of bytes transferred. This is passed + to the callbacks. If no bytes were transferred the callbacks will not + be invoked because no progress was achieved. It is also possible + to receive a negative amount which comes from retrying a transfer + request. + """ + # Only invoke the callbacks if bytes were actually transferred. 
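+    # Editorial example (illustrative, not upstream code): a retried part
+    # first reports its progress and then rewinds it, e.g.
+    #   invoke_progress_callbacks(cbs, 5 * 1024 * 1024)    # +5 MiB reported
+    #   invoke_progress_callbacks(cbs, -5 * 1024 * 1024)   # rewound on retry
+    #   invoke_progress_callbacks(cbs, 0)                  # skipped entirely
+    # so the user-visible total stays accurate across retries.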
+ if bytes_transferred: + for callback in callbacks: + callback(bytes_transferred=bytes_transferred) + + +def get_filtered_dict( + original_dict, whitelisted_keys=None, blocklisted_keys=None +): + """Gets a dictionary filtered by whitelisted and blocklisted keys. + + :param original_dict: The original dictionary of arguments to source keys + and values. + :param whitelisted_key: A list of keys to include in the filtered + dictionary. + :param blocklisted_key: A list of keys to exclude in the filtered + dictionary. + + :returns: A dictionary containing key/values from the original dictionary + whose key was included in the whitelist and/or not included in the + blocklist. + """ + filtered_dict = {} + for key, value in original_dict.items(): + if (whitelisted_keys and key in whitelisted_keys) or ( + blocklisted_keys and key not in blocklisted_keys + ): + filtered_dict[key] = value + return filtered_dict + + +class CallArgs: + def __init__(self, **kwargs): + """A class that records call arguments + + The call arguments must be passed as keyword arguments. It will set + each keyword argument as an attribute of the object along with its + associated value. + """ + for arg, value in kwargs.items(): + setattr(self, arg, value) + + +class FunctionContainer: + """An object that contains a function and any args or kwargs to call it + + When called the provided function will be called with provided args + and kwargs. + """ + + def __init__(self, func, *args, **kwargs): + self._func = func + self._args = args + self._kwargs = kwargs + + def __repr__(self): + return f'Function: {self._func} with args {self._args} and kwargs {self._kwargs}' + + def __call__(self): + return self._func(*self._args, **self._kwargs) + + +class CountCallbackInvoker: + """An abstraction to invoke a callback when a shared count reaches zero + + :param callback: Callback invoke when finalized count reaches zero + """ + + def __init__(self, callback): + self._lock = threading.Lock() + self._callback = callback + self._count = 0 + self._is_finalized = False + + @property + def current_count(self): + with self._lock: + return self._count + + def increment(self): + """Increment the count by one""" + with self._lock: + if self._is_finalized: + raise RuntimeError( + 'Counter has been finalized it can no longer be ' + 'incremented.' + ) + self._count += 1 + + def decrement(self): + """Decrement the count by one""" + with self._lock: + if self._count == 0: + raise RuntimeError( + 'Counter is at zero. 
It cannot dip below zero' + ) + self._count -= 1 + if self._is_finalized and self._count == 0: + self._callback() + + def finalize(self): + """Finalize the counter + + Once finalized, the counter never be incremented and the callback + can be invoked once the count reaches zero + """ + with self._lock: + self._is_finalized = True + if self._count == 0: + self._callback() + + +class OSUtils: + _MAX_FILENAME_LEN = 255 + + def get_file_size(self, filename): + return os.path.getsize(filename) + + def open_file_chunk_reader(self, filename, start_byte, size, callbacks): + return ReadFileChunk.from_filename( + filename, start_byte, size, callbacks, enable_callbacks=False + ) + + def open_file_chunk_reader_from_fileobj( + self, + fileobj, + chunk_size, + full_file_size, + callbacks, + close_callbacks=None, + ): + return ReadFileChunk( + fileobj, + chunk_size, + full_file_size, + callbacks=callbacks, + enable_callbacks=False, + close_callbacks=close_callbacks, + ) + + def open(self, filename, mode): + return open(filename, mode) + + def remove_file(self, filename): + """Remove a file, noop if file does not exist.""" + # Unlike os.remove, if the file does not exist, + # then this method does nothing. + try: + os.remove(filename) + except OSError: + pass + + def rename_file(self, current_filename, new_filename): + rename_file(current_filename, new_filename) + + def is_special_file(cls, filename): + """Checks to see if a file is a special UNIX file. + + It checks if the file is a character special device, block special + device, FIFO, or socket. + + :param filename: Name of the file + + :returns: True if the file is a special file. False, if is not. + """ + # If it does not exist, it must be a new file so it cannot be + # a special file. + if not os.path.exists(filename): + return False + mode = os.stat(filename).st_mode + # Character special device. + if stat.S_ISCHR(mode): + return True + # Block special device + if stat.S_ISBLK(mode): + return True + # Named pipe / FIFO + if stat.S_ISFIFO(mode): + return True + # Socket. + if stat.S_ISSOCK(mode): + return True + return False + + def get_temp_filename(self, filename): + suffix = os.extsep + random_file_extension() + path = os.path.dirname(filename) + name = os.path.basename(filename) + temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix + return os.path.join(path, temp_filename) + + def allocate(self, filename, size): + try: + with self.open(filename, 'wb') as f: + fallocate(f, size) + except OSError: + self.remove_file(filename) + raise + + +class DeferredOpenFile: + def __init__(self, filename, start_byte=0, mode='rb', open_function=open): + """A class that defers the opening of a file till needed + + This is useful for deferring opening of a file till it is needed + in a separate thread, as there is a limit of how many open files + there can be in a single thread for most operating systems. The + file gets opened in the following methods: ``read()``, ``seek()``, + and ``__enter__()`` + + :type filename: str + :param filename: The name of the file to open + + :type start_byte: int + :param start_byte: The byte to seek to when the file is opened. 
+ + :type mode: str + :param mode: The mode to use to open the file + + :type open_function: function + :param open_function: The function to use to open the file + """ + self._filename = filename + self._fileobj = None + self._start_byte = start_byte + self._mode = mode + self._open_function = open_function + + def _open_if_needed(self): + if self._fileobj is None: + self._fileobj = self._open_function(self._filename, self._mode) + if self._start_byte != 0: + self._fileobj.seek(self._start_byte) + + @property + def name(self): + return self._filename + + def read(self, amount=None): + self._open_if_needed() + return self._fileobj.read(amount) + + def write(self, data): + self._open_if_needed() + self._fileobj.write(data) + + def seek(self, where, whence=0): + self._open_if_needed() + self._fileobj.seek(where, whence) + + def tell(self): + if self._fileobj is None: + return self._start_byte + return self._fileobj.tell() + + def close(self): + if self._fileobj: + self._fileobj.close() + + def __enter__(self): + self._open_if_needed() + return self + + def __exit__(self, *args, **kwargs): + self.close() + + +class ReadFileChunk: + def __init__( + self, + fileobj, + chunk_size, + full_file_size, + callbacks=None, + enable_callbacks=True, + close_callbacks=None, + ): + """ + + Given a file object shown below:: + + |___________________________________________________| + 0 | | full_file_size + |----chunk_size---| + f.tell() + + :type fileobj: file + :param fileobj: File like object + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callbacks: A list of function(amount_read) + :param callbacks: Called whenever data is read from this object in the + order provided. + + :type enable_callbacks: boolean + :param enable_callbacks: True if to run callbacks. Otherwise, do not + run callbacks + + :type close_callbacks: A list of function() + :param close_callbacks: Called when close is called. The function + should take no arguments. + """ + self._fileobj = fileobj + self._start_byte = self._fileobj.tell() + self._size = self._calculate_file_size( + self._fileobj, + requested_size=chunk_size, + start_byte=self._start_byte, + actual_file_size=full_file_size, + ) + # _amount_read represents the position in the chunk and may exceed + # the chunk size, but won't allow reads out of bounds. + self._amount_read = 0 + self._callbacks = callbacks + if callbacks is None: + self._callbacks = [] + self._callbacks_enabled = enable_callbacks + self._close_callbacks = close_callbacks + if close_callbacks is None: + self._close_callbacks = close_callbacks + + @classmethod + def from_filename( + cls, + filename, + start_byte, + chunk_size, + callbacks=None, + enable_callbacks=True, + ): + """Convenience factory function to create from a filename. + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callbacks: function(amount_read) + :param callbacks: Called whenever data is read from this object. 
+ + :type enable_callbacks: bool + :param enable_callbacks: Indicate whether to invoke callback + during read() calls. + + :rtype: ``ReadFileChunk`` + :return: A new instance of ``ReadFileChunk`` + + """ + f = open(filename, 'rb') + f.seek(start_byte) + file_size = os.fstat(f.fileno()).st_size + return cls(f, chunk_size, file_size, callbacks, enable_callbacks) + + def _calculate_file_size( + self, fileobj, requested_size, start_byte, actual_file_size + ): + max_chunk_size = actual_file_size - start_byte + return min(max_chunk_size, requested_size) + + def read(self, amount=None): + amount_left = max(self._size - self._amount_read, 0) + if amount is None: + amount_to_read = amount_left + else: + amount_to_read = min(amount_left, amount) + data = self._fileobj.read(amount_to_read) + self._amount_read += len(data) + if self._callbacks is not None and self._callbacks_enabled: + invoke_progress_callbacks(self._callbacks, len(data)) + return data + + def signal_transferring(self): + self.enable_callback() + if hasattr(self._fileobj, 'signal_transferring'): + self._fileobj.signal_transferring() + + def signal_not_transferring(self): + self.disable_callback() + if hasattr(self._fileobj, 'signal_not_transferring'): + self._fileobj.signal_not_transferring() + + def enable_callback(self): + self._callbacks_enabled = True + + def disable_callback(self): + self._callbacks_enabled = False + + def seek(self, where, whence=0): + if whence not in (0, 1, 2): + # Mimic io's error for invalid whence values + raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)") + + # Recalculate where based on chunk attributes so seek from file + # start (whence=0) is always used + where += self._start_byte + if whence == 1: + where += self._amount_read + elif whence == 2: + where += self._size + + self._fileobj.seek(max(where, self._start_byte)) + if self._callbacks is not None and self._callbacks_enabled: + # To also rewind the callback() for an accurate progress report + bounded_where = max(min(where - self._start_byte, self._size), 0) + bounded_amount_read = min(self._amount_read, self._size) + amount = bounded_where - bounded_amount_read + invoke_progress_callbacks( + self._callbacks, bytes_transferred=amount + ) + self._amount_read = max(where - self._start_byte, 0) + + def close(self): + if self._close_callbacks is not None and self._callbacks_enabled: + for callback in self._close_callbacks: + callback() + self._fileobj.close() + + def tell(self): + return self._amount_read + + def __len__(self): + # __len__ is defined because requests will try to determine the length + # of the stream to set a content length. In the normal case + # of the file it will just stat the file, but we need to change that + # behavior. By providing a __len__, requests will use that instead + # of stat'ing the file. + return self._size + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def __iter__(self): + # This is a workaround for http://bugs.python.org/issue17575 + # Basically httplib will try to iterate over the contents, even + # if its a file like object. This wasn't noticed because we've + # already exhausted the stream so iterating over the file immediately + # stops, which is what we're simulating here. 
+ return iter([]) + + +class StreamReaderProgress: + """Wrapper for a read only stream that adds progress callbacks.""" + + def __init__(self, stream, callbacks=None): + self._stream = stream + self._callbacks = callbacks + if callbacks is None: + self._callbacks = [] + + def read(self, *args, **kwargs): + value = self._stream.read(*args, **kwargs) + invoke_progress_callbacks(self._callbacks, len(value)) + return value + + +class NoResourcesAvailable(Exception): + pass + + +class TaskSemaphore: + def __init__(self, count): + """A semaphore for the purpose of limiting the number of tasks + + :param count: The size of semaphore + """ + self._semaphore = threading.Semaphore(count) + + def acquire(self, tag, blocking=True): + """Acquire the semaphore + + :param tag: A tag identifying what is acquiring the semaphore. Note + that this is not really needed to directly use this class but is + needed for API compatibility with the SlidingWindowSemaphore + implementation. + :param block: If True, block until it can be acquired. If False, + do not block and raise an exception if cannot be acquired. + + :returns: A token (can be None) to use when releasing the semaphore + """ + logger.debug("Acquiring %s", tag) + if not self._semaphore.acquire(blocking): + raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'") + + def release(self, tag, acquire_token): + """Release the semaphore + + :param tag: A tag identifying what is releasing the semaphore + :param acquire_token: The token returned from when the semaphore was + acquired. Note that this is not really needed to directly use this + class but is needed for API compatibility with the + SlidingWindowSemaphore implementation. + """ + logger.debug(f"Releasing acquire {tag}/{acquire_token}") + self._semaphore.release() + + +class SlidingWindowSemaphore(TaskSemaphore): + """A semaphore used to coordinate sequential resource access. + + This class is similar to the stdlib BoundedSemaphore: + + * It's initialized with a count. + * Each call to ``acquire()`` decrements the counter. + * If the count is at zero, then ``acquire()`` will either block until the + count increases, or if ``blocking=False``, then it will raise + a NoResourcesAvailable exception indicating that it failed to acquire the + semaphore. + + The main difference is that this semaphore is used to limit + access to a resource that requires sequential access. For example, + if I want to access resource R that has 20 subresources R_0 - R_19, + this semaphore can also enforce that you only have a max range of + 10 at any given point in time. You must also specify a tag name + when you acquire the semaphore. The sliding window semantics apply + on a per tag basis. The internal count will only be incremented + when the minimum sequence number for a tag is released. + + """ + + def __init__(self, count): + self._count = count + # Dict[tag, next_sequence_number]. + self._tag_sequences = defaultdict(int) + self._lowest_sequence = {} + self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) + # Dict[tag, List[sequence_number]] + self._pending_release = {} + + def current_count(self): + with self._lock: + return self._count + + def acquire(self, tag, blocking=True): + logger.debug("Acquiring %s", tag) + self._condition.acquire() + try: + if self._count == 0: + if not blocking: + raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'") + else: + while self._count == 0: + self._condition.wait() + # self._count is no longer zero. 
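+            # Editorial walk-through (illustrative, not upstream code): with
+            # count=2 and a single tag, two acquires return sequence numbers
+            # 0 and 1 and a third acquire blocks. Releasing 1 first only
+            # queues it in _pending_release (no slot is freed); releasing 0
+            # then advances the window past both, restores the count, and
+            # the blocked acquire wakes up and returns sequence number 2.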
+ # First, check if this is the first time we're seeing this tag. + sequence_number = self._tag_sequences[tag] + if sequence_number == 0: + # First time seeing the tag, so record we're at 0. + self._lowest_sequence[tag] = sequence_number + self._tag_sequences[tag] += 1 + self._count -= 1 + return sequence_number + finally: + self._condition.release() + + def release(self, tag, acquire_token): + sequence_number = acquire_token + logger.debug("Releasing acquire %s/%s", tag, sequence_number) + self._condition.acquire() + try: + if tag not in self._tag_sequences: + raise ValueError(f"Attempted to release unknown tag: {tag}") + max_sequence = self._tag_sequences[tag] + if self._lowest_sequence[tag] == sequence_number: + # We can immediately process this request and free up + # resources. + self._lowest_sequence[tag] += 1 + self._count += 1 + self._condition.notify() + queued = self._pending_release.get(tag, []) + while queued: + if self._lowest_sequence[tag] == queued[-1]: + queued.pop() + self._lowest_sequence[tag] += 1 + self._count += 1 + else: + break + elif self._lowest_sequence[tag] < sequence_number < max_sequence: + # We can't do anything right now because we're still waiting + # for the min sequence for the tag to be released. We have + # to queue this for pending release. + self._pending_release.setdefault(tag, []).append( + sequence_number + ) + self._pending_release[tag].sort(reverse=True) + else: + raise ValueError( + "Attempted to release unknown sequence number " + f"{sequence_number} for tag: {tag}" + ) + finally: + self._condition.release() + + +class ChunksizeAdjuster: + def __init__( + self, + max_size=MAX_SINGLE_UPLOAD_SIZE, + min_size=MIN_UPLOAD_CHUNKSIZE, + max_parts=MAX_PARTS, + ): + self.max_size = max_size + self.min_size = min_size + self.max_parts = max_parts + + def adjust_chunksize(self, current_chunksize, file_size=None): + """Get a chunksize close to current that fits within all S3 limits. + + :type current_chunksize: int + :param current_chunksize: The currently configured chunksize. + + :type file_size: int or None + :param file_size: The size of the file to upload. This might be None + if the object being transferred has an unknown size. + + :returns: A valid chunksize that fits within configured limits. + """ + chunksize = current_chunksize + if file_size is not None: + chunksize = self._adjust_for_max_parts(chunksize, file_size) + return self._adjust_for_chunksize_limits(chunksize) + + def _adjust_for_chunksize_limits(self, current_chunksize): + if current_chunksize > self.max_size: + logger.debug( + "Chunksize greater than maximum chunksize. " + f"Setting to {self.max_size} from {current_chunksize}." + ) + return self.max_size + elif current_chunksize < self.min_size: + logger.debug( + "Chunksize less than minimum chunksize. " + f"Setting to {self.min_size} from {current_chunksize}." + ) + return self.min_size + else: + return current_chunksize + + def _adjust_for_max_parts(self, current_chunksize, file_size): + chunksize = current_chunksize + num_parts = int(math.ceil(file_size / float(chunksize))) + + while num_parts > self.max_parts: + chunksize *= 2 + num_parts = int(math.ceil(file_size / float(chunksize))) + + if chunksize != current_chunksize: + logger.debug( + "Chunksize would result in the number of parts exceeding the " + f"maximum. Setting to {chunksize} from {current_chunksize}." + ) + + return chunksize + + +def add_s3express_defaults(bucket, extra_args): + """ + This function has been deprecated, but is kept for backwards compatibility. 
+    This function is subject to removal in a future release.
+    """
+    if is_s3express_bucket(bucket) and "ChecksumAlgorithm" not in extra_args:
+        # Default transfer operations to S3 Express buckets to use CRC32.
+        extra_args["ChecksumAlgorithm"] = "crc32"
+
+
+def set_default_checksum_algorithm(extra_args):
+    """Set the default algorithm to CRC32 if not specified by the user."""
+    if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS):
+        return
+    extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)
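+
+
+# Editorial usage sketch (not part of upstream s3transfer): a minimal,
+# self-contained demonstration of ChunksizeAdjuster and
+# set_default_checksum_algorithm, guarded so it never runs on import.
+if __name__ == '__main__':
+    adjuster = ChunksizeAdjuster()
+    # 100 GiB at an 8 MiB chunksize would need 12,800 parts, which exceeds
+    # MAX_PARTS (10,000), so the chunksize is doubled to 16 MiB (6,400 parts).
+    print(adjuster.adjust_chunksize(8 * 1024**2, 100 * 1024**3))
+
+    # With no full-object checksum supplied by the caller, the botocore
+    # default algorithm is filled in.
+    extra_args = {}
+    set_default_checksum_algorithm(extra_args)
+    print(extra_args)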