about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/s3transfer/processpool.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer/processpool.py')
-rw-r--r--.venv/lib/python3.12/site-packages/s3transfer/processpool.py1009
1 files changed, 1009 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/processpool.py b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py
new file mode 100644
index 00000000..318f1e7f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/processpool.py
@@ -0,0 +1,1009 @@
+# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+"""Speeds up S3 throughput by using processes
+
+Getting Started
+===============
+
+The :class:`ProcessPoolDownloader` can be used to download a single file by
+calling :meth:`ProcessPoolDownloader.download_file`:
+
+.. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     with ProcessPoolDownloader() as downloader:
+          downloader.download_file('mybucket', 'mykey', 'myfile')
+
+
+This snippet downloads the S3 object located in the bucket ``mybucket`` at the
+key ``mykey`` to the local file ``myfile``. Any errors encountered during the
+transfer are not propagated. To determine if a transfer succeeded or
+failed, use the `Futures`_ interface.
+
+
+The :class:`ProcessPoolDownloader` can be used to download multiple files as
+well:
+
+.. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     with ProcessPoolDownloader() as downloader:
+          downloader.download_file('mybucket', 'mykey', 'myfile')
+          downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
+
+
+When running this snippet, the downloading of ``mykey`` and ``myotherkey``
+happen in parallel. The first ``download_file`` call does not block the
+second ``download_file`` call. The snippet blocks when exiting
+the context manager and blocks until both downloads are complete.
+
+Alternatively, the ``ProcessPoolDownloader`` can be instantiated
+and explicitly be shutdown using :meth:`ProcessPoolDownloader.shutdown`:
+
+.. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     downloader = ProcessPoolDownloader()
+     downloader.download_file('mybucket', 'mykey', 'myfile')
+     downloader.download_file('mybucket', 'myotherkey', 'myotherfile')
+     downloader.shutdown()
+
+
+For this code snippet, the call to ``shutdown`` blocks until both
+downloads are complete.
+
+
+Additional Parameters
+=====================
+
+Additional parameters can be provided to the ``download_file`` method:
+
+* ``extra_args``: A dictionary containing any additional client arguments
+  to include in the
+  `GetObject <https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.get_object>`_
+  API request. For example:
+
+  .. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     with ProcessPoolDownloader() as downloader:
+          downloader.download_file(
+               'mybucket', 'mykey', 'myfile',
+               extra_args={'VersionId': 'myversion'})
+
+
+* ``expected_size``: By default, the downloader will make a HeadObject
+  call to determine the size of the object. To opt-out of this additional
+  API call, you can provide the size of the object in bytes:
+
+  .. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     MB = 1024 * 1024
+     with ProcessPoolDownloader() as downloader:
+          downloader.download_file(
+               'mybucket', 'mykey', 'myfile', expected_size=2 * MB)
+
+
+Futures
+=======
+
+When ``download_file`` is called, it immediately returns a
+:class:`ProcessPoolTransferFuture`. The future can be used to poll the state
+of a particular transfer. To get the result of the download,
+call :meth:`ProcessPoolTransferFuture.result`. The method blocks
+until the transfer completes, whether it succeeds or fails. For example:
+
+.. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+
+     with ProcessPoolDownloader() as downloader:
+          future = downloader.download_file('mybucket', 'mykey', 'myfile')
+          print(future.result())
+
+
+If the download succeeds, the future returns ``None``:
+
+.. code:: python
+
+     None
+
+
+If the download fails, the exception causing the failure is raised. For
+example, if ``mykey`` did not exist, the following error would be raised
+
+
+.. code:: python
+
+     botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
+
+
+.. note::
+
+    :meth:`ProcessPoolTransferFuture.result` can only be called while the
+    ``ProcessPoolDownloader`` is running (e.g. before calling ``shutdown`` or
+    inside the context manager).
+
+
+Process Pool Configuration
+==========================
+
+By default, the downloader has the following configuration options:
+
+* ``multipart_threshold``: The threshold size for performing ranged downloads
+  in bytes. By default, ranged downloads happen for S3 objects that are
+  greater than or equal to 8 MB in size.
+
+* ``multipart_chunksize``: The size of each ranged download in bytes. By
+  default, the size of each ranged download is 8 MB.
+
+* ``max_request_processes``: The maximum number of processes used to download
+  S3 objects. By default, the maximum is 10 processes.
+
+
+To change the default configuration, use the :class:`ProcessTransferConfig`:
+
+.. code:: python
+
+     from s3transfer.processpool import ProcessPoolDownloader
+     from s3transfer.processpool import ProcessTransferConfig
+
+     config = ProcessTransferConfig(
+          multipart_threshold=64 * 1024 * 1024,  # 64 MB
+          max_request_processes=50
+     )
+     downloader = ProcessPoolDownloader(config=config)
+
+
+Client Configuration
+====================
+
+The process pool downloader creates ``botocore`` clients on your behalf. In
+order to affect how the client is created, pass the keyword arguments
+that would have been used in the :meth:`botocore.Session.create_client` call:
+
+.. code:: python
+
+
+     from s3transfer.processpool import ProcessPoolDownloader
+     from s3transfer.processpool import ProcessTransferConfig
+
+     downloader = ProcessPoolDownloader(
+          client_kwargs={'region_name': 'us-west-2'})
+
+
+This snippet ensures that all clients created by the ``ProcessPoolDownloader``
+are using ``us-west-2`` as their region.
+
+"""
+
+import collections
+import contextlib
+import logging
+import multiprocessing
+import signal
+import threading
+from copy import deepcopy
+
+import botocore.session
+from botocore.config import Config
+
+from s3transfer.compat import MAXINT, BaseManager
+from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, MB, PROCESS_USER_AGENT
+from s3transfer.exceptions import CancelledError, RetriesExceededError
+from s3transfer.futures import BaseTransferFuture, BaseTransferMeta
+from s3transfer.utils import (
+    S3_RETRYABLE_DOWNLOAD_ERRORS,
+    CallArgs,
+    OSUtils,
+    calculate_num_parts,
+    calculate_range_parameter,
+)
+
+logger = logging.getLogger(__name__)
+
+SHUTDOWN_SIGNAL = 'SHUTDOWN'
+
+# The DownloadFileRequest tuple is submitted from the ProcessPoolDownloader
+# to the GetObjectSubmitter in order for the submitter to begin submitting
+# GetObjectJobs to the GetObjectWorkers.
+DownloadFileRequest = collections.namedtuple(
+    'DownloadFileRequest',
+    [
+        'transfer_id',  # The unique id for the transfer
+        'bucket',  # The bucket to download the object from
+        'key',  # The key to download the object from
+        'filename',  # The user-requested download location
+        'extra_args',  # Extra arguments to provide to client calls
+        'expected_size',  # The user-provided expected size of the download
+    ],
+)
+
+# The GetObjectJob tuple is submitted from the GetObjectSubmitter
+# to the GetObjectWorkers to download the file or parts of the file.
+GetObjectJob = collections.namedtuple(
+    'GetObjectJob',
+    [
+        'transfer_id',  # The unique id for the transfer
+        'bucket',  # The bucket to download the object from
+        'key',  # The key to download the object from
+        'temp_filename',  # The temporary file to write the content to via
+        # completed GetObject calls.
+        'extra_args',  # Extra arguments to provide to the GetObject call
+        'offset',  # The offset to write the content for the temp file.
+        'filename',  # The user-requested download location. The worker
+        # of final GetObjectJob will move the file located at
+        # temp_filename to the location of filename.
+    ],
+)
+
+
+@contextlib.contextmanager
+def ignore_ctrl_c():
+    original_handler = _add_ignore_handler_for_interrupts()
+    yield
+    signal.signal(signal.SIGINT, original_handler)
+
+
+def _add_ignore_handler_for_interrupts():
+    # Windows is unable to pickle signal.signal directly so it needs to
+    # be wrapped in a function defined at the module level
+    return signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+
+class ProcessTransferConfig:
+    def __init__(
+        self,
+        multipart_threshold=8 * MB,
+        multipart_chunksize=8 * MB,
+        max_request_processes=10,
+    ):
+        """Configuration for the ProcessPoolDownloader
+
+        :param multipart_threshold: The threshold for which ranged downloads
+            occur.
+
+        :param multipart_chunksize: The chunk size of each ranged download.
+
+        :param max_request_processes: The maximum number of processes that
+            will be making S3 API transfer-related requests at a time.
+        """
+        self.multipart_threshold = multipart_threshold
+        self.multipart_chunksize = multipart_chunksize
+        self.max_request_processes = max_request_processes
+
+
+class ProcessPoolDownloader:
+    def __init__(self, client_kwargs=None, config=None):
+        """Downloads S3 objects using process pools
+
+        :type client_kwargs: dict
+        :param client_kwargs: The keyword arguments to provide when
+            instantiating S3 clients. The arguments must match the keyword
+            arguments provided to the
+            `botocore.session.Session.create_client()` method.
+
+        :type config: ProcessTransferConfig
+        :param config: Configuration for the downloader
+        """
+        if client_kwargs is None:
+            client_kwargs = {}
+        self._client_factory = ClientFactory(client_kwargs)
+
+        self._transfer_config = config
+        if config is None:
+            self._transfer_config = ProcessTransferConfig()
+
+        self._download_request_queue = multiprocessing.Queue(1000)
+        self._worker_queue = multiprocessing.Queue(1000)
+        self._osutil = OSUtils()
+
+        self._started = False
+        self._start_lock = threading.Lock()
+
+        # These below are initialized in the start() method
+        self._manager = None
+        self._transfer_monitor = None
+        self._submitter = None
+        self._workers = []
+
+    def download_file(
+        self, bucket, key, filename, extra_args=None, expected_size=None
+    ):
+        """Downloads the object's contents to a file
+
+        :type bucket: str
+        :param bucket: The name of the bucket to download from
+
+        :type key: str
+        :param key: The name of the key to download from
+
+        :type filename: str
+        :param filename: The name of a file to download to.
+
+        :type extra_args: dict
+        :param extra_args: Extra arguments that may be passed to the
+            client operation
+
+        :type expected_size: int
+        :param expected_size: The expected size in bytes of the download. If
+            provided, the downloader will not call HeadObject to determine the
+            object's size and use the provided value instead. The size is
+            needed to determine whether to do a multipart download.
+
+        :rtype: s3transfer.futures.TransferFuture
+        :returns: Transfer future representing the download
+        """
+        self._start_if_needed()
+        if extra_args is None:
+            extra_args = {}
+        self._validate_all_known_args(extra_args)
+        transfer_id = self._transfer_monitor.notify_new_transfer()
+        download_file_request = DownloadFileRequest(
+            transfer_id=transfer_id,
+            bucket=bucket,
+            key=key,
+            filename=filename,
+            extra_args=extra_args,
+            expected_size=expected_size,
+        )
+        logger.debug(
+            'Submitting download file request: %s.', download_file_request
+        )
+        self._download_request_queue.put(download_file_request)
+        call_args = CallArgs(
+            bucket=bucket,
+            key=key,
+            filename=filename,
+            extra_args=extra_args,
+            expected_size=expected_size,
+        )
+        future = self._get_transfer_future(transfer_id, call_args)
+        return future
+
+    def shutdown(self):
+        """Shutdown the downloader
+
+        It will wait till all downloads are complete before returning.
+        """
+        self._shutdown_if_needed()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, *args):
+        if isinstance(exc_value, KeyboardInterrupt):
+            if self._transfer_monitor is not None:
+                self._transfer_monitor.notify_cancel_all_in_progress()
+        self.shutdown()
+
+    def _start_if_needed(self):
+        with self._start_lock:
+            if not self._started:
+                self._start()
+
+    def _start(self):
+        self._start_transfer_monitor_manager()
+        self._start_submitter()
+        self._start_get_object_workers()
+        self._started = True
+
+    def _validate_all_known_args(self, provided):
+        for kwarg in provided:
+            if kwarg not in ALLOWED_DOWNLOAD_ARGS:
+                download_args = ', '.join(ALLOWED_DOWNLOAD_ARGS)
+                raise ValueError(
+                    f"Invalid extra_args key '{kwarg}', "
+                    f"must be one of: {download_args}"
+                )
+
+    def _get_transfer_future(self, transfer_id, call_args):
+        meta = ProcessPoolTransferMeta(
+            call_args=call_args, transfer_id=transfer_id
+        )
+        future = ProcessPoolTransferFuture(
+            monitor=self._transfer_monitor, meta=meta
+        )
+        return future
+
+    def _start_transfer_monitor_manager(self):
+        logger.debug('Starting the TransferMonitorManager.')
+        self._manager = TransferMonitorManager()
+        # We do not want Ctrl-C's to cause the manager to shutdown immediately
+        # as worker processes will still need to communicate with it when they
+        # are shutting down. So instead we ignore Ctrl-C and let the manager
+        # be explicitly shutdown when shutting down the downloader.
+        self._manager.start(_add_ignore_handler_for_interrupts)
+        self._transfer_monitor = self._manager.TransferMonitor()
+
+    def _start_submitter(self):
+        logger.debug('Starting the GetObjectSubmitter.')
+        self._submitter = GetObjectSubmitter(
+            transfer_config=self._transfer_config,
+            client_factory=self._client_factory,
+            transfer_monitor=self._transfer_monitor,
+            osutil=self._osutil,
+            download_request_queue=self._download_request_queue,
+            worker_queue=self._worker_queue,
+        )
+        self._submitter.start()
+
+    def _start_get_object_workers(self):
+        logger.debug(
+            'Starting %s GetObjectWorkers.',
+            self._transfer_config.max_request_processes,
+        )
+        for _ in range(self._transfer_config.max_request_processes):
+            worker = GetObjectWorker(
+                queue=self._worker_queue,
+                client_factory=self._client_factory,
+                transfer_monitor=self._transfer_monitor,
+                osutil=self._osutil,
+            )
+            worker.start()
+            self._workers.append(worker)
+
+    def _shutdown_if_needed(self):
+        with self._start_lock:
+            if self._started:
+                self._shutdown()
+
+    def _shutdown(self):
+        self._shutdown_submitter()
+        self._shutdown_get_object_workers()
+        self._shutdown_transfer_monitor_manager()
+        self._started = False
+
+    def _shutdown_transfer_monitor_manager(self):
+        logger.debug('Shutting down the TransferMonitorManager.')
+        self._manager.shutdown()
+
+    def _shutdown_submitter(self):
+        logger.debug('Shutting down the GetObjectSubmitter.')
+        self._download_request_queue.put(SHUTDOWN_SIGNAL)
+        self._submitter.join()
+
+    def _shutdown_get_object_workers(self):
+        logger.debug('Shutting down the GetObjectWorkers.')
+        for _ in self._workers:
+            self._worker_queue.put(SHUTDOWN_SIGNAL)
+        for worker in self._workers:
+            worker.join()
+
+
+class ProcessPoolTransferFuture(BaseTransferFuture):
+    def __init__(self, monitor, meta):
+        """The future associated to a submitted process pool transfer request
+
+        :type monitor: TransferMonitor
+        :param monitor: The monitor associated to the process pool downloader
+
+        :type meta: ProcessPoolTransferMeta
+        :param meta: The metadata associated to the request. This object
+            is visible to the requester.
+        """
+        self._monitor = monitor
+        self._meta = meta
+
+    @property
+    def meta(self):
+        return self._meta
+
+    def done(self):
+        return self._monitor.is_done(self._meta.transfer_id)
+
+    def result(self):
+        try:
+            return self._monitor.poll_for_result(self._meta.transfer_id)
+        except KeyboardInterrupt:
+            # For the multiprocessing Manager, a thread is given a single
+            # connection to reuse in communicating between the thread in the
+            # main process and the Manager's process. If a Ctrl-C happens when
+            # polling for the result, it will make the main thread stop trying
+            # to receive from the connection, but the Manager process will not
+            # know that the main process has stopped trying to receive and
+            # will not close the connection. As a result if another message is
+            # sent to the Manager process, the listener in the Manager
+            # processes will not process the new message as it is still trying
+            # trying to process the previous message (that was Ctrl-C'd) and
+            # thus cause the thread in the main process to hang on its send.
+            # The only way around this is to create a new connection and send
+            # messages from that new connection instead.
+            self._monitor._connect()
+            self.cancel()
+            raise
+
+    def cancel(self):
+        self._monitor.notify_exception(
+            self._meta.transfer_id, CancelledError()
+        )
+
+
+class ProcessPoolTransferMeta(BaseTransferMeta):
+    """Holds metadata about the ProcessPoolTransferFuture"""
+
+    def __init__(self, transfer_id, call_args):
+        self._transfer_id = transfer_id
+        self._call_args = call_args
+        self._user_context = {}
+
+    @property
+    def call_args(self):
+        return self._call_args
+
+    @property
+    def transfer_id(self):
+        return self._transfer_id
+
+    @property
+    def user_context(self):
+        return self._user_context
+
+
+class ClientFactory:
+    def __init__(self, client_kwargs=None):
+        """Creates S3 clients for processes
+
+        Botocore sessions and clients are not pickleable so they cannot be
+        inherited across Process boundaries. Instead, they must be instantiated
+        once a process is running.
+        """
+        self._client_kwargs = client_kwargs
+        if self._client_kwargs is None:
+            self._client_kwargs = {}
+
+        client_config = deepcopy(self._client_kwargs.get('config', Config()))
+        if not client_config.user_agent_extra:
+            client_config.user_agent_extra = PROCESS_USER_AGENT
+        else:
+            client_config.user_agent_extra += " " + PROCESS_USER_AGENT
+        self._client_kwargs['config'] = client_config
+
+    def create_client(self):
+        """Create a botocore S3 client"""
+        return botocore.session.Session().create_client(
+            's3', **self._client_kwargs
+        )
+
+
+class TransferMonitor:
+    def __init__(self):
+        """Monitors transfers for cross-process communication
+
+        Notifications can be sent to the monitor and information can be
+        retrieved from the monitor for a particular transfer. This abstraction
+        is ran in a ``multiprocessing.managers.BaseManager`` in order to be
+        shared across processes.
+        """
+        # TODO: Add logic that removes the TransferState if the transfer is
+        #  marked as done and the reference to the future is no longer being
+        #  held onto. Without this logic, this dictionary will continue to
+        #  grow in size with no limit.
+        self._transfer_states = {}
+        self._id_count = 0
+        self._init_lock = threading.Lock()
+
+    def notify_new_transfer(self):
+        with self._init_lock:
+            transfer_id = self._id_count
+            self._transfer_states[transfer_id] = TransferState()
+            self._id_count += 1
+            return transfer_id
+
+    def is_done(self, transfer_id):
+        """Determine a particular transfer is complete
+
+        :param transfer_id: Unique identifier for the transfer
+        :return: True, if done. False, otherwise.
+        """
+        return self._transfer_states[transfer_id].done
+
+    def notify_done(self, transfer_id):
+        """Notify a particular transfer is complete
+
+        :param transfer_id: Unique identifier for the transfer
+        """
+        self._transfer_states[transfer_id].set_done()
+
+    def poll_for_result(self, transfer_id):
+        """Poll for the result of a transfer
+
+        :param transfer_id: Unique identifier for the transfer
+        :return: If the transfer succeeded, it will return the result. If the
+            transfer failed, it will raise the exception associated to the
+            failure.
+        """
+        self._transfer_states[transfer_id].wait_till_done()
+        exception = self._transfer_states[transfer_id].exception
+        if exception:
+            raise exception
+        return None
+
+    def notify_exception(self, transfer_id, exception):
+        """Notify an exception was encountered for a transfer
+
+        :param transfer_id: Unique identifier for the transfer
+        :param exception: The exception encountered for that transfer
+        """
+        # TODO: Not all exceptions are pickleable so if we are running
+        # this in a multiprocessing.BaseManager we will want to
+        # make sure to update this signature to ensure pickleability of the
+        # arguments or have the ProxyObject do the serialization.
+        self._transfer_states[transfer_id].exception = exception
+
+    def notify_cancel_all_in_progress(self):
+        for transfer_state in self._transfer_states.values():
+            if not transfer_state.done:
+                transfer_state.exception = CancelledError()
+
+    def get_exception(self, transfer_id):
+        """Retrieve the exception encountered for the transfer
+
+        :param transfer_id: Unique identifier for the transfer
+        :return: The exception encountered for that transfer. Otherwise
+            if there were no exceptions, returns None.
+        """
+        return self._transfer_states[transfer_id].exception
+
+    def notify_expected_jobs_to_complete(self, transfer_id, num_jobs):
+        """Notify the amount of jobs expected for a transfer
+
+        :param transfer_id: Unique identifier for the transfer
+        :param num_jobs: The number of jobs to complete the transfer
+        """
+        self._transfer_states[transfer_id].jobs_to_complete = num_jobs
+
+    def notify_job_complete(self, transfer_id):
+        """Notify that a single job is completed for a transfer
+
+        :param transfer_id: Unique identifier for the transfer
+        :return: The number of jobs remaining to complete the transfer
+        """
+        return self._transfer_states[transfer_id].decrement_jobs_to_complete()
+
+
+class TransferState:
+    """Represents the current state of an individual transfer"""
+
+    # NOTE: Ideally the TransferState object would be used directly by the
+    # various different abstractions in the ProcessPoolDownloader and remove
+    # the need for the TransferMonitor. However, it would then impose the
+    # constraint that two hops are required to make or get any changes in the
+    # state of a transfer across processes: one hop to get a proxy object for
+    # the TransferState and then a second hop to communicate calling the
+    # specific TransferState method.
+    def __init__(self):
+        self._exception = None
+        self._done_event = threading.Event()
+        self._job_lock = threading.Lock()
+        self._jobs_to_complete = 0
+
+    @property
+    def done(self):
+        return self._done_event.is_set()
+
+    def set_done(self):
+        self._done_event.set()
+
+    def wait_till_done(self):
+        self._done_event.wait(MAXINT)
+
+    @property
+    def exception(self):
+        return self._exception
+
+    @exception.setter
+    def exception(self, val):
+        self._exception = val
+
+    @property
+    def jobs_to_complete(self):
+        return self._jobs_to_complete
+
+    @jobs_to_complete.setter
+    def jobs_to_complete(self, val):
+        self._jobs_to_complete = val
+
+    def decrement_jobs_to_complete(self):
+        with self._job_lock:
+            self._jobs_to_complete -= 1
+            return self._jobs_to_complete
+
+
+class TransferMonitorManager(BaseManager):
+    pass
+
+
+TransferMonitorManager.register('TransferMonitor', TransferMonitor)
+
+
+class BaseS3TransferProcess(multiprocessing.Process):
+    def __init__(self, client_factory):
+        super().__init__()
+        self._client_factory = client_factory
+        self._client = None
+
+    def run(self):
+        # Clients are not pickleable so their instantiation cannot happen
+        # in the __init__ for processes that are created under the
+        # spawn method.
+        self._client = self._client_factory.create_client()
+        with ignore_ctrl_c():
+            # By default these processes are ran as child processes to the
+            # main process. Any Ctrl-c encountered in the main process is
+            # propagated to the child process and interrupt it at any time.
+            # To avoid any potentially bad states caused from an interrupt
+            # (i.e. a transfer failing to notify its done or making the
+            # communication protocol become out of sync with the
+            # TransferMonitor), we ignore all Ctrl-C's and allow the main
+            # process to notify these child processes when to stop processing
+            # jobs.
+            self._do_run()
+
+    def _do_run(self):
+        raise NotImplementedError('_do_run()')
+
+
+class GetObjectSubmitter(BaseS3TransferProcess):
+    def __init__(
+        self,
+        transfer_config,
+        client_factory,
+        transfer_monitor,
+        osutil,
+        download_request_queue,
+        worker_queue,
+    ):
+        """Submit GetObjectJobs to fulfill a download file request
+
+        :param transfer_config: Configuration for transfers.
+        :param client_factory: ClientFactory for creating S3 clients.
+        :param transfer_monitor: Monitor for notifying and retrieving state
+            of transfer.
+        :param osutil: OSUtils object to use for os-related behavior when
+            performing the transfer.
+        :param download_request_queue: Queue to retrieve download file
+            requests.
+        :param worker_queue: Queue to submit GetObjectJobs for workers
+            to perform.
+        """
+        super().__init__(client_factory)
+        self._transfer_config = transfer_config
+        self._transfer_monitor = transfer_monitor
+        self._osutil = osutil
+        self._download_request_queue = download_request_queue
+        self._worker_queue = worker_queue
+
+    def _do_run(self):
+        while True:
+            download_file_request = self._download_request_queue.get()
+            if download_file_request == SHUTDOWN_SIGNAL:
+                logger.debug('Submitter shutdown signal received.')
+                return
+            try:
+                self._submit_get_object_jobs(download_file_request)
+            except Exception as e:
+                logger.debug(
+                    'Exception caught when submitting jobs for '
+                    'download file request %s: %s',
+                    download_file_request,
+                    e,
+                    exc_info=True,
+                )
+                self._transfer_monitor.notify_exception(
+                    download_file_request.transfer_id, e
+                )
+                self._transfer_monitor.notify_done(
+                    download_file_request.transfer_id
+                )
+
+    def _submit_get_object_jobs(self, download_file_request):
+        size = self._get_size(download_file_request)
+        temp_filename = self._allocate_temp_file(download_file_request, size)
+        if size < self._transfer_config.multipart_threshold:
+            self._submit_single_get_object_job(
+                download_file_request, temp_filename
+            )
+        else:
+            self._submit_ranged_get_object_jobs(
+                download_file_request, temp_filename, size
+            )
+
+    def _get_size(self, download_file_request):
+        expected_size = download_file_request.expected_size
+        if expected_size is None:
+            expected_size = self._client.head_object(
+                Bucket=download_file_request.bucket,
+                Key=download_file_request.key,
+                **download_file_request.extra_args,
+            )['ContentLength']
+        return expected_size
+
+    def _allocate_temp_file(self, download_file_request, size):
+        temp_filename = self._osutil.get_temp_filename(
+            download_file_request.filename
+        )
+        self._osutil.allocate(temp_filename, size)
+        return temp_filename
+
+    def _submit_single_get_object_job(
+        self, download_file_request, temp_filename
+    ):
+        self._notify_jobs_to_complete(download_file_request.transfer_id, 1)
+        self._submit_get_object_job(
+            transfer_id=download_file_request.transfer_id,
+            bucket=download_file_request.bucket,
+            key=download_file_request.key,
+            temp_filename=temp_filename,
+            offset=0,
+            extra_args=download_file_request.extra_args,
+            filename=download_file_request.filename,
+        )
+
+    def _submit_ranged_get_object_jobs(
+        self, download_file_request, temp_filename, size
+    ):
+        part_size = self._transfer_config.multipart_chunksize
+        num_parts = calculate_num_parts(size, part_size)
+        self._notify_jobs_to_complete(
+            download_file_request.transfer_id, num_parts
+        )
+        for i in range(num_parts):
+            offset = i * part_size
+            range_parameter = calculate_range_parameter(
+                part_size, i, num_parts
+            )
+            get_object_kwargs = {'Range': range_parameter}
+            get_object_kwargs.update(download_file_request.extra_args)
+            self._submit_get_object_job(
+                transfer_id=download_file_request.transfer_id,
+                bucket=download_file_request.bucket,
+                key=download_file_request.key,
+                temp_filename=temp_filename,
+                offset=offset,
+                extra_args=get_object_kwargs,
+                filename=download_file_request.filename,
+            )
+
+    def _submit_get_object_job(self, **get_object_job_kwargs):
+        self._worker_queue.put(GetObjectJob(**get_object_job_kwargs))
+
+    def _notify_jobs_to_complete(self, transfer_id, jobs_to_complete):
+        logger.debug(
+            'Notifying %s job(s) to complete for transfer_id %s.',
+            jobs_to_complete,
+            transfer_id,
+        )
+        self._transfer_monitor.notify_expected_jobs_to_complete(
+            transfer_id, jobs_to_complete
+        )
+
+
+class GetObjectWorker(BaseS3TransferProcess):
+    # TODO: It may make sense to expose these class variables as configuration
+    # options if users want to tweak them.
+    _MAX_ATTEMPTS = 5
+    _IO_CHUNKSIZE = 2 * MB
+
+    def __init__(self, queue, client_factory, transfer_monitor, osutil):
+        """Fulfills GetObjectJobs
+
+        Downloads the S3 object, writes it to the specified file, and
+        renames the file to its final location if it completes the final
+        job for a particular transfer.
+
+        :param queue: Queue for retrieving GetObjectJob's
+        :param client_factory: ClientFactory for creating S3 clients
+        :param transfer_monitor: Monitor for notifying
+        :param osutil: OSUtils object to use for os-related behavior when
+            performing the transfer.
+        """
+        super().__init__(client_factory)
+        self._queue = queue
+        self._client_factory = client_factory
+        self._transfer_monitor = transfer_monitor
+        self._osutil = osutil
+
+    def _do_run(self):
+        while True:
+            job = self._queue.get()
+            if job == SHUTDOWN_SIGNAL:
+                logger.debug('Worker shutdown signal received.')
+                return
+            if not self._transfer_monitor.get_exception(job.transfer_id):
+                self._run_get_object_job(job)
+            else:
+                logger.debug(
+                    'Skipping get object job %s because there was a previous '
+                    'exception.',
+                    job,
+                )
+            remaining = self._transfer_monitor.notify_job_complete(
+                job.transfer_id
+            )
+            logger.debug(
+                '%s jobs remaining for transfer_id %s.',
+                remaining,
+                job.transfer_id,
+            )
+            if not remaining:
+                self._finalize_download(
+                    job.transfer_id, job.temp_filename, job.filename
+                )
+
+    def _run_get_object_job(self, job):
+        try:
+            self._do_get_object(
+                bucket=job.bucket,
+                key=job.key,
+                temp_filename=job.temp_filename,
+                extra_args=job.extra_args,
+                offset=job.offset,
+            )
+        except Exception as e:
+            logger.debug(
+                'Exception caught when downloading object for '
+                'get object job %s: %s',
+                job,
+                e,
+                exc_info=True,
+            )
+            self._transfer_monitor.notify_exception(job.transfer_id, e)
+
+    def _do_get_object(self, bucket, key, extra_args, temp_filename, offset):
+        last_exception = None
+        for i in range(self._MAX_ATTEMPTS):
+            try:
+                response = self._client.get_object(
+                    Bucket=bucket, Key=key, **extra_args
+                )
+                self._write_to_file(temp_filename, offset, response['Body'])
+                return
+            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
+                logger.debug(
+                    'Retrying exception caught (%s), '
+                    'retrying request, (attempt %s / %s)',
+                    e,
+                    i + 1,
+                    self._MAX_ATTEMPTS,
+                    exc_info=True,
+                )
+                last_exception = e
+        raise RetriesExceededError(last_exception)
+
+    def _write_to_file(self, filename, offset, body):
+        with open(filename, 'rb+') as f:
+            f.seek(offset)
+            chunks = iter(lambda: body.read(self._IO_CHUNKSIZE), b'')
+            for chunk in chunks:
+                f.write(chunk)
+
+    def _finalize_download(self, transfer_id, temp_filename, filename):
+        if self._transfer_monitor.get_exception(transfer_id):
+            self._osutil.remove_file(temp_filename)
+        else:
+            self._do_file_rename(transfer_id, temp_filename, filename)
+        self._transfer_monitor.notify_done(transfer_id)
+
+    def _do_file_rename(self, transfer_id, temp_filename, filename):
+        try:
+            self._osutil.rename_file(temp_filename, filename)
+        except Exception as e:
+            self._transfer_monitor.notify_exception(transfer_id, e)
+            self._osutil.remove_file(temp_filename)