aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/discovery.py
blob: 95b51b81baff31ad0132cf8aa57e451f68a576a7 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import time
import weakref

from botocore import xform_name
from botocore.exceptions import BotoCoreError, ConnectionError, HTTPClientError
from botocore.model import OperationNotFoundError
from botocore.utils import CachedProperty

logger = logging.getLogger(__name__)


class EndpointDiscoveryException(BotoCoreError):
    """Base class for the endpoint discovery errors defined in this module."""

    pass


class EndpointDiscoveryRequired(EndpointDiscoveryException):
    """Endpoint Discovery is disabled but is required for this operation.

    Raised by ``block_endpoint_discovery_required_operations`` when the
    operation's endpoint discovery trait is marked ``required``.
    """

    # Message text for this error.
    fmt = 'Endpoint Discovery is not enabled but this operation requires it.'


class EndpointDiscoveryRefreshFailed(EndpointDiscoveryException):
    """Endpoint Discovery failed to refresh the known endpoints.

    Raised by ``EndpointDiscoveryManager.describe_endpoint`` when discovery
    is required for an operation and no endpoint, fresh or stale, could be
    obtained.
    """

    # Message text for this error.
    fmt = 'Endpoint Discovery failed to refresh the required endpoints.'


def block_endpoint_discovery_required_operations(model, **kwargs):
    """Reject operations that cannot work without endpoint discovery.

    :param model: Operation model exposing an ``endpoint_discovery``
        attribute (a mapping, or a falsy value when the trait is absent).
    :param kwargs: Ignored; accepted so the function can be called with
        additional keyword arguments.
    :raises EndpointDiscoveryRequired: If the operation's endpoint
        discovery trait has a truthy ``required`` entry.
    """
    discovery_trait = model.endpoint_discovery
    if not discovery_trait:
        # Operation does not take part in endpoint discovery at all.
        return
    if discovery_trait.get('required'):
        raise EndpointDiscoveryRequired()


class EndpointDiscoveryModel:
    """Helper that answers endpoint discovery questions about a service.

    Wraps a service model and exposes: the name and input keys of the
    service's discovery operation, whether a given operation requires
    discovery, and which request parameters are discovery identifiers.
    """

    def __init__(self, service_model):
        """Remember the service model to inspect.

        :param service_model: Object providing
            ``endpoint_discovery_operation`` and ``operation_model(name)``.
        """
        self._service_model = service_model

    @CachedProperty
    def discovery_operation_name(self):
        """Name of the discovery operation, passed through ``xform_name``."""
        discovery_operation = self._service_model.endpoint_discovery_operation
        return xform_name(discovery_operation.name)

    @CachedProperty
    def discovery_operation_keys(self):
        """List of input member names accepted by the discovery operation.

        Empty when the discovery operation has no input shape.
        """
        discovery_operation = self._service_model.endpoint_discovery_operation
        keys = []
        if discovery_operation.input_shape:
            keys = list(discovery_operation.input_shape.members.keys())
        return keys

    def discovery_required_for(self, operation_name):
        """Return whether ``operation_name`` requires endpoint discovery.

        :param operation_name: Name of the operation to look up.
        :return: The ``required`` entry of the operation's endpoint
            discovery trait; ``False`` when the operation is unknown, the
            trait is absent, or the entry is missing.
        """
        try:
            operation_model = self._service_model.operation_model(
                operation_name
            )
            # The trait is None for operations that do not take part in
            # discovery; treat that the same as "not required" instead of
            # failing with AttributeError on ``None.get``.
            endpoint_discovery = operation_model.endpoint_discovery or {}
            return endpoint_discovery.get('required', False)
        except OperationNotFoundError:
            return False

    def discovery_operation_kwargs(self, **kwargs):
        """Filter ``kwargs`` down to what the discovery operation accepts.

        :param kwargs: Candidate arguments, typically ``Operation`` and
            ``Identifiers``.
        :return: Dict containing only the keys listed in
            ``discovery_operation_keys``.
        """
        input_keys = self.discovery_operation_keys
        # Operation and Identifiers are only sent if there are Identifiers
        if not kwargs.get('Identifiers'):
            kwargs.pop('Operation', None)
            kwargs.pop('Identifiers', None)
        return {k: v for k, v in kwargs.items() if k in input_keys}

    def gather_identifiers(self, operation, params):
        """Collect the discovery identifiers present in ``params``.

        :param operation: Operation model whose ``input_shape`` is walked.
        :param params: Request parameters for that operation.
        :return: Dict mapping identifier member names to their values.
        """
        return self._gather_ids(operation.input_shape, params)

    def _gather_ids(self, shape, params, ids=None):
        """Recursively collect endpoint discovery ids from ``params``.

        :param shape: Structure shape describing ``params``.
        :param params: Mapping of member names to values.
        :param ids: Accumulator shared across recursive calls; a new dict
            is created when omitted.
        :return: The accumulator dict.
        """
        # Traverse the input shape and corresponding parameters, gathering
        # any input fields labeled as an endpoint discovery id
        if ids is None:
            ids = {}
        for member_name, member_shape in shape.members.items():
            if member_name not in params:
                # A member the caller did not supply cannot contribute an
                # identifier; skip it rather than raising KeyError here.
                continue
            if member_shape.metadata.get('endpointdiscoveryid'):
                ids[member_name] = params[member_name]
            elif member_shape.type_name == 'structure':
                self._gather_ids(member_shape, params[member_name], ids)
        return ids


class EndpointDiscoveryManager:
    """Discover, cache and select endpoints for a client's operations.

    Endpoints returned by the discovery operation are cached under a key
    derived from the discovery arguments, each with an ``Expiration``
    time. A failed discovery attempt is remembered for 60 seconds so the
    discovery operation is not retried on every request while it fails.
    """

    def __init__(
        self, client, cache=None, current_time=None, always_discover=True
    ):
        """Create a manager bound to ``client``.

        :param client: Client used to call the discovery operation. Its
            ``meta.service_model`` is read here.
        :param cache: Optional mapping that stores discovered endpoints;
            a new dict is used when ``None``.
        :param current_time: Optional zero-argument callable returning the
            current time in seconds; defaults to ``time.time``.
        :param always_discover: When ``False``, discovery is skipped for
            operations that do not require it.
        """
        if cache is None:
            cache = {}
        self._cache = cache
        self._failed_attempts = {}
        if current_time is None:
            current_time = time.time
        self._time = current_time
        self._always_discover = always_discover

        # Keep only a weak proxy to the client so the manager does not
        # hold a strong reference to it (originally added to prevent
        # memory leaks on python 2.6).
        self._client = weakref.proxy(client)
        self._model = EndpointDiscoveryModel(client.meta.service_model)

    def _parse_endpoints(self, response):
        """Stamp every endpoint in ``response`` with an ``Expiration``.

        :param response: Discovery response containing an ``Endpoints``
            list of dicts.
        :return: The same list, each dict updated in place.
        """
        endpoints = response['Endpoints']
        current_time = self._time()
        for endpoint in endpoints:
            # A missing (or None) cache period is treated as zero minutes,
            # i.e. the endpoint is usable now but stale on the next lookup,
            # instead of failing with TypeError on ``None * 60``.
            cache_minutes = endpoint.get('CachePeriodInMinutes') or 0
            endpoint['Expiration'] = current_time + cache_minutes * 60
        return endpoints

    def _cache_item(self, value):
        """Return a hashable form of ``value`` for use in a cache key."""
        if isinstance(value, dict):
            return tuple(sorted(value.items()))
        else:
            return value

    def _create_cache_key(self, **kwargs):
        """Build the cache key for a set of discovery arguments.

        The arguments are first filtered to those the discovery operation
        accepts, then their values are combined in sorted-key order.
        """
        kwargs = self._model.discovery_operation_kwargs(**kwargs)
        return tuple(self._cache_item(v) for k, v in sorted(kwargs.items()))

    def gather_identifiers(self, operation, params):
        """Delegate identifier gathering to the discovery model."""
        return self._model.gather_identifiers(operation, params)

    def delete_endpoints(self, **kwargs):
        """Evict the cached endpoints for the given discovery arguments.

        Does nothing when no entry exists for the computed key.
        """
        cache_key = self._create_cache_key(**kwargs)
        # Delete in a single step rather than check-then-delete, so an
        # entry removed in between cannot surface as a KeyError.
        try:
            del self._cache[cache_key]
        except KeyError:
            pass

    def _describe_endpoints(self, **kwargs):
        """Call the service's discovery operation and return its response."""
        # This is effectively a proxy to whatever name/kwargs the service
        # supports for endpoint discovery.
        kwargs = self._model.discovery_operation_kwargs(**kwargs)
        operation_name = self._model.discovery_operation_name
        discovery_operation = getattr(self._client, operation_name)
        logger.debug('Discovering endpoints with kwargs: %s', kwargs)
        return discovery_operation(**kwargs)

    def _get_current_endpoints(self, key):
        """Return the unexpired cached endpoints for ``key``.

        :return: ``None`` when nothing is cached under ``key``; otherwise
            a (possibly empty) list of endpoints not yet expired.
        """
        # Single lookup instead of ``in`` followed by ``[]``.
        cached = self._cache.get(key)
        if cached is None:
            return None
        now = self._time()
        return [e for e in cached if now < e['Expiration']]

    def _refresh_current_endpoints(self, **kwargs):
        """Run discovery and cache the result.

        :return: The discovered endpoints, or ``None`` when the discovery
            call failed with a connection or HTTP client error. In that
            case the failure is recorded for the next 60 seconds.
        """
        cache_key = self._create_cache_key(**kwargs)
        try:
            response = self._describe_endpoints(**kwargs)
            endpoints = self._parse_endpoints(response)
            self._cache[cache_key] = endpoints
            self._failed_attempts.pop(cache_key, None)
            return endpoints
        except (ConnectionError, HTTPClientError):
            self._failed_attempts[cache_key] = self._time() + 60
            return None

    def _recently_failed(self, cache_key):
        """Return whether discovery for ``cache_key`` failed recently.

        An expired failure record is removed as a side effect.
        """
        retry_after = self._failed_attempts.get(cache_key)
        if retry_after is None:
            return False
        if self._time() < retry_after:
            return True
        # The back-off window has passed; forget the failure.
        self._failed_attempts.pop(cache_key, None)
        return False

    def _select_endpoint(self, endpoints):
        """Return the address of the first endpoint in ``endpoints``."""
        return endpoints[0]['Address']

    def describe_endpoint(self, **kwargs):
        """Return the endpoint address to use for an operation.

        :param kwargs: Discovery arguments; must include ``Operation`` and
            normally ``Identifiers``.
        :return: An endpoint address, or ``None`` when the default
            endpoint should be used.
        :raises EndpointDiscoveryRefreshFailed: If discovery is required
            for the operation and no endpoint could be obtained.
        """
        operation = kwargs['Operation']
        discovery_required = self._model.discovery_required_for(operation)

        if not self._always_discover and not discovery_required:
            # Discovery set to only run on required operations
            logger.debug(
                'Optional discovery disabled. '
                'Skipping discovery for Operation: %s',
                operation,
            )
            return None

        # Get the endpoint for the provided operation and identifiers
        cache_key = self._create_cache_key(**kwargs)
        endpoints = self._get_current_endpoints(cache_key)
        if endpoints:
            return self._select_endpoint(endpoints)
        # All known endpoints are stale
        recently_failed = self._recently_failed(cache_key)
        if not recently_failed:
            # We haven't failed to discover recently, go ahead and refresh
            endpoints = self._refresh_current_endpoints(**kwargs)
            if endpoints:
                return self._select_endpoint(endpoints)
        # Discovery has failed recently, do our best to get an endpoint
        logger.debug('Endpoint Discovery has failed for: %s', kwargs)
        stale_entries = self._cache.get(cache_key, None)
        if stale_entries:
            # We have stale entries, use those while discovery is failing
            return self._select_endpoint(stale_entries)
        if discovery_required:
            # It looks strange to be checking recently_failed again but,
            # this informs us as to whether or not we tried to refresh earlier
            if recently_failed:
                # Discovery is required and we haven't already refreshed
                endpoints = self._refresh_current_endpoints(**kwargs)
                if endpoints:
                    return self._select_endpoint(endpoints)
            # No endpoints even refresh, raise hard error
            raise EndpointDiscoveryRefreshFailed()
        # Discovery is optional, just use the default endpoint for now
        return None


class EndpointDiscoveryHandler:
    """Event handlers that connect a discovery manager to a client.

    The handlers collect discovery identifiers before parameters are
    built, inject a discovered endpoint into each created request, and
    evict cached endpoints when a response reports an invalid endpoint.
    """

    def __init__(self, manager):
        """Store the discovery manager the handlers delegate to."""
        self._manager = manager

    def register(self, events, service_id):
        """Register the three handlers on ``events`` for ``service_id``.

        :param events: Event emitter providing ``register`` and
            ``register_first``.
        :param service_id: Suffix appended to each event name.
        """
        before_build_event = f'before-parameter-build.{service_id}'
        request_created_event = f'request-created.{service_id}'
        needs_retry_event = f'needs-retry.{service_id}'

        events.register(before_build_event, self.gather_identifiers)
        events.register_first(request_created_event, self.discover_endpoint)
        events.register(needs_retry_event, self.handle_retries)

    def gather_identifiers(self, params, model, context, **kwargs):
        """Record the operation's discovery identifiers in ``context``.

        Leaves ``context`` untouched when the operation has no endpoint
        discovery trait; otherwise sets ``context['discovery']``.
        """
        # Only continue if the operation supports endpoint discovery
        if model.endpoint_discovery is None:
            return
        identifiers = self._manager.gather_identifiers(model, params)
        context['discovery'] = {'identifiers': identifiers}

    def discover_endpoint(self, request, operation_name, **kwargs):
        """Point ``request`` at a discovered endpoint, if one is found.

        Does nothing when the request context carries no discovery
        identifiers or when the manager returns no endpoint.
        """
        discovery_context = request.context.get('discovery', {})
        identifiers = discovery_context.get('identifiers')
        if identifiers is None:
            return

        address = self._manager.describe_endpoint(
            Operation=operation_name, Identifiers=identifiers
        )
        if address is None:
            logger.debug('Failed to discover and inject endpoint')
            return

        # Addresses without a scheme are assumed to be https.
        url = address if address.startswith('http') else 'https://' + address
        logger.debug('Injecting discovered endpoint: %s', url)
        request.url = url

    def handle_retries(self, request_dict, response, operation, **kwargs):
        """Evict cached endpoints after an invalid-endpoint response.

        :param request_dict: Request dict whose ``context`` may hold the
            discovery identifiers.
        :param response: ``None`` or a two-item sequence whose second item
            is the parsed response dict.
        :param operation: Operation model; its ``name`` is used for the
            eviction.
        :return: ``0`` when the cached endpoints were evicted (presumably
            read by the retry machinery as "retry without delay" - confirm
            against the caller), otherwise ``None``.
        """
        if response is None:
            return None

        _http_response, parsed = response
        http_status = parsed.get('ResponseMetadata', {}).get('HTTPStatusCode')
        code = parsed.get('Error', {}).get('Code')
        invalid_endpoint = (
            http_status == 421 or code == 'InvalidEndpointException'
        )
        if not invalid_endpoint:
            return None

        discovery_context = request_dict.get('context', {}).get(
            'discovery', {}
        )
        identifiers = discovery_context.get('identifiers')
        if identifiers is None:
            return None

        # Delete the cached endpoints, forcing a refresh on retry
        # TODO: Improve eviction behavior to only evict the bad endpoint if
        # there are multiple. This will almost certainly require a lock.
        self._manager.delete_endpoints(
            Operation=operation.name, Identifiers=identifiers
        )
        return 0