aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/retryhandler.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/retryhandler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/retryhandler.py416
1 files changed, 416 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/retryhandler.py b/.venv/lib/python3.12/site-packages/botocore/retryhandler.py
new file mode 100644
index 00000000..c2eed1d9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/retryhandler.py
@@ -0,0 +1,416 @@
+# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
+# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+import functools
+import logging
+import random
+from binascii import crc32
+
+from botocore.exceptions import (
+ ChecksumError,
+ ConnectionClosedError,
+ ConnectionError,
+ EndpointConnectionError,
+ ReadTimeoutError,
+)
+
+logger = logging.getLogger(__name__)
+# The only supported error for now is GENERAL_CONNECTION_ERROR
+# which maps to requests generic ConnectionError. If we're able
+# to get more specific exceptions from requests we can update
+# this mapping with more specific exceptions.
+EXCEPTION_MAP = {
+ 'GENERAL_CONNECTION_ERROR': [
+ ConnectionError,
+ ConnectionClosedError,
+ ReadTimeoutError,
+ EndpointConnectionError,
+ ],
+}
+
+
+def delay_exponential(base, growth_factor, attempts):
+ """Calculate time to sleep based on exponential function.
+
+ The format is::
+
+ base * growth_factor ^ (attempts - 1)
+
+ If ``base`` is set to 'rand' then a random number between
+ 0 and 1 will be used as the base.
+ Base must be greater than 0, otherwise a ValueError will be
+ raised.
+
+ """
+ if base == 'rand':
+ base = random.random()
+ elif base <= 0:
+ raise ValueError(
+ f"The 'base' param must be greater than 0, got: {base}"
+ )
+ time_to_sleep = base * (growth_factor ** (attempts - 1))
+ return time_to_sleep
+
+
+def create_exponential_delay_function(base, growth_factor):
+ """Create an exponential delay function based on the attempts.
+
+ This is used so that you only have to pass it the attempts
+ parameter to calculate the delay.
+
+ """
+ return functools.partial(
+ delay_exponential, base=base, growth_factor=growth_factor
+ )
+
+
+def create_retry_handler(config, operation_name=None):
+ checker = create_checker_from_retry_config(
+ config, operation_name=operation_name
+ )
+ action = create_retry_action_from_config(
+ config, operation_name=operation_name
+ )
+ return RetryHandler(checker=checker, action=action)
+
+
+def create_retry_action_from_config(config, operation_name=None):
+ # The spec has the possibility of supporting per policy
+ # actions, but right now, we assume this comes from the
+ # default section, which means that delay functions apply
+ # for every policy in the retry config (per service).
+ delay_config = config['__default__']['delay']
+ if delay_config['type'] == 'exponential':
+ return create_exponential_delay_function(
+ base=delay_config['base'],
+ growth_factor=delay_config['growth_factor'],
+ )
+
+
+def create_checker_from_retry_config(config, operation_name=None):
+ checkers = []
+ max_attempts = None
+ retryable_exceptions = []
+ if '__default__' in config:
+ policies = config['__default__'].get('policies', [])
+ max_attempts = config['__default__']['max_attempts']
+ for key in policies:
+ current_config = policies[key]
+ checkers.append(_create_single_checker(current_config))
+ retry_exception = _extract_retryable_exception(current_config)
+ if retry_exception is not None:
+ retryable_exceptions.extend(retry_exception)
+ if operation_name is not None and config.get(operation_name) is not None:
+ operation_policies = config[operation_name]['policies']
+ for key in operation_policies:
+ checkers.append(_create_single_checker(operation_policies[key]))
+ retry_exception = _extract_retryable_exception(
+ operation_policies[key]
+ )
+ if retry_exception is not None:
+ retryable_exceptions.extend(retry_exception)
+ if len(checkers) == 1:
+ # Don't need to use a MultiChecker
+ return MaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
+ else:
+ multi_checker = MultiChecker(checkers)
+ return MaxAttemptsDecorator(
+ multi_checker,
+ max_attempts=max_attempts,
+ retryable_exceptions=tuple(retryable_exceptions),
+ )
+
+
+def _create_single_checker(config):
+ if 'response' in config['applies_when']:
+ return _create_single_response_checker(
+ config['applies_when']['response']
+ )
+ elif 'socket_errors' in config['applies_when']:
+ return ExceptionRaiser()
+
+
+def _create_single_response_checker(response):
+ if 'service_error_code' in response:
+ checker = ServiceErrorCodeChecker(
+ status_code=response['http_status_code'],
+ error_code=response['service_error_code'],
+ )
+ elif 'http_status_code' in response:
+ checker = HTTPStatusCodeChecker(
+ status_code=response['http_status_code']
+ )
+ elif 'crc32body' in response:
+ checker = CRC32Checker(header=response['crc32body'])
+ else:
+ # TODO: send a signal.
+ raise ValueError("Unknown retry policy")
+ return checker
+
+
+def _extract_retryable_exception(config):
+ applies_when = config['applies_when']
+ if 'crc32body' in applies_when.get('response', {}):
+ return [ChecksumError]
+ elif 'socket_errors' in applies_when:
+ exceptions = []
+ for name in applies_when['socket_errors']:
+ exceptions.extend(EXCEPTION_MAP[name])
+ return exceptions
+
+
+class RetryHandler:
+ """Retry handler.
+
+ The retry handler takes two params, ``checker`` object
+ and an ``action`` object.
+
+ The ``checker`` object must be a callable object and based on a response
+ and an attempt number, determines whether or not sufficient criteria for
+ a retry has been met. If this is the case then the ``action`` object
+ (which also is a callable) determines what needs to happen in the event
+ of a retry.
+
+ """
+
+ def __init__(self, checker, action):
+ self._checker = checker
+ self._action = action
+
+ def __call__(self, attempts, response, caught_exception, **kwargs):
+ """Handler for a retry.
+
+ Intended to be hooked up to an event handler (hence the **kwargs),
+ this will process retries appropriately.
+
+ """
+ checker_kwargs = {
+ 'attempt_number': attempts,
+ 'response': response,
+ 'caught_exception': caught_exception,
+ }
+ if isinstance(self._checker, MaxAttemptsDecorator):
+ retries_context = kwargs['request_dict']['context'].get('retries')
+ checker_kwargs.update({'retries_context': retries_context})
+
+ if self._checker(**checker_kwargs):
+ result = self._action(attempts=attempts)
+ logger.debug("Retry needed, action of: %s", result)
+ return result
+ logger.debug("No retry needed.")
+
+
+class BaseChecker:
+ """Base class for retry checkers.
+
+ Each class is responsible for checking a single criteria that determines
+ whether or not a retry should not happen.
+
+ """
+
+ def __call__(self, attempt_number, response, caught_exception):
+ """Determine if retry criteria matches.
+
+ Note that either ``response`` is not None and ``caught_exception`` is
+ None or ``response`` is None and ``caught_exception`` is not None.
+
+ :type attempt_number: int
+ :param attempt_number: The total number of times we've attempted
+ to send the request.
+
+ :param response: The HTTP response (if one was received).
+
+ :type caught_exception: Exception
+ :param caught_exception: Any exception that was caught while trying to
+ send the HTTP response.
+
+ :return: True, if the retry criteria matches (and therefore a retry
+ should occur. False if the criteria does not match.
+
+ """
+ # The default implementation allows subclasses to not have to check
+ # whether or not response is None or not.
+ if response is not None:
+ return self._check_response(attempt_number, response)
+ elif caught_exception is not None:
+ return self._check_caught_exception(
+ attempt_number, caught_exception
+ )
+ else:
+ raise ValueError("Both response and caught_exception are None.")
+
+ def _check_response(self, attempt_number, response):
+ pass
+
+ def _check_caught_exception(self, attempt_number, caught_exception):
+ pass
+
+
+class MaxAttemptsDecorator(BaseChecker):
+ """Allow retries up to a maximum number of attempts.
+
+ This will pass through calls to the decorated retry checker, provided
+ that the number of attempts does not exceed max_attempts. It will
+ also catch any retryable_exceptions passed in. Once max_attempts has
+ been exceeded, then False will be returned or the retryable_exceptions
+ that was previously being caught will be raised.
+
+ """
+
+ def __init__(self, checker, max_attempts, retryable_exceptions=None):
+ self._checker = checker
+ self._max_attempts = max_attempts
+ self._retryable_exceptions = retryable_exceptions
+
+ def __call__(
+ self, attempt_number, response, caught_exception, retries_context
+ ):
+ if retries_context:
+ retries_context['max'] = max(
+ retries_context.get('max', 0), self._max_attempts
+ )
+
+ should_retry = self._should_retry(
+ attempt_number, response, caught_exception
+ )
+ if should_retry:
+ if attempt_number >= self._max_attempts:
+ # explicitly set MaxAttemptsReached
+ if response is not None and 'ResponseMetadata' in response[1]:
+ response[1]['ResponseMetadata']['MaxAttemptsReached'] = (
+ True
+ )
+ logger.debug(
+ "Reached the maximum number of retry attempts: %s",
+ attempt_number,
+ )
+ return False
+ else:
+ return should_retry
+ else:
+ return False
+
+ def _should_retry(self, attempt_number, response, caught_exception):
+ if self._retryable_exceptions and attempt_number < self._max_attempts:
+ try:
+ return self._checker(
+ attempt_number, response, caught_exception
+ )
+ except self._retryable_exceptions as e:
+ logger.debug(
+ "retry needed, retryable exception caught: %s",
+ e,
+ exc_info=True,
+ )
+ return True
+ else:
+ # If we've exceeded the max attempts we just let the exception
+ # propagate if one has occurred.
+ return self._checker(attempt_number, response, caught_exception)
+
+
+class HTTPStatusCodeChecker(BaseChecker):
+ def __init__(self, status_code):
+ self._status_code = status_code
+
+ def _check_response(self, attempt_number, response):
+ if response[0].status_code == self._status_code:
+ logger.debug(
+ "retry needed: retryable HTTP status code received: %s",
+ self._status_code,
+ )
+ return True
+ else:
+ return False
+
+
+class ServiceErrorCodeChecker(BaseChecker):
+ def __init__(self, status_code, error_code):
+ self._status_code = status_code
+ self._error_code = error_code
+
+ def _check_response(self, attempt_number, response):
+ if response[0].status_code == self._status_code:
+ actual_error_code = response[1].get('Error', {}).get('Code')
+ if actual_error_code == self._error_code:
+ logger.debug(
+ "retry needed: matching HTTP status and error code seen: "
+ "%s, %s",
+ self._status_code,
+ self._error_code,
+ )
+ return True
+ return False
+
+
+class MultiChecker(BaseChecker):
+ def __init__(self, checkers):
+ self._checkers = checkers
+
+ def __call__(self, attempt_number, response, caught_exception):
+ for checker in self._checkers:
+ checker_response = checker(
+ attempt_number, response, caught_exception
+ )
+ if checker_response:
+ return checker_response
+ return False
+
+
+class CRC32Checker(BaseChecker):
+ def __init__(self, header):
+ # The header where the expected crc32 is located.
+ self._header_name = header
+
+ def _check_response(self, attempt_number, response):
+ http_response = response[0]
+ expected_crc = http_response.headers.get(self._header_name)
+ if expected_crc is None:
+ logger.debug(
+ "crc32 check skipped, the %s header is not "
+ "in the http response.",
+ self._header_name,
+ )
+ else:
+ actual_crc32 = crc32(response[0].content) & 0xFFFFFFFF
+ if not actual_crc32 == int(expected_crc):
+ logger.debug(
+ "retry needed: crc32 check failed, expected != actual: "
+ "%s != %s",
+ int(expected_crc),
+ actual_crc32,
+ )
+ raise ChecksumError(
+ checksum_type='crc32',
+ expected_checksum=int(expected_crc),
+ actual_checksum=actual_crc32,
+ )
+
+
+class ExceptionRaiser(BaseChecker):
+ """Raise any caught exceptions.
+
+ This class will raise any non None ``caught_exception``.
+
+ """
+
+ def _check_caught_exception(self, attempt_number, caught_exception):
+ # This is implementation specific, but this class is useful by
+ # coordinating with the MaxAttemptsDecorator.
+ # The MaxAttemptsDecorator has a list of exceptions it should catch
+ # and retry, but something needs to come along and actually raise the
+ # caught_exception. That's what this class is being used for. If
+ # the MaxAttemptsDecorator is not interested in retrying the exception
+ # then this exception just propagates out past the retry code.
+ raise caught_exception