about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py')
-rw-r--r--.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py437
1 files changed, 437 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py
new file mode 100644
index 00000000..9301c8e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/s3transfer/bandwidth.py
@@ -0,0 +1,437 @@
+# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import threading
+import time
+
+
+class RequestExceededException(Exception):
+    def __init__(self, requested_amt, retry_time):
+        """Error when requested amount exceeds what is allowed
+
+        The request that raised this error should be retried after waiting
+        the time specified by ``retry_time``.
+
+        :type requested_amt: int
+        :param requested_amt: The originally requested byte amount
+
+        :type retry_time: float
+        :param retry_time: The length in time to wait to retry for the
+            requested amount
+        """
+        self.requested_amt = requested_amt
+        self.retry_time = retry_time
+        msg = f'Request amount {requested_amt} exceeded the amount available. Retry in {retry_time}'
+        super().__init__(msg)
+
+
+class RequestToken:
+    """A token to pass as an identifier when consuming from the LeakyBucket"""
+
+    pass
+
+
+class TimeUtils:
+    def time(self):
+        """Get the current time back
+
+        :rtype: float
+        :returns: The current time in seconds
+        """
+        return time.time()
+
+    def sleep(self, value):
+        """Sleep for a designated time
+
+        :type value: float
+        :param value: The time to sleep for in seconds
+        """
+        return time.sleep(value)
+
+
+class BandwidthLimiter:
+    def __init__(self, leaky_bucket, time_utils=None):
+        """Limits bandwidth for shared S3 transfers
+
+        :type leaky_bucket: LeakyBucket
+        :param leaky_bucket: The leaky bucket to use limit bandwidth
+
+        :type time_utils: TimeUtils
+        :param time_utils: Time utility to use for interacting with time.
+        """
+        self._leaky_bucket = leaky_bucket
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+
+    def get_bandwith_limited_stream(
+        self, fileobj, transfer_coordinator, enabled=True
+    ):
+        """Wraps a fileobj in a bandwidth limited stream wrapper
+
+        :type fileobj: file-like obj
+        :param fileobj: The file-like obj to wrap
+
+        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+        param transfer_coordinator: The coordinator for the general transfer
+            that the wrapped stream is a part of
+
+        :type enabled: boolean
+        :param enabled: Whether bandwidth limiting should be enabled to start
+        """
+        stream = BandwidthLimitedStream(
+            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
+        )
+        if not enabled:
+            stream.disable_bandwidth_limiting()
+        return stream
+
+
+class BandwidthLimitedStream:
+    def __init__(
+        self,
+        fileobj,
+        leaky_bucket,
+        transfer_coordinator,
+        time_utils=None,
+        bytes_threshold=256 * 1024,
+    ):
+        """Limits bandwidth for reads on a wrapped stream
+
+        :type fileobj: file-like object
+        :param fileobj: The file like object to wrap
+
+        :type leaky_bucket: LeakyBucket
+        :param leaky_bucket: The leaky bucket to use to throttle reads on
+            the stream
+
+        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+        param transfer_coordinator: The coordinator for the general transfer
+            that the wrapped stream is a part of
+
+        :type time_utils: TimeUtils
+        :param time_utils: The time utility to use for interacting with time
+        """
+        self._fileobj = fileobj
+        self._leaky_bucket = leaky_bucket
+        self._transfer_coordinator = transfer_coordinator
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+        self._bandwidth_limiting_enabled = True
+        self._request_token = RequestToken()
+        self._bytes_seen = 0
+        self._bytes_threshold = bytes_threshold
+
+    def enable_bandwidth_limiting(self):
+        """Enable bandwidth limiting on reads to the stream"""
+        self._bandwidth_limiting_enabled = True
+
+    def disable_bandwidth_limiting(self):
+        """Disable bandwidth limiting on reads to the stream"""
+        self._bandwidth_limiting_enabled = False
+
+    def read(self, amount):
+        """Read a specified amount
+
+        Reads will only be throttled if bandwidth limiting is enabled.
+        """
+        if not self._bandwidth_limiting_enabled:
+            return self._fileobj.read(amount)
+
+        # We do not want to be calling consume on every read as the read
+        # amounts can be small causing the lock of the leaky bucket to
+        # introduce noticeable overhead. So instead we keep track of
+        # how many bytes we have seen and only call consume once we pass a
+        # certain threshold.
+        self._bytes_seen += amount
+        if self._bytes_seen < self._bytes_threshold:
+            return self._fileobj.read(amount)
+
+        self._consume_through_leaky_bucket()
+        return self._fileobj.read(amount)
+
+    def _consume_through_leaky_bucket(self):
+        # NOTE: If the read amount on the stream are high, it will result
+        # in large bursty behavior as there is not an interface for partial
+        # reads. However given the read's on this abstraction are at most 256KB
+        # (via downloads), it reduces the burstiness to be small KB bursts at
+        # worst.
+        while not self._transfer_coordinator.exception:
+            try:
+                self._leaky_bucket.consume(
+                    self._bytes_seen, self._request_token
+                )
+                self._bytes_seen = 0
+                return
+            except RequestExceededException as e:
+                self._time_utils.sleep(e.retry_time)
+        else:
+            raise self._transfer_coordinator.exception
+
+    def signal_transferring(self):
+        """Signal that data being read is being transferred to S3"""
+        self.enable_bandwidth_limiting()
+
+    def signal_not_transferring(self):
+        """Signal that data being read is not being transferred to S3"""
+        self.disable_bandwidth_limiting()
+
+    def seek(self, where, whence=0):
+        self._fileobj.seek(where, whence)
+
+    def tell(self):
+        return self._fileobj.tell()
+
+    def close(self):
+        if self._bandwidth_limiting_enabled and self._bytes_seen:
+            # This handles the case where the file is small enough to never
+            # trigger the threshold and thus is never subjugated to the
+            # leaky bucket on read(). This specifically happens for small
+            # uploads. So instead to account for those bytes, have
+            # it go through the leaky bucket when the file gets closed.
+            self._consume_through_leaky_bucket()
+        self._fileobj.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+
+class LeakyBucket:
+    def __init__(
+        self,
+        max_rate,
+        time_utils=None,
+        rate_tracker=None,
+        consumption_scheduler=None,
+    ):
+        """A leaky bucket abstraction to limit bandwidth consumption
+
+        :type rate: int
+        :type rate: The maximum rate to allow. This rate is in terms of
+            bytes per second.
+
+        :type time_utils: TimeUtils
+        :param time_utils: The time utility to use for interacting with time
+
+        :type rate_tracker: BandwidthRateTracker
+        :param rate_tracker: Tracks bandwidth consumption
+
+        :type consumption_scheduler: ConsumptionScheduler
+        :param consumption_scheduler: Schedules consumption retries when
+            necessary
+        """
+        self._max_rate = float(max_rate)
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+        self._lock = threading.Lock()
+        self._rate_tracker = rate_tracker
+        if rate_tracker is None:
+            self._rate_tracker = BandwidthRateTracker()
+        self._consumption_scheduler = consumption_scheduler
+        if consumption_scheduler is None:
+            self._consumption_scheduler = ConsumptionScheduler()
+
+    def consume(self, amt, request_token):
+        """Consume an a requested amount
+
+        :type amt: int
+        :param amt: The amount of bytes to request to consume
+
+        :type request_token: RequestToken
+        :param request_token: The token associated to the consumption
+            request that is used to identify the request. So if a
+            RequestExceededException is raised the token should be used
+            in subsequent retry consume() request.
+
+        :raises RequestExceededException: If the consumption amount would
+            exceed the maximum allocated bandwidth
+
+        :rtype: int
+        :returns: The amount consumed
+        """
+        with self._lock:
+            time_now = self._time_utils.time()
+            if self._consumption_scheduler.is_scheduled(request_token):
+                return self._release_requested_amt_for_scheduled_request(
+                    amt, request_token, time_now
+                )
+            elif self._projected_to_exceed_max_rate(amt, time_now):
+                self._raise_request_exceeded_exception(
+                    amt, request_token, time_now
+                )
+            else:
+                return self._release_requested_amt(amt, time_now)
+
+    def _projected_to_exceed_max_rate(self, amt, time_now):
+        projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
+        return projected_rate > self._max_rate
+
+    def _release_requested_amt_for_scheduled_request(
+        self, amt, request_token, time_now
+    ):
+        self._consumption_scheduler.process_scheduled_consumption(
+            request_token
+        )
+        return self._release_requested_amt(amt, time_now)
+
+    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
+        allocated_time = amt / float(self._max_rate)
+        retry_time = self._consumption_scheduler.schedule_consumption(
+            amt, request_token, allocated_time
+        )
+        raise RequestExceededException(
+            requested_amt=amt, retry_time=retry_time
+        )
+
+    def _release_requested_amt(self, amt, time_now):
+        self._rate_tracker.record_consumption_rate(amt, time_now)
+        return amt
+
+
+class ConsumptionScheduler:
+    def __init__(self):
+        """Schedules when to consume a desired amount"""
+        self._tokens_to_scheduled_consumption = {}
+        self._total_wait = 0
+
+    def is_scheduled(self, token):
+        """Indicates if a consumption request has been scheduled
+
+        :type token: RequestToken
+        :param token: The token associated to the consumption
+            request that is used to identify the request.
+        """
+        return token in self._tokens_to_scheduled_consumption
+
+    def schedule_consumption(self, amt, token, time_to_consume):
+        """Schedules a wait time to be able to consume an amount
+
+        :type amt: int
+        :param amt: The amount of bytes scheduled to be consumed
+
+        :type token: RequestToken
+        :param token: The token associated to the consumption
+            request that is used to identify the request.
+
+        :type time_to_consume: float
+        :param time_to_consume: The desired time it should take for that
+            specific request amount to be consumed in regardless of previously
+            scheduled consumption requests
+
+        :rtype: float
+        :returns: The amount of time to wait for the specific request before
+            actually consuming the specified amount.
+        """
+        self._total_wait += time_to_consume
+        self._tokens_to_scheduled_consumption[token] = {
+            'wait_duration': self._total_wait,
+            'time_to_consume': time_to_consume,
+        }
+        return self._total_wait
+
+    def process_scheduled_consumption(self, token):
+        """Processes a scheduled consumption request that has completed
+
+        :type token: RequestToken
+        :param token: The token associated to the consumption
+            request that is used to identify the request.
+        """
+        scheduled_retry = self._tokens_to_scheduled_consumption.pop(token)
+        self._total_wait = max(
+            self._total_wait - scheduled_retry['time_to_consume'], 0
+        )
+
+
+class BandwidthRateTracker:
+    def __init__(self, alpha=0.8):
+        """Tracks the rate of bandwidth consumption
+
+        :type a: float
+        :param a: The constant to use in calculating the exponentional moving
+            average of the bandwidth rate. Specifically it is used in the
+            following calculation:
+
+            current_rate = alpha * new_rate + (1 - alpha) * current_rate
+
+            This value of this constant should be between 0 and 1.
+        """
+        self._alpha = alpha
+        self._last_time = None
+        self._current_rate = None
+
+    @property
+    def current_rate(self):
+        """The current transfer rate
+
+        :rtype: float
+        :returns: The current tracked transfer rate
+        """
+        if self._last_time is None:
+            return 0.0
+        return self._current_rate
+
+    def get_projected_rate(self, amt, time_at_consumption):
+        """Get the projected rate using a provided amount and time
+
+        :type amt: int
+        :param amt: The proposed amount to consume
+
+        :type time_at_consumption: float
+        :param time_at_consumption: The proposed time to consume at
+
+        :rtype: float
+        :returns: The consumption rate if that amt and time were consumed
+        """
+        if self._last_time is None:
+            return 0.0
+        return self._calculate_exponential_moving_average_rate(
+            amt, time_at_consumption
+        )
+
+    def record_consumption_rate(self, amt, time_at_consumption):
+        """Record the consumption rate based off amount and time point
+
+        :type amt: int
+        :param amt: The amount that got consumed
+
+        :type time_at_consumption: float
+        :param time_at_consumption: The time at which the amount was consumed
+        """
+        if self._last_time is None:
+            self._last_time = time_at_consumption
+            self._current_rate = 0.0
+            return
+        self._current_rate = self._calculate_exponential_moving_average_rate(
+            amt, time_at_consumption
+        )
+        self._last_time = time_at_consumption
+
+    def _calculate_rate(self, amt, time_at_consumption):
+        time_delta = time_at_consumption - self._last_time
+        if time_delta <= 0:
+            # While it is really unlikely to see this in an actual transfer,
+            # we do not want to be returning back a negative rate or try to
+            # divide the amount by zero. So instead return back an infinite
+            # rate as the time delta is infinitesimally small.
+            return float('inf')
+        return amt / (time_delta)
+
+    def _calculate_exponential_moving_average_rate(
+        self, amt, time_at_consumption
+    ):
+        new_rate = self._calculate_rate(amt, time_at_consumption)
+        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate