diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/retryhandler.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/retryhandler.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/botocore/retryhandler.py | 416 |
1 files changed, 416 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/retryhandler.py b/.venv/lib/python3.12/site-packages/botocore/retryhandler.py new file mode 100644 index 00000000..c2eed1d9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/botocore/retryhandler.py @@ -0,0 +1,416 @@ +# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ +# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import functools +import logging +import random +from binascii import crc32 + +from botocore.exceptions import ( + ChecksumError, + ConnectionClosedError, + ConnectionError, + EndpointConnectionError, + ReadTimeoutError, +) + +logger = logging.getLogger(__name__) +# The only supported error for now is GENERAL_CONNECTION_ERROR +# which maps to requests generic ConnectionError. If we're able +# to get more specific exceptions from requests we can update +# this mapping with more specific exceptions. +EXCEPTION_MAP = { + 'GENERAL_CONNECTION_ERROR': [ + ConnectionError, + ConnectionClosedError, + ReadTimeoutError, + EndpointConnectionError, + ], +} + + +def delay_exponential(base, growth_factor, attempts): + """Calculate time to sleep based on exponential function. + + The format is:: + + base * growth_factor ^ (attempts - 1) + + If ``base`` is set to 'rand' then a random number between + 0 and 1 will be used as the base. + Base must be greater than 0, otherwise a ValueError will be + raised. + + """ + if base == 'rand': + base = random.random() + elif base <= 0: + raise ValueError( + f"The 'base' param must be greater than 0, got: {base}" + ) + time_to_sleep = base * (growth_factor ** (attempts - 1)) + return time_to_sleep + + +def create_exponential_delay_function(base, growth_factor): + """Create an exponential delay function based on the attempts. + + This is used so that you only have to pass it the attempts + parameter to calculate the delay. + + """ + return functools.partial( + delay_exponential, base=base, growth_factor=growth_factor + ) + + +def create_retry_handler(config, operation_name=None): + checker = create_checker_from_retry_config( + config, operation_name=operation_name + ) + action = create_retry_action_from_config( + config, operation_name=operation_name + ) + return RetryHandler(checker=checker, action=action) + + +def create_retry_action_from_config(config, operation_name=None): + # The spec has the possibility of supporting per policy + # actions, but right now, we assume this comes from the + # default section, which means that delay functions apply + # for every policy in the retry config (per service). + delay_config = config['__default__']['delay'] + if delay_config['type'] == 'exponential': + return create_exponential_delay_function( + base=delay_config['base'], + growth_factor=delay_config['growth_factor'], + ) + + +def create_checker_from_retry_config(config, operation_name=None): + checkers = [] + max_attempts = None + retryable_exceptions = [] + if '__default__' in config: + policies = config['__default__'].get('policies', []) + max_attempts = config['__default__']['max_attempts'] + for key in policies: + current_config = policies[key] + checkers.append(_create_single_checker(current_config)) + retry_exception = _extract_retryable_exception(current_config) + if retry_exception is not None: + retryable_exceptions.extend(retry_exception) + if operation_name is not None and config.get(operation_name) is not None: + operation_policies = config[operation_name]['policies'] + for key in operation_policies: + checkers.append(_create_single_checker(operation_policies[key])) + retry_exception = _extract_retryable_exception( + operation_policies[key] + ) + if retry_exception is not None: + retryable_exceptions.extend(retry_exception) + if len(checkers) == 1: + # Don't need to use a MultiChecker + return MaxAttemptsDecorator(checkers[0], max_attempts=max_attempts) + else: + multi_checker = MultiChecker(checkers) + return MaxAttemptsDecorator( + multi_checker, + max_attempts=max_attempts, + retryable_exceptions=tuple(retryable_exceptions), + ) + + +def _create_single_checker(config): + if 'response' in config['applies_when']: + return _create_single_response_checker( + config['applies_when']['response'] + ) + elif 'socket_errors' in config['applies_when']: + return ExceptionRaiser() + + +def _create_single_response_checker(response): + if 'service_error_code' in response: + checker = ServiceErrorCodeChecker( + status_code=response['http_status_code'], + error_code=response['service_error_code'], + ) + elif 'http_status_code' in response: + checker = HTTPStatusCodeChecker( + status_code=response['http_status_code'] + ) + elif 'crc32body' in response: + checker = CRC32Checker(header=response['crc32body']) + else: + # TODO: send a signal. + raise ValueError("Unknown retry policy") + return checker + + +def _extract_retryable_exception(config): + applies_when = config['applies_when'] + if 'crc32body' in applies_when.get('response', {}): + return [ChecksumError] + elif 'socket_errors' in applies_when: + exceptions = [] + for name in applies_when['socket_errors']: + exceptions.extend(EXCEPTION_MAP[name]) + return exceptions + + +class RetryHandler: + """Retry handler. + + The retry handler takes two params, ``checker`` object + and an ``action`` object. + + The ``checker`` object must be a callable object and based on a response + and an attempt number, determines whether or not sufficient criteria for + a retry has been met. If this is the case then the ``action`` object + (which also is a callable) determines what needs to happen in the event + of a retry. + + """ + + def __init__(self, checker, action): + self._checker = checker + self._action = action + + def __call__(self, attempts, response, caught_exception, **kwargs): + """Handler for a retry. + + Intended to be hooked up to an event handler (hence the **kwargs), + this will process retries appropriately. + + """ + checker_kwargs = { + 'attempt_number': attempts, + 'response': response, + 'caught_exception': caught_exception, + } + if isinstance(self._checker, MaxAttemptsDecorator): + retries_context = kwargs['request_dict']['context'].get('retries') + checker_kwargs.update({'retries_context': retries_context}) + + if self._checker(**checker_kwargs): + result = self._action(attempts=attempts) + logger.debug("Retry needed, action of: %s", result) + return result + logger.debug("No retry needed.") + + +class BaseChecker: + """Base class for retry checkers. + + Each class is responsible for checking a single criteria that determines + whether or not a retry should not happen. + + """ + + def __call__(self, attempt_number, response, caught_exception): + """Determine if retry criteria matches. + + Note that either ``response`` is not None and ``caught_exception`` is + None or ``response`` is None and ``caught_exception`` is not None. + + :type attempt_number: int + :param attempt_number: The total number of times we've attempted + to send the request. + + :param response: The HTTP response (if one was received). + + :type caught_exception: Exception + :param caught_exception: Any exception that was caught while trying to + send the HTTP response. + + :return: True, if the retry criteria matches (and therefore a retry + should occur. False if the criteria does not match. + + """ + # The default implementation allows subclasses to not have to check + # whether or not response is None or not. + if response is not None: + return self._check_response(attempt_number, response) + elif caught_exception is not None: + return self._check_caught_exception( + attempt_number, caught_exception + ) + else: + raise ValueError("Both response and caught_exception are None.") + + def _check_response(self, attempt_number, response): + pass + + def _check_caught_exception(self, attempt_number, caught_exception): + pass + + +class MaxAttemptsDecorator(BaseChecker): + """Allow retries up to a maximum number of attempts. + + This will pass through calls to the decorated retry checker, provided + that the number of attempts does not exceed max_attempts. It will + also catch any retryable_exceptions passed in. Once max_attempts has + been exceeded, then False will be returned or the retryable_exceptions + that was previously being caught will be raised. + + """ + + def __init__(self, checker, max_attempts, retryable_exceptions=None): + self._checker = checker + self._max_attempts = max_attempts + self._retryable_exceptions = retryable_exceptions + + def __call__( + self, attempt_number, response, caught_exception, retries_context + ): + if retries_context: + retries_context['max'] = max( + retries_context.get('max', 0), self._max_attempts + ) + + should_retry = self._should_retry( + attempt_number, response, caught_exception + ) + if should_retry: + if attempt_number >= self._max_attempts: + # explicitly set MaxAttemptsReached + if response is not None and 'ResponseMetadata' in response[1]: + response[1]['ResponseMetadata']['MaxAttemptsReached'] = ( + True + ) + logger.debug( + "Reached the maximum number of retry attempts: %s", + attempt_number, + ) + return False + else: + return should_retry + else: + return False + + def _should_retry(self, attempt_number, response, caught_exception): + if self._retryable_exceptions and attempt_number < self._max_attempts: + try: + return self._checker( + attempt_number, response, caught_exception + ) + except self._retryable_exceptions as e: + logger.debug( + "retry needed, retryable exception caught: %s", + e, + exc_info=True, + ) + return True + else: + # If we've exceeded the max attempts we just let the exception + # propagate if one has occurred. + return self._checker(attempt_number, response, caught_exception) + + +class HTTPStatusCodeChecker(BaseChecker): + def __init__(self, status_code): + self._status_code = status_code + + def _check_response(self, attempt_number, response): + if response[0].status_code == self._status_code: + logger.debug( + "retry needed: retryable HTTP status code received: %s", + self._status_code, + ) + return True + else: + return False + + +class ServiceErrorCodeChecker(BaseChecker): + def __init__(self, status_code, error_code): + self._status_code = status_code + self._error_code = error_code + + def _check_response(self, attempt_number, response): + if response[0].status_code == self._status_code: + actual_error_code = response[1].get('Error', {}).get('Code') + if actual_error_code == self._error_code: + logger.debug( + "retry needed: matching HTTP status and error code seen: " + "%s, %s", + self._status_code, + self._error_code, + ) + return True + return False + + +class MultiChecker(BaseChecker): + def __init__(self, checkers): + self._checkers = checkers + + def __call__(self, attempt_number, response, caught_exception): + for checker in self._checkers: + checker_response = checker( + attempt_number, response, caught_exception + ) + if checker_response: + return checker_response + return False + + +class CRC32Checker(BaseChecker): + def __init__(self, header): + # The header where the expected crc32 is located. + self._header_name = header + + def _check_response(self, attempt_number, response): + http_response = response[0] + expected_crc = http_response.headers.get(self._header_name) + if expected_crc is None: + logger.debug( + "crc32 check skipped, the %s header is not " + "in the http response.", + self._header_name, + ) + else: + actual_crc32 = crc32(response[0].content) & 0xFFFFFFFF + if not actual_crc32 == int(expected_crc): + logger.debug( + "retry needed: crc32 check failed, expected != actual: " + "%s != %s", + int(expected_crc), + actual_crc32, + ) + raise ChecksumError( + checksum_type='crc32', + expected_checksum=int(expected_crc), + actual_checksum=actual_crc32, + ) + + +class ExceptionRaiser(BaseChecker): + """Raise any caught exceptions. + + This class will raise any non None ``caught_exception``. + + """ + + def _check_caught_exception(self, attempt_number, caught_exception): + # This is implementation specific, but this class is useful by + # coordinating with the MaxAttemptsDecorator. + # The MaxAttemptsDecorator has a list of exceptions it should catch + # and retry, but something needs to come along and actually raise the + # caught_exception. That's what this class is being used for. If + # the MaxAttemptsDecorator is not interested in retrying the exception + # then this exception just propagates out past the retry code. + raise caught_exception |