diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/boto3/s3/transfer.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/boto3/s3/transfer.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/boto3/s3/transfer.py | 438 |
1 files changed, 438 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/boto3/s3/transfer.py b/.venv/lib/python3.12/site-packages/boto3/s3/transfer.py new file mode 100644 index 00000000..73a3388b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/boto3/s3/transfer.py @@ -0,0 +1,438 @@ +# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# https://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Abstractions over S3's upload/download operations. + +This module provides high level abstractions for efficient +uploads/downloads. It handles several things for the user: + +* Automatically switching to multipart transfers when + a file is over a specific size threshold +* Uploading/downloading a file in parallel +* Progress callbacks to monitor transfers +* Retries. While botocore handles retries for streaming uploads, + it is not possible for it to handle retries for streaming + downloads. This module handles retries for both cases so + you don't need to implement any retry logic yourself. + +This module has a reasonable set of defaults. It also allows you +to configure many aspects of the transfer process including: + +* Multipart threshold size +* Max parallel downloads +* Socket timeouts +* Retry amounts + +There is no support for s3->s3 multipart copies at this +time. + + +.. _ref_s3transfer_usage: + +Usage +===== + +The simplest way to use this module is: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + transfer = S3Transfer(client) + # Upload /tmp/myfile to s3://bucket/key + transfer.upload_file('/tmp/myfile', 'bucket', 'key') + + # Download s3://bucket/key to /tmp/myfile + transfer.download_file('bucket', 'key', '/tmp/myfile') + +The ``upload_file`` and ``download_file`` methods also accept +``**kwargs``, which will be forwarded through to the corresponding +client operation. Here are a few examples using ``upload_file``:: + + # Making the object public + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'ACL': 'public-read'}) + + # Setting metadata + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) + + # Setting content type + transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', + extra_args={'ContentType': "application/json"}) + + +The ``S3Transfer`` class also supports progress callbacks so you can +provide transfer progress to users. Both the ``upload_file`` and +``download_file`` methods take an optional ``callback`` parameter. +Here's an example of how to print a simple progress percentage +to the user: + +.. code-block:: python + + class ProgressPercentage(object): + def __init__(self, filename): + self._filename = filename + self._size = float(os.path.getsize(filename)) + self._seen_so_far = 0 + self._lock = threading.Lock() + + def __call__(self, bytes_amount): + # To simplify we'll assume this is hooked up + # to a single filename. + with self._lock: + self._seen_so_far += bytes_amount + percentage = (self._seen_so_far / self._size) * 100 + sys.stdout.write( + "\r%s %s / %s (%.2f%%)" % ( + self._filename, self._seen_so_far, self._size, + percentage)) + sys.stdout.flush() + + + transfer = S3Transfer(boto3.client('s3', 'us-west-2')) + # Upload /tmp/myfile to s3://bucket/key and print upload progress. + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + callback=ProgressPercentage('/tmp/myfile')) + + + +You can also provide a TransferConfig object to the S3Transfer +object that gives you more fine grained control over the +transfer. For example: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + config = TransferConfig( + multipart_threshold=8 * 1024 * 1024, + max_concurrency=10, + num_download_attempts=10, + ) + transfer = S3Transfer(client, config) + transfer.upload_file('/tmp/foo', 'bucket', 'key') + + +""" + +import logging +import threading +from os import PathLike, fspath, getpid + +from botocore.compat import HAS_CRT +from botocore.exceptions import ClientError +from s3transfer.exceptions import ( + RetriesExceededError as S3TransferRetriesExceededError, +) +from s3transfer.futures import NonThreadedExecutor +from s3transfer.manager import TransferConfig as S3TransferConfig +from s3transfer.manager import TransferManager +from s3transfer.subscribers import BaseSubscriber +from s3transfer.utils import OSUtils + +import boto3.s3.constants as constants +from boto3.exceptions import RetriesExceededError, S3UploadFailedError + +if HAS_CRT: + import awscrt.s3 + + from boto3.crt import create_crt_transfer_manager + +KB = 1024 +MB = KB * KB + +logger = logging.getLogger(__name__) + + +def create_transfer_manager(client, config, osutil=None): + """Creates a transfer manager based on configuration + + :type client: boto3.client + :param client: The S3 client to use + + :type config: boto3.s3.transfer.TransferConfig + :param config: The transfer config to use + + :type osutil: s3transfer.utils.OSUtils + :param osutil: The os utility to use + + :rtype: s3transfer.manager.TransferManager + :returns: A transfer manager based on parameters provided + """ + if _should_use_crt(config): + crt_transfer_manager = create_crt_transfer_manager(client, config) + if crt_transfer_manager is not None: + logger.debug( + f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}" + ) + return crt_transfer_manager + + # If we don't resolve something above, fallback to the default. + logger.debug( + f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}" + ) + return _create_default_transfer_manager(client, config, osutil) + + +def _should_use_crt(config): + # This feature requires awscrt>=0.19.18 + if HAS_CRT and has_minimum_crt_version((0, 19, 18)): + is_optimized_instance = awscrt.s3.is_optimized_for_system() + else: + is_optimized_instance = False + pref_transfer_client = config.preferred_transfer_client.lower() + + if ( + is_optimized_instance + and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT + ): + logger.debug( + "Attempting to use CRTTransferManager. Config settings may be ignored." + ) + return True + + logger.debug( + "Opting out of CRT Transfer Manager. Preferred client: " + f"{pref_transfer_client}, CRT available: {HAS_CRT}, " + f"Instance Optimized: {is_optimized_instance}." + ) + return False + + +def has_minimum_crt_version(minimum_version): + """Not intended for use outside boto3.""" + if not HAS_CRT: + return False + + crt_version_str = awscrt.__version__ + try: + crt_version_ints = map(int, crt_version_str.split(".")) + crt_version_tuple = tuple(crt_version_ints) + except (TypeError, ValueError): + return False + + return crt_version_tuple >= minimum_version + + +def _create_default_transfer_manager(client, config, osutil): + """Create the default TransferManager implementation for s3transfer.""" + executor_cls = None + if not config.use_threads: + executor_cls = NonThreadedExecutor + return TransferManager(client, config, osutil, executor_cls) + + +class TransferConfig(S3TransferConfig): + ALIAS = { + 'max_concurrency': 'max_request_concurrency', + 'max_io_queue': 'max_io_queue_size', + } + + def __init__( + self, + multipart_threshold=8 * MB, + max_concurrency=10, + multipart_chunksize=8 * MB, + num_download_attempts=5, + max_io_queue=100, + io_chunksize=256 * KB, + use_threads=True, + max_bandwidth=None, + preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT, + ): + """Configuration object for managed S3 transfers + + :param multipart_threshold: The transfer size threshold for which + multipart uploads, downloads, and copies will automatically be + triggered. + + :param max_concurrency: The maximum number of threads that will be + making requests to perform a transfer. If ``use_threads`` is + set to ``False``, the value provided is ignored as the transfer + will only ever use the current thread. + + :param multipart_chunksize: The partition size of each part for a + multipart transfer. + + :param num_download_attempts: The number of download attempts that + will be retried upon errors with downloading an object in S3. + Note that these retries account for errors that occur when + streaming down the data from s3 (i.e. socket errors and read + timeouts that occur after receiving an OK response from s3). + Other retryable exceptions such as throttling errors and 5xx + errors are already retried by botocore (this default is 5). This + does not take into account the number of exceptions retried by + botocore. + + :param max_io_queue: The maximum amount of read parts that can be + queued in memory to be written for a download. The size of each + of these read parts is at most the size of ``io_chunksize``. + + :param io_chunksize: The max size of each chunk in the io queue. + Currently, this is size used when ``read`` is called on the + downloaded stream as well. + + :param use_threads: If True, threads will be used when performing + S3 transfers. If False, no threads will be used in + performing transfers; all logic will be run in the current thread. + + :param max_bandwidth: The maximum bandwidth that will be consumed + in uploading and downloading file content. The value is an integer + in terms of bytes per second. + + :param preferred_transfer_client: String specifying preferred transfer + client for transfer operations. + + Current supported settings are: + * auto (default) - Use the CRTTransferManager when calls + are made with supported environment and settings. + * classic - Only use the origin S3TransferManager with + requests. Disables possible CRT upgrade on requests. + """ + super().__init__( + multipart_threshold=multipart_threshold, + max_request_concurrency=max_concurrency, + multipart_chunksize=multipart_chunksize, + num_download_attempts=num_download_attempts, + max_io_queue_size=max_io_queue, + io_chunksize=io_chunksize, + max_bandwidth=max_bandwidth, + ) + # Some of the argument names are not the same as the inherited + # S3TransferConfig so we add aliases so you can still access the + # old version of the names. + for alias in self.ALIAS: + setattr(self, alias, getattr(self, self.ALIAS[alias])) + self.use_threads = use_threads + self.preferred_transfer_client = preferred_transfer_client + + def __setattr__(self, name, value): + # If the alias name is used, make sure we set the name that it points + # to as that is what actually is used in governing the TransferManager. + if name in self.ALIAS: + super().__setattr__(self.ALIAS[name], value) + # Always set the value of the actual name provided. + super().__setattr__(name, value) + + +class S3Transfer: + ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS + ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS + + def __init__(self, client=None, config=None, osutil=None, manager=None): + if not client and not manager: + raise ValueError( + 'Either a boto3.Client or s3transfer.manager.TransferManager ' + 'must be provided' + ) + if manager and any([client, config, osutil]): + raise ValueError( + 'Manager cannot be provided with client, config, ' + 'nor osutil. These parameters are mutually exclusive.' + ) + if config is None: + config = TransferConfig() + if osutil is None: + osutil = OSUtils() + if manager: + self._manager = manager + else: + self._manager = create_transfer_manager(client, config, osutil) + + def upload_file( + self, filename, bucket, key, callback=None, extra_args=None + ): + """Upload a file to an S3 object. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.upload_file() directly. + + .. seealso:: + :py:meth:`S3.Client.upload_file` + :py:meth:`S3.Client.upload_fileobj` + """ + if isinstance(filename, PathLike): + filename = fspath(filename) + if not isinstance(filename, str): + raise ValueError('Filename must be a string or a path-like object') + + subscribers = self._get_subscribers(callback) + future = self._manager.upload( + filename, bucket, key, extra_args, subscribers + ) + try: + future.result() + # If a client error was raised, add the backwards compatibility layer + # that raises a S3UploadFailedError. These specific errors were only + # ever thrown for upload_parts but now can be thrown for any related + # client error. + except ClientError as e: + raise S3UploadFailedError( + "Failed to upload {} to {}: {}".format( + filename, '/'.join([bucket, key]), e + ) + ) + + def download_file( + self, bucket, key, filename, extra_args=None, callback=None + ): + """Download an S3 object to a file. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.download_file() directly. + + .. seealso:: + :py:meth:`S3.Client.download_file` + :py:meth:`S3.Client.download_fileobj` + """ + if isinstance(filename, PathLike): + filename = fspath(filename) + if not isinstance(filename, str): + raise ValueError('Filename must be a string or a path-like object') + + subscribers = self._get_subscribers(callback) + future = self._manager.download( + bucket, key, filename, extra_args, subscribers + ) + try: + future.result() + # This is for backwards compatibility where when retries are + # exceeded we need to throw the same error from boto3 instead of + # s3transfer's built in RetriesExceededError as current users are + # catching the boto3 one instead of the s3transfer exception to do + # their own retries. + except S3TransferRetriesExceededError as e: + raise RetriesExceededError(e.last_exception) + + def _get_subscribers(self, callback): + if not callback: + return None + return [ProgressCallbackInvoker(callback)] + + def __enter__(self): + return self + + def __exit__(self, *args): + self._manager.__exit__(*args) + + +class ProgressCallbackInvoker(BaseSubscriber): + """A back-compat wrapper to invoke a provided callback via a subscriber + + :param callback: A callable that takes a single positional argument for + how many bytes were transferred. + """ + + def __init__(self, callback): + self._callback = callback + + def on_progress(self, bytes_transferred, **kwargs): + self._callback(bytes_transferred) |