about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/s3transfer/tasks.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/s3transfer/tasks.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer/tasks.py')
-rw-r--r--.venv/lib/python3.12/site-packages/s3transfer/tasks.py390
1 files changed, 390 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/tasks.py b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py
new file mode 100644
index 00000000..4ed3b88c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/tasks.py
@@ -0,0 +1,390 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import logging
+
+from s3transfer.utils import get_callbacks
+
+try:
+    from botocore.context import start_as_current_context
+except ImportError:
+    from contextlib import nullcontext as start_as_current_context
+
+
+logger = logging.getLogger(__name__)
+
+
+class Task:
+    """A task associated to a TransferFuture request
+
+    This is a base class for other classes to subclass from. All subclassed
+    classes must implement the main() method.
+    """
+
+    def __init__(
+        self,
+        transfer_coordinator,
+        main_kwargs=None,
+        pending_main_kwargs=None,
+        done_callbacks=None,
+        is_final=False,
+    ):
+        """
+        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+        :param transfer_coordinator: The context associated to the
+            TransferFuture for which this Task is associated with.
+
+        :type main_kwargs: dict
+        :param main_kwargs: The keyword args that can be immediately supplied
+            to the _main() method of the task
+
+        :type pending_main_kwargs: dict
+        :param pending_main_kwargs: The keyword args that are depended upon
+            by the result from a dependent future(s). The result returned by
+            the future(s) will be used as the value for the keyword argument
+            when _main() is called. The values for each key can be:
+                * a single future - Once completed, its value will be the
+                  result of that single future
+                * a list of futures - Once all of the futures complete, the
+                  value used will be a list of each completed future result
+                  value in order of when they were originally supplied.
+
+        :type done_callbacks: list of callbacks
+        :param done_callbacks: A list of callbacks to call once the task is
+            done completing. Each callback will be called with no arguments
+            and will be called no matter if the task succeeds or an exception
+            is raised.
+
+        :type is_final: boolean
+        :param is_final: True, to indicate that this task is the final task
+            for the TransferFuture request. By setting this value to True, it
+            will set the result of the entire TransferFuture to the result
+            returned by this task's main() method.
+        """
+        self._transfer_coordinator = transfer_coordinator
+
+        self._main_kwargs = main_kwargs
+        if self._main_kwargs is None:
+            self._main_kwargs = {}
+
+        self._pending_main_kwargs = pending_main_kwargs
+        if pending_main_kwargs is None:
+            self._pending_main_kwargs = {}
+
+        self._done_callbacks = done_callbacks
+        if self._done_callbacks is None:
+            self._done_callbacks = []
+
+        self._is_final = is_final
+
+    def __repr__(self):
+        # These are the general main_kwarg parameters that we want to
+        # display in the repr.
+        params_to_display = [
+            'bucket',
+            'key',
+            'part_number',
+            'final_filename',
+            'transfer_future',
+            'offset',
+            'extra_args',
+        ]
+        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
+            self._main_kwargs, params_to_display
+        )
+        return f'{self.__class__.__name__}(transfer_id={self._transfer_coordinator.transfer_id}, {main_kwargs_to_display})'
+
+    @property
+    def transfer_id(self):
+        """The id for the transfer request that the task belongs to"""
+        return self._transfer_coordinator.transfer_id
+
+    def _get_kwargs_with_params_to_include(self, kwargs, include):
+        filtered_kwargs = {}
+        for param in include:
+            if param in kwargs:
+                filtered_kwargs[param] = kwargs[param]
+        return filtered_kwargs
+
+    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
+        filtered_kwargs = {}
+        for param, value in kwargs.items():
+            if param in exclude:
+                continue
+            filtered_kwargs[param] = value
+        return filtered_kwargs
+
+    def __call__(self, ctx=None):
+        """The callable to use when submitting a Task to an executor"""
+        with start_as_current_context(ctx):
+            try:
+                # Wait for all of futures this task depends on.
+                self._wait_on_dependent_futures()
+                # Gather up all of the main keyword arguments for main().
+                # This includes the immediately provided main_kwargs and
+                # the values for pending_main_kwargs that source from the return
+                # values from the task's dependent futures.
+                kwargs = self._get_all_main_kwargs()
+                # If the task is not done (really only if some other related
+                # task to the TransferFuture had failed) then execute the task's
+                # main() method.
+                if not self._transfer_coordinator.done():
+                    return self._execute_main(kwargs)
+            except Exception as e:
+                self._log_and_set_exception(e)
+            finally:
+                # Run any done callbacks associated to the task no matter what.
+                for done_callback in self._done_callbacks:
+                    done_callback()
+
+                if self._is_final:
+                    # If this is the final task announce that it is done if results
+                    # are waiting on its completion.
+                    self._transfer_coordinator.announce_done()
+
+    def _execute_main(self, kwargs):
+        # Do not display keyword args that should not be printed, especially
+        # if they are going to make the logs hard to follow.
+        params_to_exclude = ['data']
+        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
+            kwargs, params_to_exclude
+        )
+        # Log what is about to be executed.
+        logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}")
+
+        return_value = self._main(**kwargs)
+        # If the task is the final task, then set the TransferFuture's
+        # value to the return value from main().
+        if self._is_final:
+            self._transfer_coordinator.set_result(return_value)
+        return return_value
+
+    def _log_and_set_exception(self, exception):
+        # If an exception is ever thrown than set the exception for the
+        # entire TransferFuture.
+        logger.debug("Exception raised.", exc_info=True)
+        self._transfer_coordinator.set_exception(exception)
+
+    def _main(self, **kwargs):
+        """The method that will be ran in the executor
+
+        This method must be implemented by subclasses from Task. main() can
+        be implemented with any arguments decided upon by the subclass.
+        """
+        raise NotImplementedError('_main() must be implemented')
+
+    def _wait_on_dependent_futures(self):
+        # Gather all of the futures into that main() depends on.
+        futures_to_wait_on = []
+        for _, future in self._pending_main_kwargs.items():
+            # If the pending main keyword arg is a list then extend the list.
+            if isinstance(future, list):
+                futures_to_wait_on.extend(future)
+            # If the pending main keyword arg is a future append it to the list.
+            else:
+                futures_to_wait_on.append(future)
+        # Now wait for all of the futures to complete.
+        self._wait_until_all_complete(futures_to_wait_on)
+
+    def _wait_until_all_complete(self, futures):
+        # This is a basic implementation of the concurrent.futures.wait()
+        #
+        # concurrent.futures.wait() is not used instead because of this
+        # reported issue: https://bugs.python.org/issue20319.
+        # The issue would occasionally cause multipart uploads to hang
+        # when wait() was called. With this approach, it avoids the
+        # concurrency bug by removing any association with concurrent.futures
+        # implementation of waiters.
+        logger.debug(
+            '%s about to wait for the following futures %s', self, futures
+        )
+        for future in futures:
+            try:
+                logger.debug('%s about to wait for %s', self, future)
+                future.result()
+            except Exception:
+                # result() can also produce exceptions. We want to ignore
+                # these to be deferred to error handling down the road.
+                pass
+        logger.debug('%s done waiting for dependent futures', self)
+
+    def _get_all_main_kwargs(self):
+        # Copy over all of the kwargs that we know is available.
+        kwargs = copy.copy(self._main_kwargs)
+
+        # Iterate through the kwargs whose values are pending on the result
+        # of a future.
+        for key, pending_value in self._pending_main_kwargs.items():
+            # If the value is a list of futures, iterate though the list
+            # appending on the result from each future.
+            if isinstance(pending_value, list):
+                result = []
+                for future in pending_value:
+                    result.append(future.result())
+            # Otherwise if the pending_value is a future, just wait for it.
+            else:
+                result = pending_value.result()
+            # Add the retrieved value to the kwargs to be sent to the
+            # main() call.
+            kwargs[key] = result
+        return kwargs
+
+
+class SubmissionTask(Task):
+    """A base class for any submission task
+
+    Submission tasks are the top-level task used to submit a series of tasks
+    to execute a particular transfer.
+    """
+
+    def _main(self, transfer_future, **kwargs):
+        """
+        :type transfer_future: s3transfer.futures.TransferFuture
+        :param transfer_future: The transfer future associated with the
+            transfer request that tasks are being submitted for
+
+        :param kwargs: Any additional kwargs that you may want to pass
+            to the _submit() method
+        """
+        try:
+            self._transfer_coordinator.set_status_to_queued()
+
+            # Before submitting any tasks, run all of the on_queued callbacks
+            on_queued_callbacks = get_callbacks(transfer_future, 'queued')
+            for on_queued_callback in on_queued_callbacks:
+                on_queued_callback()
+
+            # Once callbacks have been ran set the status to running.
+            self._transfer_coordinator.set_status_to_running()
+
+            # Call the submit method to start submitting tasks to execute the
+            # transfer.
+            self._submit(transfer_future=transfer_future, **kwargs)
+        except BaseException as e:
+            # If there was an exception raised during the submission of task
+            # there is a chance that the final task that signals if a transfer
+            # is done and too run the cleanup may never have been submitted in
+            # the first place so we need to account accordingly.
+            #
+            # Note that BaseException is caught, instead of Exception, because
+            # for some implementations of executors, specifically the serial
+            # implementation, the SubmissionTask is directly exposed to
+            # KeyboardInterupts and so needs to cleanup and signal done
+            # for those as well.
+
+            # Set the exception, that caused the process to fail.
+            self._log_and_set_exception(e)
+
+            # Wait for all possibly associated futures that may have spawned
+            # from this submission task have finished before we announce the
+            # transfer done.
+            self._wait_for_all_submitted_futures_to_complete()
+
+            # Announce the transfer as done, which will run any cleanups
+            # and done callbacks as well.
+            self._transfer_coordinator.announce_done()
+
+    def _submit(self, transfer_future, **kwargs):
+        """The submission method to be implemented
+
+        :type transfer_future: s3transfer.futures.TransferFuture
+        :param transfer_future: The transfer future associated with the
+            transfer request that tasks are being submitted for
+
+        :param kwargs: Any additional keyword arguments you want to be passed
+            in
+        """
+        raise NotImplementedError('_submit() must be implemented')
+
+    def _wait_for_all_submitted_futures_to_complete(self):
+        # We want to wait for all futures that were submitted to
+        # complete as we do not want the cleanup callbacks or done callbacks
+        # to be called to early. The main problem is any task that was
+        # submitted may have submitted even more during its process and so
+        # we need to account accordingly.
+
+        # First get all of the futures that were submitted up to this point.
+        submitted_futures = self._transfer_coordinator.associated_futures
+        while submitted_futures:
+            # Wait for those futures to complete.
+            self._wait_until_all_complete(submitted_futures)
+            # However, more futures may have been submitted as we waited so
+            # we need to check again for any more associated futures.
+            possibly_more_submitted_futures = (
+                self._transfer_coordinator.associated_futures
+            )
+            # If the current list of submitted futures is equal to the
+            # the list of associated futures for when after the wait completes,
+            # we can ensure no more futures were submitted in waiting on
+            # the current list of futures to complete ultimately meaning all
+            # futures that may have spawned from the original submission task
+            # have completed.
+            if submitted_futures == possibly_more_submitted_futures:
+                break
+            submitted_futures = possibly_more_submitted_futures
+
+
+class CreateMultipartUploadTask(Task):
+    """Task to initiate a multipart upload"""
+
+    def _main(self, client, bucket, key, extra_args):
+        """
+        :param client: The client to use when calling CreateMultipartUpload
+        :param bucket: The name of the bucket to upload to
+        :param key: The name of the key to upload to
+        :param extra_args: A dictionary of any extra arguments that may be
+            used in the initialization.
+
+        :returns: The upload id of the multipart upload
+        """
+        # Create the multipart upload.
+        response = client.create_multipart_upload(
+            Bucket=bucket, Key=key, **extra_args
+        )
+        upload_id = response['UploadId']
+
+        # Add a cleanup if the multipart upload fails at any point.
+        self._transfer_coordinator.add_failure_cleanup(
+            client.abort_multipart_upload,
+            Bucket=bucket,
+            Key=key,
+            UploadId=upload_id,
+        )
+        return upload_id
+
+
+class CompleteMultipartUploadTask(Task):
+    """Task to complete a multipart upload"""
+
+    def _main(self, client, bucket, key, upload_id, parts, extra_args):
+        """
+        :param client: The client to use when calling CompleteMultipartUpload
+        :param bucket: The name of the bucket to upload to
+        :param key: The name of the key to upload to
+        :param upload_id: The id of the upload
+        :param parts: A list of parts to use to complete the multipart upload::
+
+            [{'Etag': etag_value, 'PartNumber': part_number}, ...]
+
+            Each element in the list consists of a return value from
+            ``UploadPartTask.main()``.
+        :param extra_args:  A dictionary of any extra arguments that may be
+            used in completing the multipart transfer.
+        """
+        client.complete_multipart_upload(
+            Bucket=bucket,
+            Key=key,
+            UploadId=upload_id,
+            MultipartUpload={'Parts': parts},
+            **extra_args,
+        )