diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/botocore/retries/adaptive.py | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py new file mode 100644 index 00000000..5e638ddb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py @@ -0,0 +1,132 @@ +import logging +import math +import threading + +from botocore.retries import bucket, standard, throttling + +logger = logging.getLogger(__name__) + + +def register_retry_handler(client): + clock = bucket.Clock() + rate_adjustor = throttling.CubicCalculator( + starting_max_rate=0, start_time=clock.current_time() + ) + token_bucket = bucket.TokenBucket(max_rate=1, clock=clock) + rate_clocker = RateClocker(clock) + throttling_detector = standard.ThrottlingErrorDetector( + retry_event_adapter=standard.RetryEventAdapter(), + ) + limiter = ClientRateLimiter( + rate_adjustor=rate_adjustor, + rate_clocker=rate_clocker, + token_bucket=token_bucket, + throttling_detector=throttling_detector, + clock=clock, + ) + client.meta.events.register( + 'before-send', + limiter.on_sending_request, + ) + client.meta.events.register( + 'needs-retry', + limiter.on_receiving_response, + ) + return limiter + + +class ClientRateLimiter: + _MAX_RATE_ADJUST_SCALE = 2.0 + + def __init__( + self, + rate_adjustor, + rate_clocker, + token_bucket, + throttling_detector, + clock, + ): + self._rate_adjustor = rate_adjustor + self._rate_clocker = rate_clocker + self._token_bucket = token_bucket + self._throttling_detector = throttling_detector + self._clock = clock + self._enabled = False + self._lock = threading.Lock() + + def on_sending_request(self, request, **kwargs): + if self._enabled: + self._token_bucket.acquire() + + # Hooked up to needs-retry. + def on_receiving_response(self, **kwargs): + measured_rate = self._rate_clocker.record() + timestamp = self._clock.current_time() + with self._lock: + if not self._throttling_detector.is_throttling_error(**kwargs): + new_rate = self._rate_adjustor.success_received(timestamp) + else: + if not self._enabled: + rate_to_use = measured_rate + else: + rate_to_use = min( + measured_rate, self._token_bucket.max_rate + ) + new_rate = self._rate_adjustor.error_received( + rate_to_use, timestamp + ) + logger.debug( + "Throttling response received, new send rate: %s " + "measured rate: %s, token bucket capacity " + "available: %s", + new_rate, + measured_rate, + self._token_bucket.available_capacity, + ) + self._enabled = True + self._token_bucket.max_rate = min( + new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate + ) + + +class RateClocker: + """Tracks the rate at which a client is sending a request.""" + + _DEFAULT_SMOOTHING = 0.8 + # Update the rate every _TIME_BUCKET_RANGE seconds. + _TIME_BUCKET_RANGE = 0.5 + + def __init__( + self, + clock, + smoothing=_DEFAULT_SMOOTHING, + time_bucket_range=_TIME_BUCKET_RANGE, + ): + self._clock = clock + self._measured_rate = 0 + self._smoothing = smoothing + self._last_bucket = math.floor(self._clock.current_time()) + self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE + self._count = 0 + self._lock = threading.Lock() + + def record(self, amount=1): + with self._lock: + t = self._clock.current_time() + bucket = ( + math.floor(t * self._time_bucket_scale) + / self._time_bucket_scale + ) + self._count += amount + if bucket > self._last_bucket: + current_rate = self._count / float(bucket - self._last_bucket) + self._measured_rate = (current_rate * self._smoothing) + ( + self._measured_rate * (1 - self._smoothing) + ) + self._count = 0 + self._last_bucket = bucket + return self._measured_rate + + @property + def measured_rate(self): + return self._measured_rate |