author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/paginate.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)

    two versions of R2R are here

Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/paginate.py'):

 .venv/lib/python3.12/site-packages/botocore/paginate.py | 724 +++++++++++++++
 1 file changed, 724 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/botocore/paginate.py b/.venv/lib/python3.12/site-packages/botocore/paginate.py
new file mode 100644
index 00000000..f7095967
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/paginate.py
@@ -0,0 +1,724 @@
+# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+import base64
+import json
+import logging
+from functools import partial
+from itertools import tee
+
+import jmespath
+
+from botocore.context import with_current_context
+from botocore.exceptions import PaginationError
+from botocore.useragent import register_feature_id
+from botocore.utils import merge_dicts, set_value_from_jmespath
+
+log = logging.getLogger(__name__)
+
+
+class TokenEncoder:
+    """Encodes dictionaries into opaque strings.
+
+    This is, for the most part, json dumps + base64 encoding, but it also
+    supports having bytes in the dictionary in addition to the types that
+    json can handle by default.
+
+    This is intended for use in encoding pagination tokens, which in some
+    cases can be complex structures and/or contain bytes.
+    """
+
+    def encode(self, token):
+        """Encodes a dictionary to an opaque string.
+
+        :type token: dict
+        :param token: A dictionary containing pagination information,
+            particularly the service pagination token(s) but also other boto
+            metadata.
+
+        :rtype: str
+        :returns: An opaque string
+        """
+        try:
+            # Try just using json dumps first to avoid having to traverse
+            # and encode the dict. In 99.9999% of cases this will work.
+            json_string = json.dumps(token)
+        except (TypeError, UnicodeDecodeError):
+            # If normal dumping failed, go through and base64 encode all bytes.
+            encoded_token, encoded_keys = self._encode(token, [])
+
+            # Save the list of all the encoded key paths. We can safely
+            # assume that no service will ever use this key.
+            encoded_token['boto_encoded_keys'] = encoded_keys
+
+            # Now that the bytes are all encoded, dump the json.
+            json_string = json.dumps(encoded_token)
+
+        # base64 encode the json string to produce an opaque token string.
+        return base64.b64encode(json_string.encode('utf-8')).decode('utf-8')
+
+    def _encode(self, data, path):
+        """Encode bytes in given data, keeping track of the path traversed."""
+        if isinstance(data, dict):
+            return self._encode_dict(data, path)
+        elif isinstance(data, list):
+            return self._encode_list(data, path)
+        elif isinstance(data, bytes):
+            return self._encode_bytes(data, path)
+        else:
+            return data, []
+
+    def _encode_list(self, data, path):
+        """Encode any bytes in a list, noting the index of what is encoded."""
+        new_data = []
+        encoded = []
+        for i, value in enumerate(data):
+            new_path = path + [i]
+            new_value, new_encoded = self._encode(value, new_path)
+            new_data.append(new_value)
+            encoded.extend(new_encoded)
+        return new_data, encoded
+
+    def _encode_dict(self, data, path):
+        """Encode any bytes in a dict, noting the index of what is encoded."""
+        new_data = {}
+        encoded = []
+        for key, value in data.items():
+            new_path = path + [key]
+            new_value, new_encoded = self._encode(value, new_path)
+            new_data[key] = new_value
+            encoded.extend(new_encoded)
+        return new_data, encoded
+
+    def _encode_bytes(self, data, path):
+        """Base64 encode a byte string."""
+        return base64.b64encode(data).decode('utf-8'), [path]
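+
+# A minimal sketch of the encoder's two paths (illustrative only; the token
+# contents below are made up):
+#
+#     encoder = TokenEncoder()
+#     encoder.encode({'Marker': 'abc'})
+#     # JSON-serializable input: plain json.dumps + base64.
+#     encoder.encode({'Marker': b'\x00\xff'})
+#     # bytes input: each bytes value is base64 encoded in place and its
+#     # path is recorded under 'boto_encoded_keys' before the final pass.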
+
+
+class TokenDecoder:
+    """Decodes token strings back into dictionaries.
+
+    This performs the inverse operation to the TokenEncoder, accepting
+    opaque strings and decoding them into a usable form.
+    """
+
+    def decode(self, token):
+        """Decodes an opaque string to a dictionary.
+
+        :type token: str
+        :param token: A token string given by the botocore pagination
+            interface.
+
+        :rtype: dict
+        :returns: A dictionary containing pagination information,
+            particularly the service pagination token(s) but also other boto
+            metadata.
+        """
+        json_string = base64.b64decode(token.encode('utf-8')).decode('utf-8')
+        decoded_token = json.loads(json_string)
+
+        # Remove the encoding metadata once it has been read, since it
+        # will no longer be needed.
+        encoded_keys = decoded_token.pop('boto_encoded_keys', None)
+        if encoded_keys is None:
+            return decoded_token
+        else:
+            return self._decode(decoded_token, encoded_keys)
+
+    def _decode(self, token, encoded_keys):
+        """Find each encoded value and decode it."""
+        for key in encoded_keys:
+            encoded = self._path_get(token, key)
+            decoded = base64.b64decode(encoded.encode('utf-8'))
+            self._path_set(token, key, decoded)
+        return token
+
+    def _path_get(self, data, path):
+        """Return the nested data at the given path.
+
+        For instance:
+            data = {'foo': ['bar', 'baz']}
+            path = ['foo', 0]
+            ==> 'bar'
+        """
+        # jmespath isn't used here because it would be difficult to actually
+        # create the jmespath query when taking all of the unknowns of key
+        # structure into account. Gross though this is, it is simple and not
+        # very error prone.
+        d = data
+        for step in path:
+            d = d[step]
+        return d
+
+    def _path_set(self, data, path, value):
+        """Set the value of a key in the given data.
+
+        Example:
+            data = {'foo': ['bar', 'baz']}
+            path = ['foo', 1]
+            value = 'bin'
+            ==> data = {'foo': ['bar', 'bin']}
+        """
+        container = self._path_get(data, path[:-1])
+        container[path[-1]] = value
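+
+# Decoding is the mirror image; a round-trip sketch with a made-up token:
+#
+#     decoder = TokenDecoder()
+#     token = TokenEncoder().encode({'Marker': b'\x01'})
+#     assert decoder.decode(token) == {'Marker': b'\x01'}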
+
+
+class PaginatorModel:
+    def __init__(self, paginator_config):
+        self._paginator_config = paginator_config['pagination']
+
+    def get_paginator(self, operation_name):
+        try:
+            single_paginator_config = self._paginator_config[operation_name]
+        except KeyError:
+            raise ValueError(
+                f"Paginator for operation does not exist: {operation_name}"
+            )
+        return single_paginator_config
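+
+    # A sketch of how this model is fed (the config shape mirrors the
+    # service paginators-1.json files; the operation name is hypothetical):
+    #
+    #     model = PaginatorModel({'pagination': {'ListUsers': {...}}})
+    #     config = model.get_paginator('ListUsers')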
+
+
+class PageIterator:
+    """An iterable object to paginate API results.
+    Please note it is NOT a Python iterator itself.
+    Use ``iter()`` to obtain a generator over its pages.
+    """
+
+    def __init__(
+        self,
+        method,
+        input_token,
+        output_token,
+        more_results,
+        result_keys,
+        non_aggregate_keys,
+        limit_key,
+        max_items,
+        starting_token,
+        page_size,
+        op_kwargs,
+    ):
+        self._method = method
+        self._input_token = input_token
+        self._output_token = output_token
+        self._more_results = more_results
+        self._result_keys = result_keys
+        self._max_items = max_items
+        self._limit_key = limit_key
+        self._starting_token = starting_token
+        self._page_size = page_size
+        self._op_kwargs = op_kwargs
+        self._resume_token = None
+        self._non_aggregate_key_exprs = non_aggregate_keys
+        self._non_aggregate_part = {}
+        self._token_encoder = TokenEncoder()
+        self._token_decoder = TokenDecoder()
+
+    @property
+    def result_keys(self):
+        return self._result_keys
+
+    @property
+    def resume_token(self):
+        """Token to specify to resume pagination."""
+        return self._resume_token
+
+    @resume_token.setter
+    def resume_token(self, value):
+        if not isinstance(value, dict):
+            raise ValueError(f"Bad starting token: {value}")
+
+        if 'boto_truncate_amount' in value:
+            token_keys = sorted(self._input_token + ['boto_truncate_amount'])
+        else:
+            token_keys = sorted(self._input_token)
+        dict_keys = sorted(value.keys())
+
+        if token_keys == dict_keys:
+            self._resume_token = self._token_encoder.encode(value)
+        else:
+            raise ValueError(f"Bad starting token: {value}")
+
+    @property
+    def non_aggregate_part(self):
+        return self._non_aggregate_part
+
+    def __iter__(self):
+        current_kwargs = self._op_kwargs
+        previous_next_token = None
+        next_token = {key: None for key in self._input_token}
+        if self._starting_token is not None:
+            # If the starting token exists, populate the next_token with the
+            # values inside it. This ensures that we have the service's
+            # pagination token on hand if we need to truncate after the
+            # first response.
+            next_token = self._parse_starting_token()[0]
+        # The number of items from result_key we've seen so far.
+        total_items = 0
+        first_request = True
+        primary_result_key = self.result_keys[0]
+        starting_truncation = 0
+        self._inject_starting_params(current_kwargs)
+        while True:
+            response = self._make_request(current_kwargs)
+            parsed = self._extract_parsed_response(response)
+            if first_request:
+                # The first request is handled differently.  We could
+                # possibly have a resume/starting token that tells us where
+                # to index into the retrieved page.
+                if self._starting_token is not None:
+                    starting_truncation = self._handle_first_request(
+                        parsed, primary_result_key, starting_truncation
+                    )
+                first_request = False
+                self._record_non_aggregate_key_values(parsed)
+            else:
+                # If this isn't the first request, we have already sliced into
+                # the first request and had to make additional requests after.
+                # We no longer need to add this to truncation.
+                starting_truncation = 0
+            current_response = primary_result_key.search(parsed)
+            if current_response is None:
+                current_response = []
+            num_current_response = len(current_response)
+            truncate_amount = 0
+            if self._max_items is not None:
+                truncate_amount = (
+                    total_items + num_current_response - self._max_items
+                )
+            if truncate_amount > 0:
+                self._truncate_response(
+                    parsed,
+                    primary_result_key,
+                    truncate_amount,
+                    starting_truncation,
+                    next_token,
+                )
+                yield response
+                break
+            else:
+                yield response
+                total_items += num_current_response
+                next_token = self._get_next_token(parsed)
+                if all(t is None for t in next_token.values()):
+                    break
+                if (
+                    self._max_items is not None
+                    and total_items == self._max_items
+                ):
+                    # We're on a page boundary so we can set the current
+                    # next token to be the resume token.
+                    self.resume_token = next_token
+                    break
+                if (
+                    previous_next_token is not None
+                    and previous_next_token == next_token
+                ):
+                    message = (
+                        f"The same next token was received "
+                        f"twice: {next_token}"
+                    )
+                    raise PaginationError(message=message)
+                self._inject_token_into_kwargs(current_kwargs, next_token)
+                previous_next_token = next_token
+
+    def search(self, expression):
+        """Applies a JMESPath expression to a paginator
+
+        Each page of results is searched using the provided JMESPath
+        expression. If the result is not a list, it is yielded
+        directly. If the result is a list, each element in the result
+        is yielded individually (essentially implementing a flatmap in
+        which the JMESPath search is the mapping function).
+
+        :type expression: str
+        :param expression: JMESPath expression to apply to each page.
+
+        :return: Returns an iterator that yields the individual
+            elements of applying a JMESPath expression to each page of
+            results.
+        """
+        compiled = jmespath.compile(expression)
+        for page in self:
+            results = compiled.search(page)
+            if isinstance(results, list):
+                yield from results
+            else:
+                # Yield result directly if it is not a list.
+                yield results
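+
+    # Example (a sketch; 'Contents[].Key' is a hypothetical expression for
+    # an S3-style page shape):
+    #
+    #     for key in page_iterator.search('Contents[].Key'):
+    #         print(key)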
+
+    @with_current_context(partial(register_feature_id, 'PAGINATOR'))
+    def _make_request(self, current_kwargs):
+        return self._method(**current_kwargs)
+
+    def _extract_parsed_response(self, response):
+        return response
+
+    def _record_non_aggregate_key_values(self, response):
+        non_aggregate_keys = {}
+        for expression in self._non_aggregate_key_exprs:
+            result = expression.search(response)
+            set_value_from_jmespath(
+                non_aggregate_keys, expression.expression, result
+            )
+        self._non_aggregate_part = non_aggregate_keys
+
+    def _inject_starting_params(self, op_kwargs):
+        # If the user has specified a starting token we need to
+        # inject that into the operation's kwargs.
+        if self._starting_token is not None:
+            # Parse the starting token and inject its values into the
+            # operation kwargs so the request resumes from the right place.
+            next_token = self._parse_starting_token()[0]
+            self._inject_token_into_kwargs(op_kwargs, next_token)
+        if self._page_size is not None:
+            # Pass the page size as the value of the parameter that
+            # limits page size, also known as the limit_key.
+            op_kwargs[self._limit_key] = self._page_size
+
+    def _inject_token_into_kwargs(self, op_kwargs, next_token):
+        for name, token in next_token.items():
+            if (token is not None) and (token != 'None'):
+                op_kwargs[name] = token
+            elif name in op_kwargs:
+                del op_kwargs[name]
+
+    def _handle_first_request(
+        self, parsed, primary_result_key, starting_truncation
+    ):
+        # If the payload is an array or string, we need to slice into it
+        # and only return the truncated amount.
+        starting_truncation = self._parse_starting_token()[1]
+        all_data = primary_result_key.search(parsed)
+        if isinstance(all_data, (list, str)):
+            data = all_data[starting_truncation:]
+        else:
+            data = None
+        set_value_from_jmespath(parsed, primary_result_key.expression, data)
+        # We also need to truncate any secondary result keys
+        # because they were not truncated in the last response of
+        # the previous pagination run.
+        for token in self.result_keys:
+            if token == primary_result_key:
+                continue
+            sample = token.search(parsed)
+            if isinstance(sample, list):
+                empty_value = []
+            elif isinstance(sample, str):
+                empty_value = ''
+            elif isinstance(sample, (int, float)):
+                empty_value = 0
+            else:
+                empty_value = None
+            set_value_from_jmespath(parsed, token.expression, empty_value)
+        return starting_truncation
+
+    def _truncate_response(
+        self,
+        parsed,
+        primary_result_key,
+        truncate_amount,
+        starting_truncation,
+        next_token,
+    ):
+        original = primary_result_key.search(parsed)
+        if original is None:
+            original = []
+        amount_to_keep = len(original) - truncate_amount
+        truncated = original[:amount_to_keep]
+        set_value_from_jmespath(
+            parsed, primary_result_key.expression, truncated
+        )
+        # The issue here is that even though we know how much we've
+        # truncated, we need to account for this globally, including any
+        # starting left truncation. For example:
+        # Raw response: [0,1,2,3]
+        # Starting index: 1
+        # Max items: 1
+        # Starting left truncation: [1, 2, 3]
+        # End right truncation for max items: [1]
+        # However, even though we only kept 1, this is post
+        # left truncation so the next starting index should be 2, not 1
+        # (left_truncation + amount_to_keep).
+        next_token['boto_truncate_amount'] = (
+            amount_to_keep + starting_truncation
+        )
+        self.resume_token = next_token
+
+    def _get_next_token(self, parsed):
+        if self._more_results is not None:
+            if not self._more_results.search(parsed):
+                return {}
+        next_tokens = {}
+        for output_token, input_key in zip(
+            self._output_token, self._input_token
+        ):
+            next_token = output_token.search(parsed)
+            # We do not want to include any empty strings as actual tokens.
+            # Treat them as None.
+            if next_token:
+                next_tokens[input_key] = next_token
+            else:
+                next_tokens[input_key] = None
+        return next_tokens
+
+    def result_key_iters(self):
+        teed_results = tee(self, len(self.result_keys))
+        return [
+            ResultKeyIterator(i, result_key)
+            for i, result_key in zip(teed_results, self.result_keys)
+        ]
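+
+    # result_key_iters() fans a single pass over the pages out into one
+    # ResultKeyIterator per result key via itertools.tee. A sketch with a
+    # hypothetical two-key paginator:
+    #
+    #     users_iter, groups_iter = page_iterator.result_key_iters()
+    #     first_users = list(users_iter)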
+
+    def build_full_result(self):
+        complete_result = {}
+        for response in self:
+            page = response
+            # We want to try to catch operation object pagination
+            # and format correctly for those. They come in the form
+            # of a tuple of two elements: (http_response, parsed_response).
+            # We want the parsed_response, as that is what the page
+            # iterator uses. This handling can be removed once operation
+            # objects are removed.
+            if isinstance(response, tuple) and len(response) == 2:
+                page = response[1]
+            # We're incrementally building the full response page
+            # by page.  For each page in the response we need to
+            # inject the necessary components from the page
+            # into the complete_result.
+            for result_expression in self.result_keys:
+                # In order to incrementally update a result key
+                # we need to search the existing value from complete_result,
+                # then we need to search the _current_ page for the
+                # current result key value.  Then we append the current
+                # value onto the existing value, and re-set that value
+                # as the new value.
+                result_value = result_expression.search(page)
+                if result_value is None:
+                    continue
+                existing_value = result_expression.search(complete_result)
+                if existing_value is None:
+                    # Set the initial result
+                    set_value_from_jmespath(
+                        complete_result,
+                        result_expression.expression,
+                        result_value,
+                    )
+                    continue
+                # Now both result_value and existing_value contain something
+                if isinstance(result_value, list):
+                    existing_value.extend(result_value)
+                elif isinstance(result_value, (int, float, str)):
+                    # Modify the existing result with the sum or concatenation
+                    set_value_from_jmespath(
+                        complete_result,
+                        result_expression.expression,
+                        existing_value + result_value,
+                    )
+        merge_dicts(complete_result, self.non_aggregate_part)
+        if self.resume_token is not None:
+            complete_result['NextToken'] = self.resume_token
+        return complete_result
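+
+    # A sketch of typical use (the result key 'Users' is hypothetical):
+    # every page's 'Users' list is concatenated into one dict, and
+    # 'NextToken' is present only if MaxItems stopped pagination early:
+    #
+    #     result = page_iterator.build_full_result()
+    #     users = result.get('Users', [])
+    #     resume = result.get('NextToken')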
+
+    def _parse_starting_token(self):
+        if self._starting_token is None:
+            return None
+
+        # The starting token is a dict passed as a base64 encoded string.
+        next_token = self._starting_token
+        try:
+            next_token = self._token_decoder.decode(next_token)
+            index = 0
+            if 'boto_truncate_amount' in next_token:
+                index = next_token.get('boto_truncate_amount')
+                del next_token['boto_truncate_amount']
+        except (ValueError, TypeError):
+            next_token, index = self._parse_starting_token_deprecated()
+        return next_token, index
+
+    def _parse_starting_token_deprecated(self):
+        """
+        This handles parsing of old-style starting tokens, and attempts to
+        coerce them into the new style.
+        """
+        log.debug(
+            "Attempting to fall back to old starting token parser. For "
+            f"token: {self._starting_token}"
+        )
+        if self._starting_token is None:
+            return None
+
+        parts = self._starting_token.split('___')
+        next_token = []
+        index = 0
+        if len(parts) == len(self._input_token) + 1:
+            try:
+                index = int(parts.pop())
+            except ValueError:
+                # This doesn't look like a valid old-style token, so we're
+                # passing it along as an opaque service token.
+                parts = [self._starting_token]
+
+        for part in parts:
+            if part == 'None':
+                next_token.append(None)
+            else:
+                next_token.append(part)
+        return self._convert_deprecated_starting_token(next_token), index
+
+    def _convert_deprecated_starting_token(self, deprecated_token):
+        """
+        This attempts to convert a deprecated starting token into the new
+        style.
+        """
+        len_deprecated_token = len(deprecated_token)
+        len_input_token = len(self._input_token)
+        if len_deprecated_token > len_input_token:
+            raise ValueError(f"Bad starting token: {self._starting_token}")
+        elif len_deprecated_token < len_input_token:
+            log.debug(
+                "Old format starting token does not contain all input "
+                "tokens. Setting the rest, in order, as None."
+            )
+            for i in range(len_input_token - len_deprecated_token):
+                deprecated_token.append(None)
+        return dict(zip(self._input_token, deprecated_token))
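+
+    # Old-style tokens were '___'-joined values with an optional trailing
+    # integer index. A sketch, assuming input_token == ['InToken1',
+    # 'InToken2']:
+    #
+    #     'token1___None___3'
+    #     -> ({'InToken1': 'token1', 'InToken2': None}, 3)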
+
+
+class Paginator:
+    PAGE_ITERATOR_CLS = PageIterator
+
+    def __init__(self, method, pagination_config, model):
+        self._model = model
+        self._method = method
+        self._pagination_cfg = pagination_config
+        self._output_token = self._get_output_tokens(self._pagination_cfg)
+        self._input_token = self._get_input_tokens(self._pagination_cfg)
+        self._more_results = self._get_more_results_token(self._pagination_cfg)
+        self._non_aggregate_keys = self._get_non_aggregate_keys(
+            self._pagination_cfg
+        )
+        self._result_keys = self._get_result_keys(self._pagination_cfg)
+        self._limit_key = self._get_limit_key(self._pagination_cfg)
+
+    @property
+    def result_keys(self):
+        return self._result_keys
+
+    def _get_non_aggregate_keys(self, config):
+        keys = []
+        for key in config.get('non_aggregate_keys', []):
+            keys.append(jmespath.compile(key))
+        return keys
+
+    def _get_output_tokens(self, config):
+        output = []
+        output_token = config['output_token']
+        if not isinstance(output_token, list):
+            output_token = [output_token]
+        for config in output_token:
+            output.append(jmespath.compile(config))
+        return output
+
+    def _get_input_tokens(self, config):
+        input_token = self._pagination_cfg['input_token']
+        if not isinstance(input_token, list):
+            input_token = [input_token]
+        return input_token
+
+    def _get_more_results_token(self, config):
+        more_results = config.get('more_results')
+        if more_results is not None:
+            return jmespath.compile(more_results)
+
+    def _get_result_keys(self, config):
+        result_key = config.get('result_key')
+        if result_key is not None:
+            if not isinstance(result_key, list):
+                result_key = [result_key]
+            result_key = [jmespath.compile(rk) for rk in result_key]
+            return result_key
+
+    def _get_limit_key(self, config):
+        return config.get('limit_key')
+
+    def paginate(self, **kwargs):
+        """Create paginator object for an operation.
+
+        This returns an iterable object.  Iterating over
+        this object will yield a single page of a response
+        at a time.
+
+        """
+        page_params = self._extract_paging_params(kwargs)
+        return self.PAGE_ITERATOR_CLS(
+            self._method,
+            self._input_token,
+            self._output_token,
+            self._more_results,
+            self._result_keys,
+            self._non_aggregate_keys,
+            self._limit_key,
+            page_params['MaxItems'],
+            page_params['StartingToken'],
+            page_params['PageSize'],
+            kwargs,
+        )
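+
+    # Typical usage through a client (a sketch; the operation name and
+    # arguments are hypothetical):
+    #
+    #     paginator = client.get_paginator('list_objects_v2')
+    #     for page in paginator.paginate(Bucket='my-bucket'):
+    #         process(page['Contents'])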
+
+    def _extract_paging_params(self, kwargs):
+        pagination_config = kwargs.pop('PaginationConfig', {})
+        max_items = pagination_config.get('MaxItems', None)
+        if max_items is not None:
+            max_items = int(max_items)
+        page_size = pagination_config.get('PageSize', None)
+        if page_size is not None:
+            if self._limit_key is None:
+                raise PaginationError(
+                    message="PageSize parameter is not supported for the "
+                    "pagination interface for this operation."
+                )
+            input_members = self._model.input_shape.members
+            limit_key_shape = input_members.get(self._limit_key)
+            if limit_key_shape.type_name == 'string':
+                if not isinstance(page_size, str):
+                    page_size = str(page_size)
+            else:
+                page_size = int(page_size)
+        return {
+            'MaxItems': max_items,
+            'StartingToken': pagination_config.get('StartingToken', None),
+            'PageSize': page_size,
+        }
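+
+    # PaginationConfig accepts three optional keys, e.g.:
+    #
+    #     {'MaxItems': 100, 'PageSize': 10, 'StartingToken': '<opaque>'}
+    #
+    # MaxItems caps the total aggregated item count, PageSize is passed as
+    # the service's limit_key on each request, and StartingToken resumes a
+    # previous run.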
+
+
+class ResultKeyIterator:
+    """Iterates over the results of paginated responses.
+
+    Each iterator is associated with a single result key.
+    Iterating over this object will give you each element in
+    the result key list.
+
+    :param pages_iterator: An iterator that will give you
+        pages of results (a ``PageIterator`` class).
+    :param result_key: The JMESPath expression representing
+        the result key.
+
+    """
+
+    def __init__(self, pages_iterator, result_key):
+        self._pages_iterator = pages_iterator
+        self.result_key = result_key
+
+    def __iter__(self):
+        for page in self._pages_iterator:
+            results = self.result_key.search(page)
+            if results is None:
+                results = []
+            yield from results
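+
+# Flattening a single result key down to items, as a sketch (the result
+# key 'Users' is hypothetical):
+#
+#     users = ResultKeyIterator(page_iterator, jmespath.compile('Users'))
+#     for user in users:
+#         handle(user)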