about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/s3transfer/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer/__init__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/s3transfer/__init__.py887
1 files changed, 887 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/__init__.py b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py
new file mode 100644
index 00000000..b8abbf47
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/__init__.py
@@ -0,0 +1,887 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Abstractions over S3's upload/download operations.
+
+This module provides high level abstractions for efficient
+uploads/downloads.  It handles several things for the user:
+
+* Automatically switching to multipart transfers when
+  a file is over a specific size threshold
+* Uploading/downloading a file in parallel
+* Throttling based on max bandwidth
+* Progress callbacks to monitor transfers
+* Retries.  While botocore handles retries for streaming uploads,
+  it is not possible for it to handle retries for streaming
+  downloads.  This module handles retries for both cases so
+  you don't need to implement any retry logic yourself.
+
+This module has a reasonable set of defaults.  It also allows you
+to configure many aspects of the transfer process including:
+
+* Multipart threshold size
+* Max parallel downloads
+* Max bandwidth
+* Socket timeouts
+* Retry amounts
+
+There is no support for s3->s3 multipart copies at this
+time.
+
+
+.. _ref_s3transfer_usage:
+
+Usage
+=====
+
+The simplest way to use this module is:
+
+.. code-block:: python
+
+    client = boto3.client('s3', 'us-west-2')
+    transfer = S3Transfer(client)
+    # Upload /tmp/myfile to s3://bucket/key
+    transfer.upload_file('/tmp/myfile', 'bucket', 'key')
+
+    # Download s3://bucket/key to /tmp/myfile
+    transfer.download_file('bucket', 'key', '/tmp/myfile')
+
+The ``upload_file`` and ``download_file`` methods also accept
+``**kwargs``, which will be forwarded through to the corresponding
+client operation.  Here are a few examples using ``upload_file``::
+
+    # Making the object public
+    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+                         extra_args={'ACL': 'public-read'})
+
+    # Setting metadata
+    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
+
+    # Setting content type
+    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
+                         extra_args={'ContentType': "application/json"})
+
+
+The ``S3Transfer`` class also supports progress callbacks so you can
+provide transfer progress to users.  Both the ``upload_file`` and
+``download_file`` methods take an optional ``callback`` parameter.
+Here's an example of how to print a simple progress percentage
+to the user:
+
+.. code-block:: python
+
+    class ProgressPercentage(object):
+        def __init__(self, filename):
+            self._filename = filename
+            self._size = float(os.path.getsize(filename))
+            self._seen_so_far = 0
+            self._lock = threading.Lock()
+
+        def __call__(self, bytes_amount):
+            # To simplify we'll assume this is hooked up
+            # to a single filename.
+            with self._lock:
+                self._seen_so_far += bytes_amount
+                percentage = (self._seen_so_far / self._size) * 100
+                sys.stdout.write(
+                    "\r%s  %s / %s  (%.2f%%)" % (self._filename, self._seen_so_far,
+                                                 self._size, percentage))
+                sys.stdout.flush()
+
+
+    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
+    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
+    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
+                         callback=ProgressPercentage('/tmp/myfile'))
+
+
+
+You can also provide a TransferConfig object to the S3Transfer
+object that gives you more fine grained control over the
+transfer.  For example:
+
+.. code-block:: python
+
+    client = boto3.client('s3', 'us-west-2')
+    config = TransferConfig(
+        multipart_threshold=8 * 1024 * 1024,
+        max_concurrency=10,
+        num_download_attempts=10,
+    )
+    transfer = S3Transfer(client, config)
+    transfer.upload_file('/tmp/foo', 'bucket', 'key')
+
+
+"""
+
+import concurrent.futures
+import functools
+import logging
+import math
+import os
+import queue
+import random
+import socket
+import string
+import threading
+
+from botocore.compat import six  # noqa: F401
+from botocore.exceptions import IncompleteReadError, ResponseStreamingError
+from botocore.vendored.requests.packages.urllib3.exceptions import (
+    ReadTimeoutError,
+)
+
+import s3transfer.compat
+from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
+
+__author__ = 'Amazon Web Services'
+__version__ = '0.11.4'
+
+
+class NullHandler(logging.Handler):
+    def emit(self, record):
+        pass
+
+
+logger = logging.getLogger(__name__)
+logger.addHandler(NullHandler())
+
+MB = 1024 * 1024
+SHUTDOWN_SENTINEL = object()
+
+
+def random_file_extension(num_digits=8):
+    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
+
+
+def disable_upload_callbacks(request, operation_name, **kwargs):
+    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
+        request.body, 'disable_callback'
+    ):
+        request.body.disable_callback()
+
+
+def enable_upload_callbacks(request, operation_name, **kwargs):
+    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
+        request.body, 'enable_callback'
+    ):
+        request.body.enable_callback()
+
+
+class QueueShutdownError(Exception):
+    pass
+
+
+class ReadFileChunk:
+    def __init__(
+        self,
+        fileobj,
+        start_byte,
+        chunk_size,
+        full_file_size,
+        callback=None,
+        enable_callback=True,
+    ):
+        """
+
+        Given a file object shown below:
+
+            |___________________________________________________|
+            0          |                 |                 full_file_size
+                       |----chunk_size---|
+                 start_byte
+
+        :type fileobj: file
+        :param fileobj: File like object
+
+        :type start_byte: int
+        :param start_byte: The first byte from which to start reading.
+
+        :type chunk_size: int
+        :param chunk_size: The max chunk size to read.  Trying to read
+            pass the end of the chunk size will behave like you've
+            reached the end of the file.
+
+        :type full_file_size: int
+        :param full_file_size: The entire content length associated
+            with ``fileobj``.
+
+        :type callback: function(amount_read)
+        :param callback: Called whenever data is read from this object.
+
+        """
+        self._fileobj = fileobj
+        self._start_byte = start_byte
+        self._size = self._calculate_file_size(
+            self._fileobj,
+            requested_size=chunk_size,
+            start_byte=start_byte,
+            actual_file_size=full_file_size,
+        )
+        self._fileobj.seek(self._start_byte)
+        self._amount_read = 0
+        self._callback = callback
+        self._callback_enabled = enable_callback
+
+    @classmethod
+    def from_filename(
+        cls,
+        filename,
+        start_byte,
+        chunk_size,
+        callback=None,
+        enable_callback=True,
+    ):
+        """Convenience factory function to create from a filename.
+
+        :type start_byte: int
+        :param start_byte: The first byte from which to start reading.
+
+        :type chunk_size: int
+        :param chunk_size: The max chunk size to read.  Trying to read
+            pass the end of the chunk size will behave like you've
+            reached the end of the file.
+
+        :type full_file_size: int
+        :param full_file_size: The entire content length associated
+            with ``fileobj``.
+
+        :type callback: function(amount_read)
+        :param callback: Called whenever data is read from this object.
+
+        :type enable_callback: bool
+        :param enable_callback: Indicate whether to invoke callback
+            during read() calls.
+
+        :rtype: ``ReadFileChunk``
+        :return: A new instance of ``ReadFileChunk``
+
+        """
+        f = open(filename, 'rb')
+        file_size = os.fstat(f.fileno()).st_size
+        return cls(
+            f, start_byte, chunk_size, file_size, callback, enable_callback
+        )
+
+    def _calculate_file_size(
+        self, fileobj, requested_size, start_byte, actual_file_size
+    ):
+        max_chunk_size = actual_file_size - start_byte
+        return min(max_chunk_size, requested_size)
+
+    def read(self, amount=None):
+        if amount is None:
+            amount_to_read = self._size - self._amount_read
+        else:
+            amount_to_read = min(self._size - self._amount_read, amount)
+        data = self._fileobj.read(amount_to_read)
+        self._amount_read += len(data)
+        if self._callback is not None and self._callback_enabled:
+            self._callback(len(data))
+        return data
+
+    def enable_callback(self):
+        self._callback_enabled = True
+
+    def disable_callback(self):
+        self._callback_enabled = False
+
+    def seek(self, where):
+        self._fileobj.seek(self._start_byte + where)
+        if self._callback is not None and self._callback_enabled:
+            # To also rewind the callback() for an accurate progress report
+            self._callback(where - self._amount_read)
+        self._amount_read = where
+
+    def close(self):
+        self._fileobj.close()
+
+    def tell(self):
+        return self._amount_read
+
+    def __len__(self):
+        # __len__ is defined because requests will try to determine the length
+        # of the stream to set a content length.  In the normal case
+        # of the file it will just stat the file, but we need to change that
+        # behavior.  By providing a __len__, requests will use that instead
+        # of stat'ing the file.
+        return self._size
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+    def __iter__(self):
+        # This is a workaround for http://bugs.python.org/issue17575
+        # Basically httplib will try to iterate over the contents, even
+        # if its a file like object.  This wasn't noticed because we've
+        # already exhausted the stream so iterating over the file immediately
+        # stops, which is what we're simulating here.
+        return iter([])
+
+
+class StreamReaderProgress:
+    """Wrapper for a read only stream that adds progress callbacks."""
+
+    def __init__(self, stream, callback=None):
+        self._stream = stream
+        self._callback = callback
+
+    def read(self, *args, **kwargs):
+        value = self._stream.read(*args, **kwargs)
+        if self._callback is not None:
+            self._callback(len(value))
+        return value
+
+
+class OSUtils:
+    def get_file_size(self, filename):
+        return os.path.getsize(filename)
+
+    def open_file_chunk_reader(self, filename, start_byte, size, callback):
+        return ReadFileChunk.from_filename(
+            filename, start_byte, size, callback, enable_callback=False
+        )
+
+    def open(self, filename, mode):
+        return open(filename, mode)
+
+    def remove_file(self, filename):
+        """Remove a file, noop if file does not exist."""
+        # Unlike os.remove, if the file does not exist,
+        # then this method does nothing.
+        try:
+            os.remove(filename)
+        except OSError:
+            pass
+
+    def rename_file(self, current_filename, new_filename):
+        s3transfer.compat.rename_file(current_filename, new_filename)
+
+
+class MultipartUploader:
+    # These are the extra_args that need to be forwarded onto
+    # subsequent upload_parts.
+    UPLOAD_PART_ARGS = [
+        'SSECustomerKey',
+        'SSECustomerAlgorithm',
+        'SSECustomerKeyMD5',
+        'RequestPayer',
+    ]
+
+    def __init__(
+        self,
+        client,
+        config,
+        osutil,
+        executor_cls=concurrent.futures.ThreadPoolExecutor,
+    ):
+        self._client = client
+        self._config = config
+        self._os = osutil
+        self._executor_cls = executor_cls
+
+    def _extra_upload_part_args(self, extra_args):
+        # Only the args in UPLOAD_PART_ARGS actually need to be passed
+        # onto the upload_part calls.
+        upload_parts_args = {}
+        for key, value in extra_args.items():
+            if key in self.UPLOAD_PART_ARGS:
+                upload_parts_args[key] = value
+        return upload_parts_args
+
+    def upload_file(self, filename, bucket, key, callback, extra_args):
+        response = self._client.create_multipart_upload(
+            Bucket=bucket, Key=key, **extra_args
+        )
+        upload_id = response['UploadId']
+        try:
+            parts = self._upload_parts(
+                upload_id, filename, bucket, key, callback, extra_args
+            )
+        except Exception as e:
+            logger.debug(
+                "Exception raised while uploading parts, "
+                "aborting multipart upload.",
+                exc_info=True,
+            )
+            self._client.abort_multipart_upload(
+                Bucket=bucket, Key=key, UploadId=upload_id
+            )
+            raise S3UploadFailedError(
+                "Failed to upload {} to {}: {}".format(
+                    filename, '/'.join([bucket, key]), e
+                )
+            )
+        self._client.complete_multipart_upload(
+            Bucket=bucket,
+            Key=key,
+            UploadId=upload_id,
+            MultipartUpload={'Parts': parts},
+        )
+
+    def _upload_parts(
+        self, upload_id, filename, bucket, key, callback, extra_args
+    ):
+        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
+        parts = []
+        part_size = self._config.multipart_chunksize
+        num_parts = int(
+            math.ceil(self._os.get_file_size(filename) / float(part_size))
+        )
+        max_workers = self._config.max_concurrency
+        with self._executor_cls(max_workers=max_workers) as executor:
+            upload_partial = functools.partial(
+                self._upload_one_part,
+                filename,
+                bucket,
+                key,
+                upload_id,
+                part_size,
+                upload_parts_extra_args,
+                callback,
+            )
+            for part in executor.map(upload_partial, range(1, num_parts + 1)):
+                parts.append(part)
+        return parts
+
+    def _upload_one_part(
+        self,
+        filename,
+        bucket,
+        key,
+        upload_id,
+        part_size,
+        extra_args,
+        callback,
+        part_number,
+    ):
+        open_chunk_reader = self._os.open_file_chunk_reader
+        with open_chunk_reader(
+            filename, part_size * (part_number - 1), part_size, callback
+        ) as body:
+            response = self._client.upload_part(
+                Bucket=bucket,
+                Key=key,
+                UploadId=upload_id,
+                PartNumber=part_number,
+                Body=body,
+                **extra_args,
+            )
+            etag = response['ETag']
+            return {'ETag': etag, 'PartNumber': part_number}
+
+
+class ShutdownQueue(queue.Queue):
+    """A queue implementation that can be shutdown.
+
+    Shutting down a queue means that this class adds a
+    trigger_shutdown method that will trigger all subsequent
+    calls to put() to fail with a ``QueueShutdownError``.
+
+    It purposefully deviates from queue.Queue, and is *not* meant
+    to be a drop in replacement for ``queue.Queue``.
+
+    """
+
+    def _init(self, maxsize):
+        self._shutdown = False
+        self._shutdown_lock = threading.Lock()
+        # queue.Queue is an old style class so we don't use super().
+        return queue.Queue._init(self, maxsize)
+
+    def trigger_shutdown(self):
+        with self._shutdown_lock:
+            self._shutdown = True
+            logger.debug("The IO queue is now shutdown.")
+
+    def put(self, item):
+        # Note: this is not sufficient, it's still possible to deadlock!
+        # Need to hook into the condition vars used by this class.
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise QueueShutdownError(
+                    "Cannot put item to queue when " "queue has been shutdown."
+                )
+        return queue.Queue.put(self, item)
+
+
+class MultipartDownloader:
+    def __init__(
+        self,
+        client,
+        config,
+        osutil,
+        executor_cls=concurrent.futures.ThreadPoolExecutor,
+    ):
+        self._client = client
+        self._config = config
+        self._os = osutil
+        self._executor_cls = executor_cls
+        self._ioqueue = ShutdownQueue(self._config.max_io_queue)
+
+    def download_file(
+        self, bucket, key, filename, object_size, extra_args, callback=None
+    ):
+        with self._executor_cls(max_workers=2) as controller:
+            # 1 thread for the future that manages the uploading of files
+            # 1 thread for the future that manages IO writes.
+            download_parts_handler = functools.partial(
+                self._download_file_as_future,
+                bucket,
+                key,
+                filename,
+                object_size,
+                callback,
+            )
+            parts_future = controller.submit(download_parts_handler)
+
+            io_writes_handler = functools.partial(
+                self._perform_io_writes, filename
+            )
+            io_future = controller.submit(io_writes_handler)
+            results = concurrent.futures.wait(
+                [parts_future, io_future],
+                return_when=concurrent.futures.FIRST_EXCEPTION,
+            )
+            self._process_future_results(results)
+
+    def _process_future_results(self, futures):
+        finished, unfinished = futures
+        for future in finished:
+            future.result()
+
+    def _download_file_as_future(
+        self, bucket, key, filename, object_size, callback
+    ):
+        part_size = self._config.multipart_chunksize
+        num_parts = int(math.ceil(object_size / float(part_size)))
+        max_workers = self._config.max_concurrency
+        download_partial = functools.partial(
+            self._download_range,
+            bucket,
+            key,
+            filename,
+            part_size,
+            num_parts,
+            callback,
+        )
+        try:
+            with self._executor_cls(max_workers=max_workers) as executor:
+                list(executor.map(download_partial, range(num_parts)))
+        finally:
+            self._ioqueue.put(SHUTDOWN_SENTINEL)
+
+    def _calculate_range_param(self, part_size, part_index, num_parts):
+        start_range = part_index * part_size
+        if part_index == num_parts - 1:
+            end_range = ''
+        else:
+            end_range = start_range + part_size - 1
+        range_param = f'bytes={start_range}-{end_range}'
+        return range_param
+
+    def _download_range(
+        self, bucket, key, filename, part_size, num_parts, callback, part_index
+    ):
+        try:
+            range_param = self._calculate_range_param(
+                part_size, part_index, num_parts
+            )
+
+            max_attempts = self._config.num_download_attempts
+            last_exception = None
+            for i in range(max_attempts):
+                try:
+                    logger.debug("Making get_object call.")
+                    response = self._client.get_object(
+                        Bucket=bucket, Key=key, Range=range_param
+                    )
+                    streaming_body = StreamReaderProgress(
+                        response['Body'], callback
+                    )
+                    buffer_size = 1024 * 16
+                    current_index = part_size * part_index
+                    for chunk in iter(
+                        lambda: streaming_body.read(buffer_size), b''
+                    ):
+                        self._ioqueue.put((current_index, chunk))
+                        current_index += len(chunk)
+                    return
+                except (
+                    socket.timeout,
+                    OSError,
+                    ReadTimeoutError,
+                    IncompleteReadError,
+                    ResponseStreamingError,
+                ) as e:
+                    logger.debug(
+                        "Retrying exception caught (%s), "
+                        "retrying request, (attempt %s / %s)",
+                        e,
+                        i,
+                        max_attempts,
+                        exc_info=True,
+                    )
+                    last_exception = e
+                    continue
+            raise RetriesExceededError(last_exception)
+        finally:
+            logger.debug("EXITING _download_range for part: %s", part_index)
+
+    def _perform_io_writes(self, filename):
+        with self._os.open(filename, 'wb') as f:
+            while True:
+                task = self._ioqueue.get()
+                if task is SHUTDOWN_SENTINEL:
+                    logger.debug(
+                        "Shutdown sentinel received in IO handler, "
+                        "shutting down IO handler."
+                    )
+                    return
+                else:
+                    try:
+                        offset, data = task
+                        f.seek(offset)
+                        f.write(data)
+                    except Exception as e:
+                        logger.debug(
+                            "Caught exception in IO thread: %s",
+                            e,
+                            exc_info=True,
+                        )
+                        self._ioqueue.trigger_shutdown()
+                        raise
+
+
+class TransferConfig:
+    def __init__(
+        self,
+        multipart_threshold=8 * MB,
+        max_concurrency=10,
+        multipart_chunksize=8 * MB,
+        num_download_attempts=5,
+        max_io_queue=100,
+    ):
+        self.multipart_threshold = multipart_threshold
+        self.max_concurrency = max_concurrency
+        self.multipart_chunksize = multipart_chunksize
+        self.num_download_attempts = num_download_attempts
+        self.max_io_queue = max_io_queue
+
+
+class S3Transfer:
+    ALLOWED_DOWNLOAD_ARGS = [
+        'VersionId',
+        'SSECustomerAlgorithm',
+        'SSECustomerKey',
+        'SSECustomerKeyMD5',
+        'RequestPayer',
+    ]
+
+    ALLOWED_UPLOAD_ARGS = [
+        'ACL',
+        'CacheControl',
+        'ContentDisposition',
+        'ContentEncoding',
+        'ContentLanguage',
+        'ContentType',
+        'Expires',
+        'GrantFullControl',
+        'GrantRead',
+        'GrantReadACP',
+        'GrantWriteACL',
+        'Metadata',
+        'RequestPayer',
+        'ServerSideEncryption',
+        'StorageClass',
+        'SSECustomerAlgorithm',
+        'SSECustomerKey',
+        'SSECustomerKeyMD5',
+        'SSEKMSKeyId',
+        'SSEKMSEncryptionContext',
+        'Tagging',
+    ]
+
+    def __init__(self, client, config=None, osutil=None):
+        self._client = client
+        self._client.meta.events.register(
+            'before-call.s3.*', self._update_checksum_context
+        )
+        if config is None:
+            config = TransferConfig()
+        self._config = config
+        if osutil is None:
+            osutil = OSUtils()
+        self._osutil = osutil
+
+    def _update_checksum_context(self, params, **kwargs):
+        request_context = params.get("context", {})
+        checksum_context = request_context.get("checksum", {})
+        if "request_algorithm" in checksum_context:
+            # Force request checksum algorithm in the header if specified.
+            checksum_context["request_algorithm"]["in"] = "header"
+
+    def upload_file(
+        self, filename, bucket, key, callback=None, extra_args=None
+    ):
+        """Upload a file to an S3 object.
+
+        Variants have also been injected into S3 client, Bucket and Object.
+        You don't have to use S3Transfer.upload_file() directly.
+        """
+        if extra_args is None:
+            extra_args = {}
+        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
+        events = self._client.meta.events
+        events.register_first(
+            'request-created.s3',
+            disable_upload_callbacks,
+            unique_id='s3upload-callback-disable',
+        )
+        events.register_last(
+            'request-created.s3',
+            enable_upload_callbacks,
+            unique_id='s3upload-callback-enable',
+        )
+        if (
+            self._osutil.get_file_size(filename)
+            >= self._config.multipart_threshold
+        ):
+            self._multipart_upload(filename, bucket, key, callback, extra_args)
+        else:
+            self._put_object(filename, bucket, key, callback, extra_args)
+
+    def _put_object(self, filename, bucket, key, callback, extra_args):
+        # We're using open_file_chunk_reader so we can take advantage of the
+        # progress callback functionality.
+        open_chunk_reader = self._osutil.open_file_chunk_reader
+        with open_chunk_reader(
+            filename,
+            0,
+            self._osutil.get_file_size(filename),
+            callback=callback,
+        ) as body:
+            self._client.put_object(
+                Bucket=bucket, Key=key, Body=body, **extra_args
+            )
+
+    def download_file(
+        self, bucket, key, filename, extra_args=None, callback=None
+    ):
+        """Download an S3 object to a file.
+
+        Variants have also been injected into S3 client, Bucket and Object.
+        You don't have to use S3Transfer.download_file() directly.
+        """
+        # This method will issue a ``head_object`` request to determine
+        # the size of the S3 object.  This is used to determine if the
+        # object is downloaded in parallel.
+        if extra_args is None:
+            extra_args = {}
+        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
+        object_size = self._object_size(bucket, key, extra_args)
+        temp_filename = filename + os.extsep + random_file_extension()
+        try:
+            self._download_file(
+                bucket, key, temp_filename, object_size, extra_args, callback
+            )
+        except Exception:
+            logger.debug(
+                "Exception caught in download_file, removing partial "
+                "file: %s",
+                temp_filename,
+                exc_info=True,
+            )
+            self._osutil.remove_file(temp_filename)
+            raise
+        else:
+            self._osutil.rename_file(temp_filename, filename)
+
+    def _download_file(
+        self, bucket, key, filename, object_size, extra_args, callback
+    ):
+        if object_size >= self._config.multipart_threshold:
+            self._ranged_download(
+                bucket, key, filename, object_size, extra_args, callback
+            )
+        else:
+            self._get_object(bucket, key, filename, extra_args, callback)
+
+    def _validate_all_known_args(self, actual, allowed):
+        for kwarg in actual:
+            if kwarg not in allowed:
+                raise ValueError(
+                    f"Invalid extra_args key '{kwarg}', "
+                    f"must be one of: {', '.join(allowed)}"
+                )
+
+    def _ranged_download(
+        self, bucket, key, filename, object_size, extra_args, callback
+    ):
+        downloader = MultipartDownloader(
+            self._client, self._config, self._osutil
+        )
+        downloader.download_file(
+            bucket, key, filename, object_size, extra_args, callback
+        )
+
+    def _get_object(self, bucket, key, filename, extra_args, callback):
+        # precondition: num_download_attempts > 0
+        max_attempts = self._config.num_download_attempts
+        last_exception = None
+        for i in range(max_attempts):
+            try:
+                return self._do_get_object(
+                    bucket, key, filename, extra_args, callback
+                )
+            except (
+                socket.timeout,
+                OSError,
+                ReadTimeoutError,
+                IncompleteReadError,
+                ResponseStreamingError,
+            ) as e:
+                # TODO: we need a way to reset the callback if the
+                # download failed.
+                logger.debug(
+                    "Retrying exception caught (%s), "
+                    "retrying request, (attempt %s / %s)",
+                    e,
+                    i,
+                    max_attempts,
+                    exc_info=True,
+                )
+                last_exception = e
+                continue
+        raise RetriesExceededError(last_exception)
+
+    def _do_get_object(self, bucket, key, filename, extra_args, callback):
+        response = self._client.get_object(
+            Bucket=bucket, Key=key, **extra_args
+        )
+        streaming_body = StreamReaderProgress(response['Body'], callback)
+        with self._osutil.open(filename, 'wb') as f:
+            for chunk in iter(lambda: streaming_body.read(8192), b''):
+                f.write(chunk)
+
+    def _object_size(self, bucket, key, extra_args):
+        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
+            'ContentLength'
+        ]
+
+    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
+        uploader = MultipartUploader(self._client, self._config, self._osutil)
+        uploader.upload_file(filename, bucket, key, callback, extra_args)