aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py')
-rw-r--r--.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py167
1 files changed, 167 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py b/.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py
new file mode 100644
index 00000000..931296bc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/boto3/dynamodb/table.py
@@ -0,0 +1,167 @@
+# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# https://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def register_table_methods(base_classes, **kwargs):
+ base_classes.insert(0, TableResource)
+
+
+# This class can be used to add any additional methods we want
+# onto a table resource. Ideally to avoid creating a new
+# base class for every method we can just update this
+# class instead. Just be sure to move the bulk of the
+# actual method implementation to another class.
+class TableResource:
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ def batch_writer(self, overwrite_by_pkeys=None):
+ """Create a batch writer object.
+
+ This method creates a context manager for writing
+ objects to Amazon DynamoDB in batch.
+
+ The batch writer will automatically handle buffering and sending items
+ in batches. In addition, the batch writer will also automatically
+ handle any unprocessed items and resend them as needed. All you need
+ to do is call ``put_item`` for any items you want to add, and
+ ``delete_item`` for any items you want to delete.
+
+ Example usage::
+
+ with table.batch_writer() as batch:
+ for _ in range(1000000):
+ batch.put_item(Item={'HashKey': '...',
+ 'Otherstuff': '...'})
+ # You can also delete_items in a batch.
+ batch.delete_item(Key={'HashKey': 'SomeHashKey'})
+
+ :type overwrite_by_pkeys: list(string)
+ :param overwrite_by_pkeys: De-duplicate request items in buffer
+ if match new request item on specified primary keys. i.e
+ ``["partition_key1", "sort_key2", "sort_key3"]``
+
+ """
+ return BatchWriter(
+ self.name, self.meta.client, overwrite_by_pkeys=overwrite_by_pkeys
+ )
+
+
+class BatchWriter:
+ """Automatically handle batch writes to DynamoDB for a single table."""
+
+ def __init__(
+ self, table_name, client, flush_amount=25, overwrite_by_pkeys=None
+ ):
+ """
+
+ :type table_name: str
+ :param table_name: The name of the table. The class handles
+ batch writes to a single table.
+
+ :type client: ``botocore.client.Client``
+ :param client: A botocore client. Note this client
+ **must** have the dynamodb customizations applied
+ to it for transforming AttributeValues into the
+ wire protocol. What this means in practice is that
+ you need to use a client that comes from a DynamoDB
+ resource if you're going to instantiate this class
+ directly, i.e
+ ``boto3.resource('dynamodb').Table('foo').meta.client``.
+
+ :type flush_amount: int
+ :param flush_amount: The number of items to keep in
+ a local buffer before sending a batch_write_item
+ request to DynamoDB.
+
+ :type overwrite_by_pkeys: list(string)
+ :param overwrite_by_pkeys: De-duplicate request items in buffer
+ if match new request item on specified primary keys. i.e
+ ``["partition_key1", "sort_key2", "sort_key3"]``
+
+ """
+ self._table_name = table_name
+ self._client = client
+ self._items_buffer = []
+ self._flush_amount = flush_amount
+ self._overwrite_by_pkeys = overwrite_by_pkeys
+
+ def put_item(self, Item):
+ self._add_request_and_process({'PutRequest': {'Item': Item}})
+
+ def delete_item(self, Key):
+ self._add_request_and_process({'DeleteRequest': {'Key': Key}})
+
+ def _add_request_and_process(self, request):
+ if self._overwrite_by_pkeys:
+ self._remove_dup_pkeys_request_if_any(request)
+ self._items_buffer.append(request)
+ self._flush_if_needed()
+
+ def _remove_dup_pkeys_request_if_any(self, request):
+ pkey_values_new = self._extract_pkey_values(request)
+ for item in self._items_buffer:
+ if self._extract_pkey_values(item) == pkey_values_new:
+ self._items_buffer.remove(item)
+ logger.debug(
+ "With overwrite_by_pkeys enabled, skipping " "request:%s",
+ item,
+ )
+
+ def _extract_pkey_values(self, request):
+ if request.get('PutRequest'):
+ return [
+ request['PutRequest']['Item'][key]
+ for key in self._overwrite_by_pkeys
+ ]
+ elif request.get('DeleteRequest'):
+ return [
+ request['DeleteRequest']['Key'][key]
+ for key in self._overwrite_by_pkeys
+ ]
+ return None
+
+ def _flush_if_needed(self):
+ if len(self._items_buffer) >= self._flush_amount:
+ self._flush()
+
+ def _flush(self):
+ items_to_send = self._items_buffer[: self._flush_amount]
+ self._items_buffer = self._items_buffer[self._flush_amount :]
+ response = self._client.batch_write_item(
+ RequestItems={self._table_name: items_to_send}
+ )
+ unprocessed_items = response['UnprocessedItems']
+ if not unprocessed_items:
+ unprocessed_items = {}
+ item_list = unprocessed_items.get(self._table_name, [])
+ # Any unprocessed_items are immediately added to the
+ # next batch we send.
+ self._items_buffer.extend(item_list)
+ logger.debug(
+ "Batch write sent %s, unprocessed: %s",
+ len(items_to_send),
+ len(self._items_buffer),
+ )
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ # When we exit, we need to keep flushing whatever's left
+ # until there's nothing left in our items buffer.
+ while self._items_buffer:
+ self._flush()