aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/s3transfer/futures.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/s3transfer/futures.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer/futures.py')
-rw-r--r--.venv/lib/python3.12/site-packages/s3transfer/futures.py613
1 files changed, 613 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/futures.py b/.venv/lib/python3.12/site-packages/s3transfer/futures.py
new file mode 100644
index 00000000..9ab30a7a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/futures.py
@@ -0,0 +1,613 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import copy
+import logging
+import sys
+import threading
+from collections import namedtuple
+from concurrent import futures
+
+from s3transfer.compat import MAXINT
+from s3transfer.exceptions import CancelledError, TransferNotDoneError
+from s3transfer.utils import FunctionContainer, TaskSemaphore
+
+try:
+ from botocore.context import get_context
+except ImportError:
+
+ def get_context():
+ return None
+
+
+logger = logging.getLogger(__name__)
+
+
+class BaseTransferFuture:
+ @property
+ def meta(self):
+ """The metadata associated to the TransferFuture"""
+ raise NotImplementedError('meta')
+
+ def done(self):
+ """Determines if a TransferFuture has completed
+
+ :returns: True if completed. False, otherwise.
+ """
+ raise NotImplementedError('done()')
+
+ def result(self):
+ """Waits until TransferFuture is done and returns the result
+
+ If the TransferFuture succeeded, it will return the result. If the
+ TransferFuture failed, it will raise the exception associated to the
+ failure.
+ """
+ raise NotImplementedError('result()')
+
+ def cancel(self):
+ """Cancels the request associated with the TransferFuture"""
+ raise NotImplementedError('cancel()')
+
+
+class BaseTransferMeta:
+ @property
+ def call_args(self):
+ """The call args used in the transfer request"""
+ raise NotImplementedError('call_args')
+
+ @property
+ def transfer_id(self):
+ """The unique id of the transfer"""
+ raise NotImplementedError('transfer_id')
+
+ @property
+ def user_context(self):
+ """A dictionary that requesters can store data in"""
+ raise NotImplementedError('user_context')
+
+
+class TransferFuture(BaseTransferFuture):
+ def __init__(self, meta=None, coordinator=None):
+ """The future associated to a submitted transfer request
+
+ :type meta: TransferMeta
+ :param meta: The metadata associated to the request. This object
+ is visible to the requester.
+
+ :type coordinator: TransferCoordinator
+ :param coordinator: The coordinator associated to the request. This
+ object is not visible to the requester.
+ """
+ self._meta = meta
+ if meta is None:
+ self._meta = TransferMeta()
+
+ self._coordinator = coordinator
+ if coordinator is None:
+ self._coordinator = TransferCoordinator()
+
+ @property
+ def meta(self):
+ return self._meta
+
+ def done(self):
+ return self._coordinator.done()
+
+ def result(self):
+ try:
+ # Usually the result() method blocks until the transfer is done,
+ # however if a KeyboardInterrupt is raised we want want to exit
+ # out of this and propagate the exception.
+ return self._coordinator.result()
+ except KeyboardInterrupt as e:
+ self.cancel()
+ raise e
+
+ def cancel(self):
+ self._coordinator.cancel()
+
+ def set_exception(self, exception):
+ """Sets the exception on the future."""
+ if not self.done():
+ raise TransferNotDoneError(
+ 'set_exception can only be called once the transfer is '
+ 'complete.'
+ )
+ self._coordinator.set_exception(exception, override=True)
+
+
+class TransferMeta(BaseTransferMeta):
+ """Holds metadata about the TransferFuture"""
+
+ def __init__(self, call_args=None, transfer_id=None):
+ self._call_args = call_args
+ self._transfer_id = transfer_id
+ self._size = None
+ self._user_context = {}
+
+ @property
+ def call_args(self):
+ """The call args used in the transfer request"""
+ return self._call_args
+
+ @property
+ def transfer_id(self):
+ """The unique id of the transfer"""
+ return self._transfer_id
+
+ @property
+ def size(self):
+ """The size of the transfer request if known"""
+ return self._size
+
+ @property
+ def user_context(self):
+ """A dictionary that requesters can store data in"""
+ return self._user_context
+
+ def provide_transfer_size(self, size):
+ """A method to provide the size of a transfer request
+
+ By providing this value, the TransferManager will not try to
+ call HeadObject or use the use OS to determine the size of the
+ transfer.
+ """
+ self._size = size
+
+
+class TransferCoordinator:
+ """A helper class for managing TransferFuture"""
+
+ def __init__(self, transfer_id=None):
+ self.transfer_id = transfer_id
+ self._status = 'not-started'
+ self._result = None
+ self._exception = None
+ self._associated_futures = set()
+ self._failure_cleanups = []
+ self._done_callbacks = []
+ self._done_event = threading.Event()
+ self._lock = threading.Lock()
+ self._associated_futures_lock = threading.Lock()
+ self._done_callbacks_lock = threading.Lock()
+ self._failure_cleanups_lock = threading.Lock()
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'
+
+ @property
+ def exception(self):
+ return self._exception
+
+ @property
+ def associated_futures(self):
+ """The list of futures associated to the inprogress TransferFuture
+
+ Once the transfer finishes this list becomes empty as the transfer
+ is considered done and there should be no running futures left.
+ """
+ with self._associated_futures_lock:
+ # We return a copy of the list because we do not want to
+ # processing the returned list while another thread is adding
+ # more futures to the actual list.
+ return copy.copy(self._associated_futures)
+
+ @property
+ def failure_cleanups(self):
+ """The list of callbacks to call when the TransferFuture fails"""
+ return self._failure_cleanups
+
+ @property
+ def status(self):
+ """The status of the TransferFuture
+
+ The currently supported states are:
+ * not-started - Has yet to start. If in this state, a transfer
+ can be canceled immediately and nothing will happen.
+ * queued - SubmissionTask is about to submit tasks
+ * running - Is inprogress. In-progress as of now means that
+ the SubmissionTask that runs the transfer is being executed. So
+ there is no guarantee any transfer requests had been made to
+ S3 if this state is reached.
+ * cancelled - Was cancelled
+ * failed - An exception other than CancelledError was thrown
+ * success - No exceptions were thrown and is done.
+ """
+ return self._status
+
+ def set_result(self, result):
+ """Set a result for the TransferFuture
+
+ Implies that the TransferFuture succeeded. This will always set a
+ result because it is invoked on the final task where there is only
+ ever one final task and it is ran at the very end of a transfer
+ process. So if a result is being set for this final task, the transfer
+ succeeded even if something came a long and canceled the transfer
+ on the final task.
+ """
+ with self._lock:
+ self._exception = None
+ self._result = result
+ self._status = 'success'
+
+ def set_exception(self, exception, override=False):
+ """Set an exception for the TransferFuture
+
+ Implies the TransferFuture failed.
+
+ :param exception: The exception that cause the transfer to fail.
+ :param override: If True, override any existing state.
+ """
+ with self._lock:
+ if not self.done() or override:
+ self._exception = exception
+ self._status = 'failed'
+
+ def result(self):
+ """Waits until TransferFuture is done and returns the result
+
+ If the TransferFuture succeeded, it will return the result. If the
+ TransferFuture failed, it will raise the exception associated to the
+ failure.
+ """
+ # Doing a wait() with no timeout cannot be interrupted in python2 but
+ # can be interrupted in python3 so we just wait with the largest
+ # possible value integer value, which is on the scale of billions of
+ # years...
+ self._done_event.wait(MAXINT)
+
+ # Once done waiting, raise an exception if present or return the
+ # final result.
+ if self._exception:
+ raise self._exception
+ return self._result
+
+ def cancel(self, msg='', exc_type=CancelledError):
+ """Cancels the TransferFuture
+
+ :param msg: The message to attach to the cancellation
+ :param exc_type: The type of exception to set for the cancellation
+ """
+ with self._lock:
+ if not self.done():
+ should_announce_done = False
+ logger.debug('%s cancel(%s) called', self, msg)
+ self._exception = exc_type(msg)
+ if self._status == 'not-started':
+ should_announce_done = True
+ self._status = 'cancelled'
+ if should_announce_done:
+ self.announce_done()
+
+ def set_status_to_queued(self):
+ """Sets the TransferFutrue's status to running"""
+ self._transition_to_non_done_state('queued')
+
+ def set_status_to_running(self):
+ """Sets the TransferFuture's status to running"""
+ self._transition_to_non_done_state('running')
+
+ def _transition_to_non_done_state(self, desired_state):
+ with self._lock:
+ if self.done():
+ raise RuntimeError(
+ f'Unable to transition from done state {self.status} to non-done '
+ f'state {desired_state}.'
+ )
+ self._status = desired_state
+
+ def submit(self, executor, task, tag=None):
+ """Submits a task to a provided executor
+
+ :type executor: s3transfer.futures.BoundedExecutor
+ :param executor: The executor to submit the callable to
+
+ :type task: s3transfer.tasks.Task
+ :param task: The task to submit to the executor
+
+ :type tag: s3transfer.futures.TaskTag
+ :param tag: A tag to associate to the submitted task
+
+ :rtype: concurrent.futures.Future
+ :returns: A future representing the submitted task
+ """
+ logger.debug(
+ f"Submitting task {task} to executor {executor} for transfer request: {self.transfer_id}."
+ )
+ future = executor.submit(task, tag=tag)
+ # Add this created future to the list of associated future just
+ # in case it is needed during cleanups.
+ self.add_associated_future(future)
+ future.add_done_callback(
+ FunctionContainer(self.remove_associated_future, future)
+ )
+ return future
+
+ def done(self):
+ """Determines if a TransferFuture has completed
+
+ :returns: False if status is equal to 'failed', 'cancelled', or
+ 'success'. True, otherwise
+ """
+ return self.status in ['failed', 'cancelled', 'success']
+
+ def add_associated_future(self, future):
+ """Adds a future to be associated with the TransferFuture"""
+ with self._associated_futures_lock:
+ self._associated_futures.add(future)
+
+ def remove_associated_future(self, future):
+ """Removes a future's association to the TransferFuture"""
+ with self._associated_futures_lock:
+ self._associated_futures.remove(future)
+
+ def add_done_callback(self, function, *args, **kwargs):
+ """Add a done callback to be invoked when transfer is done"""
+ with self._done_callbacks_lock:
+ self._done_callbacks.append(
+ FunctionContainer(function, *args, **kwargs)
+ )
+
+ def add_failure_cleanup(self, function, *args, **kwargs):
+ """Adds a callback to call upon failure"""
+ with self._failure_cleanups_lock:
+ self._failure_cleanups.append(
+ FunctionContainer(function, *args, **kwargs)
+ )
+
+ def announce_done(self):
+ """Announce that future is done running and run associated callbacks
+
+ This will run any failure cleanups if the transfer failed if not
+ they have not been run, allows the result() to be unblocked, and will
+ run any done callbacks associated to the TransferFuture if they have
+ not already been ran.
+ """
+ if self.status != 'success':
+ self._run_failure_cleanups()
+ self._done_event.set()
+ self._run_done_callbacks()
+
+ def _run_done_callbacks(self):
+ # Run the callbacks and remove the callbacks from the internal
+ # list so they do not get ran again if done is announced more than
+ # once.
+ with self._done_callbacks_lock:
+ self._run_callbacks(self._done_callbacks)
+ self._done_callbacks = []
+
+ def _run_failure_cleanups(self):
+ # Run the cleanup callbacks and remove the callbacks from the internal
+ # list so they do not get ran again if done is announced more than
+ # once.
+ with self._failure_cleanups_lock:
+ self._run_callbacks(self.failure_cleanups)
+ self._failure_cleanups = []
+
+ def _run_callbacks(self, callbacks):
+ for callback in callbacks:
+ self._run_callback(callback)
+
+ def _run_callback(self, callback):
+ try:
+ callback()
+ # We do not want a callback interrupting the process, especially
+ # in the failure cleanups. So log and catch, the exception.
+ except Exception:
+ logger.debug(f"Exception raised in {callback}.", exc_info=True)
+
+
+class BoundedExecutor:
+ EXECUTOR_CLS = futures.ThreadPoolExecutor
+
+ def __init__(
+ self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
+ ):
+ """An executor implementation that has a maximum queued up tasks
+
+ The executor will block if the number of tasks that have been
+ submitted and is currently working on is past its maximum.
+
+ :params max_size: The maximum number of inflight futures. An inflight
+ future means that the task is either queued up or is currently
+ being executed. A size of None or 0 means that the executor will
+ have no bound in terms of the number of inflight futures.
+
+ :params max_num_threads: The maximum number of threads the executor
+ uses.
+
+ :type tag_semaphores: dict
+ :params tag_semaphores: A dictionary where the key is the name of the
+ tag and the value is the semaphore to use when limiting the
+ number of tasks the executor is processing at a time.
+
+ :type executor_cls: BaseExecutor
+ :param underlying_executor_cls: The executor class that
+ get bounded by this executor. If None is provided, the
+ concurrent.futures.ThreadPoolExecutor class is used.
+ """
+ self._max_num_threads = max_num_threads
+ if executor_cls is None:
+ executor_cls = self.EXECUTOR_CLS
+ self._executor = executor_cls(max_workers=self._max_num_threads)
+ self._semaphore = TaskSemaphore(max_size)
+ self._tag_semaphores = tag_semaphores
+
+ def submit(self, task, tag=None, block=True):
+ """Submit a task to complete
+
+ :type task: s3transfer.tasks.Task
+ :param task: The task to run __call__ on
+
+
+ :type tag: s3transfer.futures.TaskTag
+ :param tag: An optional tag to associate to the task. This
+ is used to override which semaphore to use.
+
+ :type block: boolean
+ :param block: True if to wait till it is possible to submit a task.
+ False, if not to wait and raise an error if not able to submit
+ a task.
+
+ :returns: The future associated to the submitted task
+ """
+ semaphore = self._semaphore
+ # If a tag was provided, use the semaphore associated to that
+ # tag.
+ if tag:
+ semaphore = self._tag_semaphores[tag]
+
+ # Call acquire on the semaphore.
+ acquire_token = semaphore.acquire(task.transfer_id, block)
+ # Create a callback to invoke when task is done in order to call
+ # release on the semaphore.
+ release_callback = FunctionContainer(
+ semaphore.release, task.transfer_id, acquire_token
+ )
+ # Submit the task to the underlying executor.
+ # Pass the current context to ensure child threads persist the
+ # parent thread's context.
+ future = ExecutorFuture(self._executor.submit(task, get_context()))
+ # Add the Semaphore.release() callback to the future such that
+ # it is invoked once the future completes.
+ future.add_done_callback(release_callback)
+ return future
+
+ def shutdown(self, wait=True):
+ self._executor.shutdown(wait)
+
+
+class ExecutorFuture:
+ def __init__(self, future):
+ """A future returned from the executor
+
+ Currently, it is just a wrapper around a concurrent.futures.Future.
+ However, this can eventually grow to implement the needed functionality
+ of concurrent.futures.Future if we move off of the library and not
+ affect the rest of the codebase.
+
+ :type future: concurrent.futures.Future
+ :param future: The underlying future
+ """
+ self._future = future
+
+ def result(self):
+ return self._future.result()
+
+ def add_done_callback(self, fn):
+ """Adds a callback to be completed once future is done
+
+ :param fn: A callable that takes no arguments. Note that is different
+ than concurrent.futures.Future.add_done_callback that requires
+ a single argument for the future.
+ """
+
+ # The done callback for concurrent.futures.Future will always pass a
+ # the future in as the only argument. So we need to create the
+ # proper signature wrapper that will invoke the callback provided.
+ def done_callback(future_passed_to_callback):
+ return fn()
+
+ self._future.add_done_callback(done_callback)
+
+ def done(self):
+ return self._future.done()
+
+
+class BaseExecutor:
+ """Base Executor class implementation needed to work with s3transfer"""
+
+ def __init__(self, max_workers=None):
+ pass
+
+ def submit(self, fn, *args, **kwargs):
+ raise NotImplementedError('submit()')
+
+ def shutdown(self, wait=True):
+ raise NotImplementedError('shutdown()')
+
+
+class NonThreadedExecutor(BaseExecutor):
+ """A drop-in replacement non-threaded version of ThreadPoolExecutor"""
+
+ def submit(self, fn, *args, **kwargs):
+ future = NonThreadedExecutorFuture()
+ try:
+ result = fn(*args, **kwargs)
+ future.set_result(result)
+ except Exception:
+ e, tb = sys.exc_info()[1:]
+ logger.debug(
+ 'Setting exception for %s to %s with traceback %s',
+ future,
+ e,
+ tb,
+ )
+ future.set_exception_info(e, tb)
+ return future
+
+ def shutdown(self, wait=True):
+ pass
+
+
+class NonThreadedExecutorFuture:
+ """The Future returned from NonThreadedExecutor
+
+ Note that this future is **not** thread-safe as it is being used
+ from the context of a non-threaded environment.
+ """
+
+ def __init__(self):
+ self._result = None
+ self._exception = None
+ self._traceback = None
+ self._done = False
+ self._done_callbacks = []
+
+ def set_result(self, result):
+ self._result = result
+ self._set_done()
+
+ def set_exception_info(self, exception, traceback):
+ self._exception = exception
+ self._traceback = traceback
+ self._set_done()
+
+ def result(self, timeout=None):
+ if self._exception:
+ raise self._exception.with_traceback(self._traceback)
+ return self._result
+
+ def _set_done(self):
+ self._done = True
+ for done_callback in self._done_callbacks:
+ self._invoke_done_callback(done_callback)
+ self._done_callbacks = []
+
+ def _invoke_done_callback(self, done_callback):
+ return done_callback(self)
+
+ def done(self):
+ return self._done
+
+ def add_done_callback(self, fn):
+ if self._done:
+ self._invoke_done_callback(fn)
+ else:
+ self._done_callbacks.append(fn)
+
+
+TaskTag = namedtuple('TaskTag', ['name'])
+
+IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload')
+IN_MEMORY_DOWNLOAD_TAG = TaskTag('in_memory_download')