aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py132
1 files changed, 132 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py
new file mode 100644
index 00000000..5e638ddb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py
@@ -0,0 +1,132 @@
+import logging
+import math
+import threading
+
+from botocore.retries import bucket, standard, throttling
+
+logger = logging.getLogger(__name__)
+
+
+def register_retry_handler(client):
+ clock = bucket.Clock()
+ rate_adjustor = throttling.CubicCalculator(
+ starting_max_rate=0, start_time=clock.current_time()
+ )
+ token_bucket = bucket.TokenBucket(max_rate=1, clock=clock)
+ rate_clocker = RateClocker(clock)
+ throttling_detector = standard.ThrottlingErrorDetector(
+ retry_event_adapter=standard.RetryEventAdapter(),
+ )
+ limiter = ClientRateLimiter(
+ rate_adjustor=rate_adjustor,
+ rate_clocker=rate_clocker,
+ token_bucket=token_bucket,
+ throttling_detector=throttling_detector,
+ clock=clock,
+ )
+ client.meta.events.register(
+ 'before-send',
+ limiter.on_sending_request,
+ )
+ client.meta.events.register(
+ 'needs-retry',
+ limiter.on_receiving_response,
+ )
+ return limiter
+
+
+class ClientRateLimiter:
+ _MAX_RATE_ADJUST_SCALE = 2.0
+
+ def __init__(
+ self,
+ rate_adjustor,
+ rate_clocker,
+ token_bucket,
+ throttling_detector,
+ clock,
+ ):
+ self._rate_adjustor = rate_adjustor
+ self._rate_clocker = rate_clocker
+ self._token_bucket = token_bucket
+ self._throttling_detector = throttling_detector
+ self._clock = clock
+ self._enabled = False
+ self._lock = threading.Lock()
+
+ def on_sending_request(self, request, **kwargs):
+ if self._enabled:
+ self._token_bucket.acquire()
+
+ # Hooked up to needs-retry.
+ def on_receiving_response(self, **kwargs):
+ measured_rate = self._rate_clocker.record()
+ timestamp = self._clock.current_time()
+ with self._lock:
+ if not self._throttling_detector.is_throttling_error(**kwargs):
+ new_rate = self._rate_adjustor.success_received(timestamp)
+ else:
+ if not self._enabled:
+ rate_to_use = measured_rate
+ else:
+ rate_to_use = min(
+ measured_rate, self._token_bucket.max_rate
+ )
+ new_rate = self._rate_adjustor.error_received(
+ rate_to_use, timestamp
+ )
+ logger.debug(
+ "Throttling response received, new send rate: %s "
+ "measured rate: %s, token bucket capacity "
+ "available: %s",
+ new_rate,
+ measured_rate,
+ self._token_bucket.available_capacity,
+ )
+ self._enabled = True
+ self._token_bucket.max_rate = min(
+ new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate
+ )
+
+
+class RateClocker:
+ """Tracks the rate at which a client is sending a request."""
+
+ _DEFAULT_SMOOTHING = 0.8
+ # Update the rate every _TIME_BUCKET_RANGE seconds.
+ _TIME_BUCKET_RANGE = 0.5
+
+ def __init__(
+ self,
+ clock,
+ smoothing=_DEFAULT_SMOOTHING,
+ time_bucket_range=_TIME_BUCKET_RANGE,
+ ):
+ self._clock = clock
+ self._measured_rate = 0
+ self._smoothing = smoothing
+ self._last_bucket = math.floor(self._clock.current_time())
+ self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE
+ self._count = 0
+ self._lock = threading.Lock()
+
+ def record(self, amount=1):
+ with self._lock:
+ t = self._clock.current_time()
+ bucket = (
+ math.floor(t * self._time_bucket_scale)
+ / self._time_bucket_scale
+ )
+ self._count += amount
+ if bucket > self._last_bucket:
+ current_rate = self._count / float(bucket - self._last_bucket)
+ self._measured_rate = (current_rate * self._smoothing) + (
+ self._measured_rate * (1 - self._smoothing)
+ )
+ self._count = 0
+ self._last_bucket = bucket
+ return self._measured_rate
+
+ @property
+ def measured_rate(self):
+ return self._measured_rate