aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/boto3/resources/collection.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/boto3/resources/collection.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/boto3/resources/collection.py')
-rw-r--r--.venv/lib/python3.12/site-packages/boto3/resources/collection.py566
1 files changed, 566 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/boto3/resources/collection.py b/.venv/lib/python3.12/site-packages/boto3/resources/collection.py
new file mode 100644
index 00000000..5d4c9e9d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/boto3/resources/collection.py
@@ -0,0 +1,566 @@
+# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# https://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+import copy
+import logging
+
+from botocore import xform_name
+from botocore.utils import merge_dicts
+
+from ..docs import docstring
+from .action import BatchAction
+from .params import create_request_parameters
+from .response import ResourceHandler
+
+logger = logging.getLogger(__name__)
+
+
+class ResourceCollection:
+ """
+ Represents a collection of resources, which can be iterated through,
+ optionally with filtering. Collections automatically handle pagination
+ for you.
+
+ See :ref:`guide_collections` for a high-level overview of collections,
+ including when remote service requests are performed.
+
+ :type model: :py:class:`~boto3.resources.model.Collection`
+ :param model: Collection model
+ :type parent: :py:class:`~boto3.resources.base.ServiceResource`
+ :param parent: The collection's parent resource
+ :type handler: :py:class:`~boto3.resources.response.ResourceHandler`
+ :param handler: The resource response handler used to create resource
+ instances
+ """
+
+ def __init__(self, model, parent, handler, **kwargs):
+ self._model = model
+ self._parent = parent
+ self._py_operation_name = xform_name(model.request.operation)
+ self._handler = handler
+ self._params = copy.deepcopy(kwargs)
+
+ def __repr__(self):
+ return '{}({}, {})'.format(
+ self.__class__.__name__,
+ self._parent,
+ f'{self._parent.meta.service_name}.{self._model.resource.type}',
+ )
+
+ def __iter__(self):
+ """
+ A generator which yields resource instances after doing the
+ appropriate service operation calls and handling any pagination
+ on your behalf.
+
+ Page size, item limit, and filter parameters are applied
+ if they have previously been set.
+
+ >>> bucket = s3.Bucket('boto3')
+ >>> for obj in bucket.objects.all():
+ ... print(obj.key)
+ 'key1'
+ 'key2'
+
+ """
+ limit = self._params.get('limit', None)
+
+ count = 0
+ for page in self.pages():
+ for item in page:
+ yield item
+
+ # If the limit is set and has been reached, then
+ # we stop processing items here.
+ count += 1
+ if limit is not None and count >= limit:
+ return
+
+ def _clone(self, **kwargs):
+ """
+ Create a clone of this collection. This is used by the methods
+ below to provide a chainable interface that returns copies
+ rather than the original. This allows things like:
+
+ >>> base = collection.filter(Param1=1)
+ >>> query1 = base.filter(Param2=2)
+ >>> query2 = base.filter(Param3=3)
+ >>> query1.params
+ {'Param1': 1, 'Param2': 2}
+ >>> query2.params
+ {'Param1': 1, 'Param3': 3}
+
+ :rtype: :py:class:`ResourceCollection`
+ :return: A clone of this resource collection
+ """
+ params = copy.deepcopy(self._params)
+ merge_dicts(params, kwargs, append_lists=True)
+ clone = self.__class__(
+ self._model, self._parent, self._handler, **params
+ )
+ return clone
+
+ def pages(self):
+ """
+ A generator which yields pages of resource instances after
+ doing the appropriate service operation calls and handling
+ any pagination on your behalf. Non-paginated calls will
+ return a single page of items.
+
+ Page size, item limit, and filter parameters are applied
+ if they have previously been set.
+
+ >>> bucket = s3.Bucket('boto3')
+ >>> for page in bucket.objects.pages():
+ ... for obj in page:
+ ... print(obj.key)
+ 'key1'
+ 'key2'
+
+ :rtype: list(:py:class:`~boto3.resources.base.ServiceResource`)
+ :return: List of resource instances
+ """
+ client = self._parent.meta.client
+ cleaned_params = self._params.copy()
+ limit = cleaned_params.pop('limit', None)
+ page_size = cleaned_params.pop('page_size', None)
+ params = create_request_parameters(self._parent, self._model.request)
+ merge_dicts(params, cleaned_params, append_lists=True)
+
+ # Is this a paginated operation? If so, we need to get an
+ # iterator for the various pages. If not, then we simply
+ # call the operation and return the result as a single
+ # page in a list. For non-paginated results, we just ignore
+ # the page size parameter.
+ if client.can_paginate(self._py_operation_name):
+ logger.debug(
+ 'Calling paginated %s:%s with %r',
+ self._parent.meta.service_name,
+ self._py_operation_name,
+ params,
+ )
+ paginator = client.get_paginator(self._py_operation_name)
+ pages = paginator.paginate(
+ PaginationConfig={'MaxItems': limit, 'PageSize': page_size},
+ **params,
+ )
+ else:
+ logger.debug(
+ 'Calling %s:%s with %r',
+ self._parent.meta.service_name,
+ self._py_operation_name,
+ params,
+ )
+ pages = [getattr(client, self._py_operation_name)(**params)]
+
+ # Now that we have a page iterator or single page of results
+ # we start processing and yielding individual items.
+ count = 0
+ for page in pages:
+ page_items = []
+ for item in self._handler(self._parent, params, page):
+ page_items.append(item)
+
+ # If the limit is set and has been reached, then
+ # we stop processing items here.
+ count += 1
+ if limit is not None and count >= limit:
+ break
+
+ yield page_items
+
+ # Stop reading pages if we've reached out limit
+ if limit is not None and count >= limit:
+ break
+
+ def all(self):
+ """
+ Get all items from the collection, optionally with a custom
+ page size and item count limit.
+
+ This method returns an iterable generator which yields
+ individual resource instances. Example use::
+
+ # Iterate through items
+ >>> for queue in sqs.queues.all():
+ ... print(queue.url)
+ 'https://url1'
+ 'https://url2'
+
+ # Convert to list
+ >>> queues = list(sqs.queues.all())
+ >>> len(queues)
+ 2
+ """
+ return self._clone()
+
+ def filter(self, **kwargs):
+ """
+ Get items from the collection, passing keyword arguments along
+ as parameters to the underlying service operation, which are
+ typically used to filter the results.
+
+ This method returns an iterable generator which yields
+ individual resource instances. Example use::
+
+ # Iterate through items
+ >>> for queue in sqs.queues.filter(Param='foo'):
+ ... print(queue.url)
+ 'https://url1'
+ 'https://url2'
+
+ # Convert to list
+ >>> queues = list(sqs.queues.filter(Param='foo'))
+ >>> len(queues)
+ 2
+
+ :rtype: :py:class:`ResourceCollection`
+ """
+ return self._clone(**kwargs)
+
+ def limit(self, count):
+ """
+ Return at most this many resources.
+
+ >>> for bucket in s3.buckets.limit(5):
+ ... print(bucket.name)
+ 'bucket1'
+ 'bucket2'
+ 'bucket3'
+ 'bucket4'
+ 'bucket5'
+
+ :type count: int
+ :param count: Return no more than this many items
+ :rtype: :py:class:`ResourceCollection`
+ """
+ return self._clone(limit=count)
+
+ def page_size(self, count):
+ """
+ Fetch at most this many resources per service request.
+
+ >>> for obj in s3.Bucket('boto3').objects.page_size(100):
+ ... print(obj.key)
+
+ :type count: int
+ :param count: Fetch this many items per request
+ :rtype: :py:class:`ResourceCollection`
+ """
+ return self._clone(page_size=count)
+
+
+class CollectionManager:
+ """
+ A collection manager provides access to resource collection instances,
+ which can be iterated and filtered. The manager exposes some
+ convenience functions that are also found on resource collections,
+ such as :py:meth:`~ResourceCollection.all` and
+ :py:meth:`~ResourceCollection.filter`.
+
+ Get all items::
+
+ >>> for bucket in s3.buckets.all():
+ ... print(bucket.name)
+
+ Get only some items via filtering::
+
+ >>> for queue in sqs.queues.filter(QueueNamePrefix='AWS'):
+ ... print(queue.url)
+
+ Get whole pages of items:
+
+ >>> for page in s3.Bucket('boto3').objects.pages():
+ ... for obj in page:
+ ... print(obj.key)
+
+ A collection manager is not iterable. You **must** call one of the
+ methods that return a :py:class:`ResourceCollection` before trying
+ to iterate, slice, or convert to a list.
+
+ See the :ref:`guide_collections` guide for a high-level overview
+ of collections, including when remote service requests are performed.
+
+ :type collection_model: :py:class:`~boto3.resources.model.Collection`
+ :param model: Collection model
+
+ :type parent: :py:class:`~boto3.resources.base.ServiceResource`
+ :param parent: The collection's parent resource
+
+ :type factory: :py:class:`~boto3.resources.factory.ResourceFactory`
+ :param factory: The resource factory to create new resources
+
+ :type service_context: :py:class:`~boto3.utils.ServiceContext`
+ :param service_context: Context about the AWS service
+ """
+
+ # The class to use when creating an iterator
+ _collection_cls = ResourceCollection
+
+ def __init__(self, collection_model, parent, factory, service_context):
+ self._model = collection_model
+ operation_name = self._model.request.operation
+ self._parent = parent
+
+ search_path = collection_model.resource.path
+ self._handler = ResourceHandler(
+ search_path=search_path,
+ factory=factory,
+ resource_model=collection_model.resource,
+ service_context=service_context,
+ operation_name=operation_name,
+ )
+
+ def __repr__(self):
+ return '{}({}, {})'.format(
+ self.__class__.__name__,
+ self._parent,
+ f'{self._parent.meta.service_name}.{self._model.resource.type}',
+ )
+
+ def iterator(self, **kwargs):
+ """
+ Get a resource collection iterator from this manager.
+
+ :rtype: :py:class:`ResourceCollection`
+ :return: An iterable representing the collection of resources
+ """
+ return self._collection_cls(
+ self._model, self._parent, self._handler, **kwargs
+ )
+
+ # Set up some methods to proxy ResourceCollection methods
+ def all(self):
+ return self.iterator()
+
+ all.__doc__ = ResourceCollection.all.__doc__
+
+ def filter(self, **kwargs):
+ return self.iterator(**kwargs)
+
+ filter.__doc__ = ResourceCollection.filter.__doc__
+
+ def limit(self, count):
+ return self.iterator(limit=count)
+
+ limit.__doc__ = ResourceCollection.limit.__doc__
+
+ def page_size(self, count):
+ return self.iterator(page_size=count)
+
+ page_size.__doc__ = ResourceCollection.page_size.__doc__
+
+ def pages(self):
+ return self.iterator().pages()
+
+ pages.__doc__ = ResourceCollection.pages.__doc__
+
+
+class CollectionFactory:
+ """
+ A factory to create new
+ :py:class:`CollectionManager` and :py:class:`ResourceCollection`
+ subclasses from a :py:class:`~boto3.resources.model.Collection`
+ model. These subclasses include methods to perform batch operations.
+ """
+
+ def load_from_definition(
+ self, resource_name, collection_model, service_context, event_emitter
+ ):
+ """
+ Loads a collection from a model, creating a new
+ :py:class:`CollectionManager` subclass
+ with the correct properties and methods, named based on the service
+ and resource name, e.g. ec2.InstanceCollectionManager. It also
+ creates a new :py:class:`ResourceCollection` subclass which is used
+ by the new manager class.
+
+ :type resource_name: string
+ :param resource_name: Name of the resource to look up. For services,
+ this should match the ``service_name``.
+
+ :type service_context: :py:class:`~boto3.utils.ServiceContext`
+ :param service_context: Context about the AWS service
+
+ :type event_emitter: :py:class:`~botocore.hooks.HierarchialEmitter`
+ :param event_emitter: An event emitter
+
+ :rtype: Subclass of :py:class:`CollectionManager`
+ :return: The collection class.
+ """
+ attrs = {}
+ collection_name = collection_model.name
+
+ # Create the batch actions for a collection
+ self._load_batch_actions(
+ attrs,
+ resource_name,
+ collection_model,
+ service_context.service_model,
+ event_emitter,
+ )
+ # Add the documentation to the collection class's methods
+ self._load_documented_collection_methods(
+ attrs=attrs,
+ resource_name=resource_name,
+ collection_model=collection_model,
+ service_model=service_context.service_model,
+ event_emitter=event_emitter,
+ base_class=ResourceCollection,
+ )
+
+ if service_context.service_name == resource_name:
+ cls_name = (
+ f'{service_context.service_name}.{collection_name}Collection'
+ )
+ else:
+ cls_name = f'{service_context.service_name}.{resource_name}.{collection_name}Collection'
+
+ collection_cls = type(str(cls_name), (ResourceCollection,), attrs)
+
+ # Add the documentation to the collection manager's methods
+ self._load_documented_collection_methods(
+ attrs=attrs,
+ resource_name=resource_name,
+ collection_model=collection_model,
+ service_model=service_context.service_model,
+ event_emitter=event_emitter,
+ base_class=CollectionManager,
+ )
+ attrs['_collection_cls'] = collection_cls
+ cls_name += 'Manager'
+
+ return type(str(cls_name), (CollectionManager,), attrs)
+
+ def _load_batch_actions(
+ self,
+ attrs,
+ resource_name,
+ collection_model,
+ service_model,
+ event_emitter,
+ ):
+ """
+ Batch actions on the collection become methods on both
+ the collection manager and iterators.
+ """
+ for action_model in collection_model.batch_actions:
+ snake_cased = xform_name(action_model.name)
+ attrs[snake_cased] = self._create_batch_action(
+ resource_name,
+ snake_cased,
+ action_model,
+ collection_model,
+ service_model,
+ event_emitter,
+ )
+
+ def _load_documented_collection_methods(
+ factory_self,
+ attrs,
+ resource_name,
+ collection_model,
+ service_model,
+ event_emitter,
+ base_class,
+ ):
+ # The base class already has these methods defined. However
+ # the docstrings are generic and not based for a particular service
+ # or resource. So we override these methods by proxying to the
+ # base class's builtin method and adding a docstring
+ # that pertains to the resource.
+
+ # A collection's all() method.
+ def all(self):
+ return base_class.all(self)
+
+ all.__doc__ = docstring.CollectionMethodDocstring(
+ resource_name=resource_name,
+ action_name='all',
+ event_emitter=event_emitter,
+ collection_model=collection_model,
+ service_model=service_model,
+ include_signature=False,
+ )
+ attrs['all'] = all
+
+ # The collection's filter() method.
+ def filter(self, **kwargs):
+ return base_class.filter(self, **kwargs)
+
+ filter.__doc__ = docstring.CollectionMethodDocstring(
+ resource_name=resource_name,
+ action_name='filter',
+ event_emitter=event_emitter,
+ collection_model=collection_model,
+ service_model=service_model,
+ include_signature=False,
+ )
+ attrs['filter'] = filter
+
+ # The collection's limit method.
+ def limit(self, count):
+ return base_class.limit(self, count)
+
+ limit.__doc__ = docstring.CollectionMethodDocstring(
+ resource_name=resource_name,
+ action_name='limit',
+ event_emitter=event_emitter,
+ collection_model=collection_model,
+ service_model=service_model,
+ include_signature=False,
+ )
+ attrs['limit'] = limit
+
+ # The collection's page_size method.
+ def page_size(self, count):
+ return base_class.page_size(self, count)
+
+ page_size.__doc__ = docstring.CollectionMethodDocstring(
+ resource_name=resource_name,
+ action_name='page_size',
+ event_emitter=event_emitter,
+ collection_model=collection_model,
+ service_model=service_model,
+ include_signature=False,
+ )
+ attrs['page_size'] = page_size
+
+ def _create_batch_action(
+ factory_self,
+ resource_name,
+ snake_cased,
+ action_model,
+ collection_model,
+ service_model,
+ event_emitter,
+ ):
+ """
+ Creates a new method which makes a batch operation request
+ to the underlying service API.
+ """
+ action = BatchAction(action_model)
+
+ def batch_action(self, *args, **kwargs):
+ return action(self, *args, **kwargs)
+
+ batch_action.__name__ = str(snake_cased)
+ batch_action.__doc__ = docstring.BatchActionDocstring(
+ resource_name=resource_name,
+ event_emitter=event_emitter,
+ batch_action_model=action_model,
+ service_model=service_model,
+ collection_model=collection_model,
+ include_signature=False,
+ )
+ return batch_action