diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/retries')
8 files changed, 971 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/__init__.py b/.venv/lib/python3.12/site-packages/botocore/retries/__init__.py new file mode 100644 index 00000000..a6d6b377 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/__init__.py @@ -0,0 +1,6 @@ +"""New retry v2 handlers. + +This package obsoletes the botocore/retryhandler.py module and contains +new retry logic. + +""" diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py new file mode 100644 index 00000000..5e638ddb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/adaptive.py @@ -0,0 +1,132 @@ +import logging +import math +import threading + +from botocore.retries import bucket, standard, throttling + +logger = logging.getLogger(__name__) + + +def register_retry_handler(client): + clock = bucket.Clock() + rate_adjustor = throttling.CubicCalculator( + starting_max_rate=0, start_time=clock.current_time() + ) + token_bucket = bucket.TokenBucket(max_rate=1, clock=clock) + rate_clocker = RateClocker(clock) + throttling_detector = standard.ThrottlingErrorDetector( + retry_event_adapter=standard.RetryEventAdapter(), + ) + limiter = ClientRateLimiter( + rate_adjustor=rate_adjustor, + rate_clocker=rate_clocker, + token_bucket=token_bucket, + throttling_detector=throttling_detector, + clock=clock, + ) + client.meta.events.register( + 'before-send', + limiter.on_sending_request, + ) + client.meta.events.register( + 'needs-retry', + limiter.on_receiving_response, + ) + return limiter + + +class ClientRateLimiter: + _MAX_RATE_ADJUST_SCALE = 2.0 + + def __init__( + self, + rate_adjustor, + rate_clocker, + token_bucket, + throttling_detector, + clock, + ): + self._rate_adjustor = rate_adjustor + self._rate_clocker = rate_clocker + self._token_bucket = token_bucket + self._throttling_detector = throttling_detector + self._clock = clock + self._enabled = False + self._lock = threading.Lock() + + def on_sending_request(self, request, **kwargs): + if self._enabled: + self._token_bucket.acquire() + + # Hooked up to needs-retry. + def on_receiving_response(self, **kwargs): + measured_rate = self._rate_clocker.record() + timestamp = self._clock.current_time() + with self._lock: + if not self._throttling_detector.is_throttling_error(**kwargs): + new_rate = self._rate_adjustor.success_received(timestamp) + else: + if not self._enabled: + rate_to_use = measured_rate + else: + rate_to_use = min( + measured_rate, self._token_bucket.max_rate + ) + new_rate = self._rate_adjustor.error_received( + rate_to_use, timestamp + ) + logger.debug( + "Throttling response received, new send rate: %s " + "measured rate: %s, token bucket capacity " + "available: %s", + new_rate, + measured_rate, + self._token_bucket.available_capacity, + ) + self._enabled = True + self._token_bucket.max_rate = min( + new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate + ) + + +class RateClocker: + """Tracks the rate at which a client is sending a request.""" + + _DEFAULT_SMOOTHING = 0.8 + # Update the rate every _TIME_BUCKET_RANGE seconds. + _TIME_BUCKET_RANGE = 0.5 + + def __init__( + self, + clock, + smoothing=_DEFAULT_SMOOTHING, + time_bucket_range=_TIME_BUCKET_RANGE, + ): + self._clock = clock + self._measured_rate = 0 + self._smoothing = smoothing + self._last_bucket = math.floor(self._clock.current_time()) + self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE + self._count = 0 + self._lock = threading.Lock() + + def record(self, amount=1): + with self._lock: + t = self._clock.current_time() + bucket = ( + math.floor(t * self._time_bucket_scale) + / self._time_bucket_scale + ) + self._count += amount + if bucket > self._last_bucket: + current_rate = self._count / float(bucket - self._last_bucket) + self._measured_rate = (current_rate * self._smoothing) + ( + self._measured_rate * (1 - self._smoothing) + ) + self._count = 0 + self._last_bucket = bucket + return self._measured_rate + + @property + def measured_rate(self): + return self._measured_rate diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/base.py b/.venv/lib/python3.12/site-packages/botocore/retries/base.py new file mode 100644 index 00000000..108bfed6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/base.py @@ -0,0 +1,26 @@ +class BaseRetryBackoff: + def delay_amount(self, context): + """Calculate how long we should delay before retrying. + + :type context: RetryContext + + """ + raise NotImplementedError("delay_amount") + + +class BaseRetryableChecker: + """Base class for determining if a retry should happen. + + This base class checks for specific retryable conditions. + A single retryable checker doesn't necessarily indicate a retry + will happen. It's up to the ``RetryPolicy`` to use its + ``BaseRetryableCheckers`` to make the final decision on whether a retry + should happen. + """ + + def is_retryable(self, context): + """Returns True if retryable, False if not. + + :type context: RetryContext + """ + raise NotImplementedError("is_retryable") diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/bucket.py b/.venv/lib/python3.12/site-packages/botocore/retries/bucket.py new file mode 100644 index 00000000..09d33c77 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/bucket.py @@ -0,0 +1,115 @@ +"""This module implements token buckets used for client side throttling.""" + +import threading +import time + +from botocore.exceptions import CapacityNotAvailableError + + +class Clock: + def __init__(self): + pass + + def sleep(self, amount): + time.sleep(amount) + + def current_time(self): + return time.time() + + +class TokenBucket: + _MIN_RATE = 0.5 + + def __init__(self, max_rate, clock, min_rate=_MIN_RATE): + self._fill_rate = None + self._max_capacity = None + self._current_capacity = 0 + self._clock = clock + self._last_timestamp = None + self._min_rate = min_rate + self._lock = threading.Lock() + self._new_fill_rate_condition = threading.Condition(self._lock) + self.max_rate = max_rate + + @property + def max_rate(self): + return self._fill_rate + + @max_rate.setter + def max_rate(self, value): + with self._new_fill_rate_condition: + # Before we can change the rate we need to fill any pending + # tokens we might have based on the current rate. If we don't + # do this it means everything since the last recorded timestamp + # will accumulate at the rate we're about to set which isn't + # correct. + self._refill() + self._fill_rate = max(value, self._min_rate) + if value >= 1: + self._max_capacity = value + else: + self._max_capacity = 1 + # If we're scaling down, we also can't have a capacity that's + # more than our max_capacity. + self._current_capacity = min( + self._current_capacity, self._max_capacity + ) + self._new_fill_rate_condition.notify() + + @property + def max_capacity(self): + return self._max_capacity + + @property + def available_capacity(self): + return self._current_capacity + + def acquire(self, amount=1, block=True): + """Acquire token or return amount of time until next token available. + + If block is True, then this method will block until there's sufficient + capacity to acquire the desired amount. + + If block is False, then this method will return True is capacity + was successfully acquired, False otherwise. + + """ + with self._new_fill_rate_condition: + return self._acquire(amount=amount, block=block) + + def _acquire(self, amount, block): + self._refill() + if amount <= self._current_capacity: + self._current_capacity -= amount + return True + else: + if not block: + raise CapacityNotAvailableError() + # Not enough capacity. + sleep_amount = self._sleep_amount(amount) + while sleep_amount > 0: + # Until python3.2, wait() always returned None so we can't + # tell if a timeout occurred waiting on the cond var. + # Because of this we'll unconditionally call _refill(). + # The downside to this is that we were waken up via + # a notify(), we're calling unnecessarily calling _refill() an + # extra time. + self._new_fill_rate_condition.wait(sleep_amount) + self._refill() + sleep_amount = self._sleep_amount(amount) + self._current_capacity -= amount + return True + + def _sleep_amount(self, amount): + return (amount - self._current_capacity) / self._fill_rate + + def _refill(self): + timestamp = self._clock.current_time() + if self._last_timestamp is None: + self._last_timestamp = timestamp + return + current_capacity = self._current_capacity + fill_amount = (timestamp - self._last_timestamp) * self._fill_rate + new_capacity = min(self._max_capacity, current_capacity + fill_amount) + self._current_capacity = new_capacity + self._last_timestamp = timestamp diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/quota.py b/.venv/lib/python3.12/site-packages/botocore/retries/quota.py new file mode 100644 index 00000000..f0394291 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/quota.py @@ -0,0 +1,54 @@ +"""Retry quota implementation.""" + +import threading + + +class RetryQuota: + INITIAL_CAPACITY = 500 + + def __init__(self, initial_capacity=INITIAL_CAPACITY, lock=None): + self._max_capacity = initial_capacity + self._available_capacity = initial_capacity + if lock is None: + lock = threading.Lock() + self._lock = lock + + def acquire(self, capacity_amount): + """Attempt to aquire a certain amount of capacity. + + If there's not sufficient amount of capacity available, ``False`` + is returned. Otherwise, ``True`` is returned, which indicates that + capacity was successfully allocated. + + """ + # The acquire() is only called when we encounter a retryable + # response so we aren't worried about locking the entire method. + with self._lock: + if capacity_amount > self._available_capacity: + return False + self._available_capacity -= capacity_amount + return True + + def release(self, capacity_amount): + """Release capacity back to the retry quota. + + The capacity being released will be truncated if necessary + to ensure the max capacity is never exceeded. + + """ + # Implementation note: The release() method is called as part + # of the "after-call" event, which means it gets invoked for + # every API call. In the common case where the request is + # successful and we're at full capacity, we can avoid locking. + # We can't exceed max capacity so there's no work we have to do. + if self._max_capacity == self._available_capacity: + return + with self._lock: + amount = min( + self._max_capacity - self._available_capacity, capacity_amount + ) + self._available_capacity += amount + + @property + def available_capacity(self): + return self._available_capacity diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/special.py b/.venv/lib/python3.12/site-packages/botocore/retries/special.py new file mode 100644 index 00000000..9b782601 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/special.py @@ -0,0 +1,51 @@ +"""Special cased retries. + +These are additional retry cases we still have to handle from the legacy +retry handler. They don't make sense as part of the standard mode retry +module. Ideally we should be able to remove this module. + +""" + +import logging +from binascii import crc32 + +from botocore.retries.base import BaseRetryableChecker + +logger = logging.getLogger(__name__) + + +# TODO: This is an ideal candidate for the retryable trait once that's +# available. +class RetryIDPCommunicationError(BaseRetryableChecker): + _SERVICE_NAME = 'sts' + + def is_retryable(self, context): + service_name = context.operation_model.service_model.service_name + if service_name != self._SERVICE_NAME: + return False + error_code = context.get_error_code() + return error_code == 'IDPCommunicationError' + + +class RetryDDBChecksumError(BaseRetryableChecker): + _CHECKSUM_HEADER = 'x-amz-crc32' + _SERVICE_NAME = 'dynamodb' + + def is_retryable(self, context): + service_name = context.operation_model.service_model.service_name + if service_name != self._SERVICE_NAME: + return False + if context.http_response is None: + return False + checksum = context.http_response.headers.get(self._CHECKSUM_HEADER) + if checksum is None: + return False + actual_crc32 = crc32(context.http_response.content) & 0xFFFFFFFF + if actual_crc32 != int(checksum): + logger.debug( + "DynamoDB crc32 checksum does not match, " + "expected: %s, actual: %s", + checksum, + actual_crc32, + ) + return True diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/standard.py b/.venv/lib/python3.12/site-packages/botocore/retries/standard.py new file mode 100644 index 00000000..8801530b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/standard.py @@ -0,0 +1,532 @@ +"""Standard retry behavior. + +This contains the default standard retry behavior. +It provides consistent behavior with other AWS SDKs. + +The key base classes uses for retries: + + * ``BaseRetryableChecker`` - Use to check a specific condition that + indicates a retry should happen. This can include things like + max attempts, HTTP status code checks, error code checks etc. + * ``RetryBackoff`` - Use to determine how long we should backoff until + we retry a request. This is the class that will implement delay such + as exponential backoff. + * ``RetryPolicy`` - Main class that determines if a retry should + happen. It can combine data from a various BaseRetryableCheckers + to make a final call as to whether or not a retry should happen. + It then uses a ``BaseRetryBackoff`` to determine how long to delay. + * ``RetryHandler`` - The bridge between botocore's event system + used by endpoint.py to manage retries and the interfaces defined + in this module. + +This allows us to define an API that has minimal coupling to the event +based API used by botocore. + +""" + +import logging +import random + +from botocore.exceptions import ( + ConnectionError, + ConnectTimeoutError, + HTTPClientError, + ReadTimeoutError, +) +from botocore.retries import quota, special +from botocore.retries.base import BaseRetryableChecker, BaseRetryBackoff + +DEFAULT_MAX_ATTEMPTS = 3 +logger = logging.getLogger(__name__) + + +def register_retry_handler(client, max_attempts=DEFAULT_MAX_ATTEMPTS): + retry_quota = RetryQuotaChecker(quota.RetryQuota()) + + service_id = client.meta.service_model.service_id + service_event_name = service_id.hyphenize() + client.meta.events.register( + f'after-call.{service_event_name}', retry_quota.release_retry_quota + ) + + handler = RetryHandler( + retry_policy=RetryPolicy( + retry_checker=StandardRetryConditions(max_attempts=max_attempts), + retry_backoff=ExponentialBackoff(), + ), + retry_event_adapter=RetryEventAdapter(), + retry_quota=retry_quota, + ) + + unique_id = f'retry-config-{service_event_name}' + client.meta.events.register( + f'needs-retry.{service_event_name}', + handler.needs_retry, + unique_id=unique_id, + ) + return handler + + +class RetryHandler: + """Bridge between botocore's event system and this module. + + This class is intended to be hooked to botocore's event system + as an event handler. + """ + + def __init__(self, retry_policy, retry_event_adapter, retry_quota): + self._retry_policy = retry_policy + self._retry_event_adapter = retry_event_adapter + self._retry_quota = retry_quota + + def needs_retry(self, **kwargs): + """Connect as a handler to the needs-retry event.""" + retry_delay = None + context = self._retry_event_adapter.create_retry_context(**kwargs) + if self._retry_policy.should_retry(context): + # Before we can retry we need to ensure we have sufficient + # capacity in our retry quota. + if self._retry_quota.acquire_retry_quota(context): + retry_delay = self._retry_policy.compute_retry_delay(context) + logger.debug( + "Retry needed, retrying request after delay of: %s", + retry_delay, + ) + else: + logger.debug( + "Retry needed but retry quota reached, " + "not retrying request." + ) + else: + logger.debug("Not retrying request.") + self._retry_event_adapter.adapt_retry_response_from_context(context) + return retry_delay + + +class RetryEventAdapter: + """Adapter to existing retry interface used in the endpoints layer. + + This existing interface for determining if a retry needs to happen + is event based and used in ``botocore.endpoint``. The interface has + grown organically over the years and could use some cleanup. This + adapter converts that interface into the interface used by the + new retry strategies. + + """ + + def create_retry_context(self, **kwargs): + """Create context based on needs-retry kwargs.""" + response = kwargs['response'] + if response is None: + # If response is None it means that an exception was raised + # because we never received a response from the service. This + # could be something like a ConnectionError we get from our + # http layer. + http_response = None + parsed_response = None + else: + http_response, parsed_response = response + # This provides isolation between the kwargs emitted in the + # needs-retry event, and what this module uses to check for + # retries. + context = RetryContext( + attempt_number=kwargs['attempts'], + operation_model=kwargs['operation'], + http_response=http_response, + parsed_response=parsed_response, + caught_exception=kwargs['caught_exception'], + request_context=kwargs['request_dict']['context'], + ) + return context + + def adapt_retry_response_from_context(self, context): + """Modify response back to user back from context.""" + # This will mutate attributes that are returned back to the end + # user. We do it this way so that all the various retry classes + # don't mutate any input parameters from the needs-retry event. + metadata = context.get_retry_metadata() + if context.parsed_response is not None: + context.parsed_response.setdefault('ResponseMetadata', {}).update( + metadata + ) + + +# Implementation note: this is meant to encapsulate all the misc stuff +# that gets sent in the needs-retry event. This is mapped so that params +# are more clear and explicit. +class RetryContext: + """Normalize a response that we use to check if a retry should occur. + + This class smoothes over the different types of responses we may get + from a service including: + + * A modeled error response from the service that contains a service + code and error message. + * A raw HTTP response that doesn't contain service protocol specific + error keys. + * An exception received while attempting to retrieve a response. + This could be a ConnectionError we receive from our HTTP layer which + could represent that we weren't able to receive a response from + the service. + + This class guarantees that at least one of the above attributes will be + non None. + + This class is meant to provide a read-only view into the properties + associated with a possible retryable response. None of the properties + are meant to be modified directly. + + """ + + def __init__( + self, + attempt_number, + operation_model=None, + parsed_response=None, + http_response=None, + caught_exception=None, + request_context=None, + ): + # 1-based attempt number. + self.attempt_number = attempt_number + self.operation_model = operation_model + # This is the parsed response dictionary we get from parsing + # the HTTP response from the service. + self.parsed_response = parsed_response + # This is an instance of botocore.awsrequest.AWSResponse. + self.http_response = http_response + # This is a subclass of Exception that will be non None if + # an exception was raised when retrying to retrieve a response. + self.caught_exception = caught_exception + # This is the request context dictionary that's added to the + # request dict. This is used to story any additional state + # about the request. We use this for storing retry quota + # capacity. + if request_context is None: + request_context = {} + self.request_context = request_context + self._retry_metadata = {} + + # These are misc helper methods to avoid duplication in the various + # checkers. + def get_error_code(self): + """Check if there was a parsed response with an error code. + + If we could not find any error codes, ``None`` is returned. + + """ + if self.parsed_response is None: + return + error = self.parsed_response.get('Error', {}) + if not isinstance(error, dict): + return + return error.get('Code') + + def add_retry_metadata(self, **kwargs): + """Add key/value pairs to the retry metadata. + + This allows any objects during the retry process to add + metadata about any checks/validations that happened. + + This gets added to the response metadata in the retry handler. + + """ + self._retry_metadata.update(**kwargs) + + def get_retry_metadata(self): + return self._retry_metadata.copy() + + +class RetryPolicy: + def __init__(self, retry_checker, retry_backoff): + self._retry_checker = retry_checker + self._retry_backoff = retry_backoff + + def should_retry(self, context): + return self._retry_checker.is_retryable(context) + + def compute_retry_delay(self, context): + return self._retry_backoff.delay_amount(context) + + +class ExponentialBackoff(BaseRetryBackoff): + _BASE = 2 + _MAX_BACKOFF = 20 + + def __init__(self, max_backoff=20, random=random.random): + self._base = self._BASE + self._max_backoff = max_backoff + self._random = random + + def delay_amount(self, context): + """Calculates delay based on exponential backoff. + + This class implements truncated binary exponential backoff + with jitter:: + + t_i = rand(0, 1) * min(2 ** attempt, MAX_BACKOFF) + + where ``i`` is the request attempt (0 based). + + """ + # The context.attempt_number is a 1-based value, but we have + # to calculate the delay based on i based a 0-based value. We + # want the first delay to just be ``rand(0, 1)``. + return self._random() * min( + (self._base ** (context.attempt_number - 1)), + self._max_backoff, + ) + + +class MaxAttemptsChecker(BaseRetryableChecker): + def __init__(self, max_attempts): + self._max_attempts = max_attempts + + def is_retryable(self, context): + under_max_attempts = context.attempt_number < self._max_attempts + retries_context = context.request_context.get('retries') + if retries_context: + retries_context['max'] = max( + retries_context.get('max', 0), self._max_attempts + ) + if not under_max_attempts: + logger.debug("Max attempts of %s reached.", self._max_attempts) + context.add_retry_metadata(MaxAttemptsReached=True) + return under_max_attempts + + +class TransientRetryableChecker(BaseRetryableChecker): + _TRANSIENT_ERROR_CODES = [ + 'RequestTimeout', + 'RequestTimeoutException', + 'PriorRequestNotComplete', + ] + _TRANSIENT_STATUS_CODES = [500, 502, 503, 504] + _TRANSIENT_EXCEPTION_CLS = ( + ConnectionError, + HTTPClientError, + ) + + def __init__( + self, + transient_error_codes=None, + transient_status_codes=None, + transient_exception_cls=None, + ): + if transient_error_codes is None: + transient_error_codes = self._TRANSIENT_ERROR_CODES[:] + if transient_status_codes is None: + transient_status_codes = self._TRANSIENT_STATUS_CODES[:] + if transient_exception_cls is None: + transient_exception_cls = self._TRANSIENT_EXCEPTION_CLS + self._transient_error_codes = transient_error_codes + self._transient_status_codes = transient_status_codes + self._transient_exception_cls = transient_exception_cls + + def is_retryable(self, context): + if context.get_error_code() in self._transient_error_codes: + return True + if context.http_response is not None: + if ( + context.http_response.status_code + in self._transient_status_codes + ): + return True + if context.caught_exception is not None: + return isinstance( + context.caught_exception, self._transient_exception_cls + ) + return False + + +class ThrottledRetryableChecker(BaseRetryableChecker): + # This is the union of all error codes we've seen that represent + # a throttled error. + _THROTTLED_ERROR_CODES = [ + 'Throttling', + 'ThrottlingException', + 'ThrottledException', + 'RequestThrottledException', + 'TooManyRequestsException', + 'ProvisionedThroughputExceededException', + 'TransactionInProgressException', + 'RequestLimitExceeded', + 'BandwidthLimitExceeded', + 'LimitExceededException', + 'RequestThrottled', + 'SlowDown', + 'PriorRequestNotComplete', + 'EC2ThrottledException', + ] + + def __init__(self, throttled_error_codes=None): + if throttled_error_codes is None: + throttled_error_codes = self._THROTTLED_ERROR_CODES[:] + self._throttled_error_codes = throttled_error_codes + + def is_retryable(self, context): + # Only the error code from a parsed service response is used + # to determine if the response is a throttled response. + return context.get_error_code() in self._throttled_error_codes + + +class ModeledRetryableChecker(BaseRetryableChecker): + """Check if an error has been modeled as retryable.""" + + def __init__(self): + self._error_detector = ModeledRetryErrorDetector() + + def is_retryable(self, context): + error_code = context.get_error_code() + if error_code is None: + return False + return self._error_detector.detect_error_type(context) is not None + + +class ModeledRetryErrorDetector: + """Checks whether or not an error is a modeled retryable error.""" + + # There are return values from the detect_error_type() method. + TRANSIENT_ERROR = 'TRANSIENT_ERROR' + THROTTLING_ERROR = 'THROTTLING_ERROR' + # This class is lower level than ModeledRetryableChecker, which + # implements BaseRetryableChecker. This object allows you to distinguish + # between the various types of retryable errors. + + def detect_error_type(self, context): + """Detect the error type associated with an error code and model. + + This will either return: + + * ``self.TRANSIENT_ERROR`` - If the error is a transient error + * ``self.THROTTLING_ERROR`` - If the error is a throttling error + * ``None`` - If the error is neither type of error. + + """ + error_code = context.get_error_code() + op_model = context.operation_model + if op_model is None or not op_model.error_shapes: + return + for shape in op_model.error_shapes: + if shape.metadata.get('retryable') is not None: + # Check if this error code matches the shape. This can + # be either by name or by a modeled error code. + error_code_to_check = ( + shape.metadata.get('error', {}).get('code') or shape.name + ) + if error_code == error_code_to_check: + if shape.metadata['retryable'].get('throttling'): + return self.THROTTLING_ERROR + return self.TRANSIENT_ERROR + + +class ThrottlingErrorDetector: + def __init__(self, retry_event_adapter): + self._modeled_error_detector = ModeledRetryErrorDetector() + self._fixed_error_code_detector = ThrottledRetryableChecker() + self._retry_event_adapter = retry_event_adapter + + # This expects the kwargs from needs-retry to be passed through. + def is_throttling_error(self, **kwargs): + context = self._retry_event_adapter.create_retry_context(**kwargs) + if self._fixed_error_code_detector.is_retryable(context): + return True + error_type = self._modeled_error_detector.detect_error_type(context) + return error_type == self._modeled_error_detector.THROTTLING_ERROR + + +class StandardRetryConditions(BaseRetryableChecker): + """Concrete class that implements the standard retry policy checks. + + Specifically: + + not max_attempts and (transient or throttled or modeled_retry) + + """ + + def __init__(self, max_attempts=DEFAULT_MAX_ATTEMPTS): + # Note: This class is for convenience so you can have the + # standard retry condition in a single class. + self._max_attempts_checker = MaxAttemptsChecker(max_attempts) + self._additional_checkers = OrRetryChecker( + [ + TransientRetryableChecker(), + ThrottledRetryableChecker(), + ModeledRetryableChecker(), + OrRetryChecker( + [ + special.RetryIDPCommunicationError(), + special.RetryDDBChecksumError(), + ] + ), + ] + ) + + def is_retryable(self, context): + return self._max_attempts_checker.is_retryable( + context + ) and self._additional_checkers.is_retryable(context) + + +class OrRetryChecker(BaseRetryableChecker): + def __init__(self, checkers): + self._checkers = checkers + + def is_retryable(self, context): + return any(checker.is_retryable(context) for checker in self._checkers) + + +class RetryQuotaChecker: + _RETRY_COST = 5 + _NO_RETRY_INCREMENT = 1 + _TIMEOUT_RETRY_REQUEST = 10 + _TIMEOUT_EXCEPTIONS = (ConnectTimeoutError, ReadTimeoutError) + + # Implementation note: We're not making this a BaseRetryableChecker + # because this isn't just a check if we can retry. This also changes + # state so we have to careful when/how we call this. Making it + # a BaseRetryableChecker implies you can call .is_retryable(context) + # as many times as you want and not affect anything. + + def __init__(self, quota): + self._quota = quota + # This tracks the last amount + self._last_amount_acquired = None + + def acquire_retry_quota(self, context): + if self._is_timeout_error(context): + capacity_amount = self._TIMEOUT_RETRY_REQUEST + else: + capacity_amount = self._RETRY_COST + success = self._quota.acquire(capacity_amount) + if success: + # We add the capacity amount to the request context so we know + # how much to release later. The capacity amount can vary based + # on the error. + context.request_context['retry_quota_capacity'] = capacity_amount + return True + context.add_retry_metadata(RetryQuotaReached=True) + return False + + def _is_timeout_error(self, context): + return isinstance(context.caught_exception, self._TIMEOUT_EXCEPTIONS) + + # This is intended to be hooked up to ``after-call``. + def release_retry_quota(self, context, http_response, **kwargs): + # There's three possible options. + # 1. The HTTP response did not have a 2xx response. In that case we + # give no quota back. + # 2. The HTTP request was successful and was never retried. In + # that case we give _NO_RETRY_INCREMENT back. + # 3. The API call had retries, and we eventually receive an HTTP + # response with a 2xx status code. In that case we give back + # whatever quota was associated with the last acquisition. + if http_response is None: + return + status_code = http_response.status_code + if 200 <= status_code < 300: + if 'retry_quota_capacity' not in context: + self._quota.release(self._NO_RETRY_INCREMENT) + else: + capacity_amount = context['retry_quota_capacity'] + self._quota.release(capacity_amount) diff --git a/.venv/lib/python3.12/site-packages/botocore/retries/throttling.py b/.venv/lib/python3.12/site-packages/botocore/retries/throttling.py new file mode 100644 index 00000000..34ab4172 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retries/throttling.py @@ -0,0 +1,55 @@ +from collections import namedtuple + +CubicParams = namedtuple('CubicParams', ['w_max', 'k', 'last_fail']) + + +class CubicCalculator: + _SCALE_CONSTANT = 0.4 + _BETA = 0.7 + + def __init__( + self, + starting_max_rate, + start_time, + scale_constant=_SCALE_CONSTANT, + beta=_BETA, + ): + self._w_max = starting_max_rate + self._scale_constant = scale_constant + self._beta = beta + self._k = self._calculate_zero_point() + self._last_fail = start_time + + def _calculate_zero_point(self): + scaled_value = (self._w_max * (1 - self._beta)) / self._scale_constant + k = scaled_value ** (1 / 3.0) + return k + + def success_received(self, timestamp): + dt = timestamp - self._last_fail + new_rate = self._scale_constant * (dt - self._k) ** 3 + self._w_max + return new_rate + + def error_received(self, current_rate, timestamp): + # Consider not having this be the current measured rate. + + # We have a new max rate, which is the current rate we were sending + # at when we received an error response. + self._w_max = current_rate + self._k = self._calculate_zero_point() + self._last_fail = timestamp + return current_rate * self._beta + + def get_params_snapshot(self): + """Return a read-only object of the current cubic parameters. + + These parameters are intended to be used for debug/troubleshooting + purposes. These object is a read-only snapshot and cannot be used + to modify the behavior of the CUBIC calculations. + + New parameters may be added to this object in the future. + + """ + return CubicParams( + w_max=self._w_max, k=self._k, last_fail=self._last_fail + ) |