aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/storage3/_async/bucket.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/storage3/_async/bucket.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/storage3/_async/bucket.py')
-rw-r--r--.venv/lib/python3.12/site-packages/storage3/_async/bucket.py120
1 files changed, 120 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/storage3/_async/bucket.py b/.venv/lib/python3.12/site-packages/storage3/_async/bucket.py
new file mode 100644
index 00000000..9d6bf886
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/storage3/_async/bucket.py
@@ -0,0 +1,120 @@
+from __future__ import annotations
+
+from typing import Any, Optional
+
+from httpx import HTTPStatusError, Response
+
+from ..exceptions import StorageApiError
+from ..types import CreateOrUpdateBucketOptions, RequestMethod
+from ..utils import AsyncClient
+from .file_api import AsyncBucket
+
+__all__ = ["AsyncStorageBucketAPI"]
+
+
+class AsyncStorageBucketAPI:
+ """This class abstracts access to the endpoint to the Get, List, Empty, and Delete operations on a bucket"""
+
+ def __init__(self, session: AsyncClient) -> None:
+ self._client = session
+
+ async def _request(
+ self,
+ method: RequestMethod,
+ url: str,
+ json: Optional[dict[Any, Any]] = None,
+ ) -> Response:
+ try:
+ response = await self._client.request(method, url, json=json)
+ response.raise_for_status()
+ except HTTPStatusError as exc:
+ resp = exc.response.json()
+ raise StorageApiError(resp["message"], resp["error"], resp["statusCode"])
+
+ return response
+
+ async def list_buckets(self) -> list[AsyncBucket]:
+ """Retrieves the details of all storage buckets within an existing product."""
+ # if the request doesn't error, it is assured to return a list
+ res = await self._request("GET", "/bucket")
+ return [AsyncBucket(**bucket) for bucket in res.json()]
+
+ async def get_bucket(self, id: str) -> AsyncBucket:
+ """Retrieves the details of an existing storage bucket.
+
+ Parameters
+ ----------
+ id
+ The unique identifier of the bucket you would like to retrieve.
+ """
+ res = await self._request("GET", f"/bucket/{id}")
+ json = res.json()
+ return AsyncBucket(**json)
+
+ async def create_bucket(
+ self,
+ id: str,
+ name: Optional[str] = None,
+ options: Optional[CreateOrUpdateBucketOptions] = None,
+ ) -> dict[str, str]:
+ """Creates a new storage bucket.
+
+ Parameters
+ ----------
+ id
+ A unique identifier for the bucket you are creating.
+ name
+ A name for the bucket you are creating. If not passed, the id is used as the name as well.
+ options
+ Extra options to send while creating the bucket. Valid options are `public`, `file_size_limit` and
+ `allowed_mime_types`.
+ """
+ json: dict[str, Any] = {"id": id, "name": name or id}
+ if options:
+ json.update(**options)
+ res = await self._request(
+ "POST",
+ "/bucket",
+ json=json,
+ )
+ return res.json()
+
+ async def update_bucket(
+ self, id: str, options: CreateOrUpdateBucketOptions
+ ) -> dict[str, str]:
+ """Update a storage bucket.
+
+ Parameters
+ ----------
+ id
+ The unique identifier of the bucket you would like to update.
+ options
+ The properties you want to update. Valid options are `public`, `file_size_limit` and
+ `allowed_mime_types`.
+ """
+ json = {"id": id, "name": id, **options}
+ res = await self._request("PUT", f"/bucket/{id}", json=json)
+ return res.json()
+
+ async def empty_bucket(self, id: str) -> dict[str, str]:
+ """Removes all objects inside a single bucket.
+
+ Parameters
+ ----------
+ id
+ The unique identifier of the bucket you would like to empty.
+ """
+ res = await self._request("POST", f"/bucket/{id}/empty", json={})
+ return res.json()
+
+ async def delete_bucket(self, id: str) -> dict[str, str]:
+ """Deletes an existing bucket. Note that you cannot delete buckets with existing objects inside. You must first
+ `empty()` the bucket.
+
+ Parameters
+ ----------
+ id
+ The unique identifier of the bucket you would like to delete.
+ """
+ res = await self._request("DELETE", f"/bucket/{id}", json={})
+ return res.json()