1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
from __future__ import annotations
from typing import Any, Optional
from httpx import HTTPStatusError, Response
from ..exceptions import StorageApiError
from ..types import CreateOrUpdateBucketOptions, RequestMethod
from ..utils import AsyncClient
from .file_api import AsyncBucket
__all__ = ["AsyncStorageBucketAPI"]
class AsyncStorageBucketAPI:
    """Asynchronous access to the bucket endpoints of the storage API.

    Covers the list, get, create, update, empty and delete operations on a
    bucket. Every call goes through the session passed to the constructor.
    """

    def __init__(self, session: AsyncClient) -> None:
        """Store the HTTP session used for every request.

        Parameters
        ----------
        session
            The client whose ``request`` coroutine performs the HTTP calls.
        """
        self._client = session

    @staticmethod
    def _error_from_response(response: Response) -> StorageApiError:
        """Build a ``StorageApiError`` describing a failed response.

        The error body is expected to be a JSON object carrying ``message``,
        ``error`` and ``statusCode``. When the body is not JSON, or does not
        have that shape, the raw body text and the HTTP status code are used
        instead so the caller still receives a ``StorageApiError``.
        """
        try:
            details = response.json()
            return StorageApiError(
                details["message"], details["error"], details["statusCode"]
            )
        except (ValueError, KeyError, TypeError):
            # ValueError: body is not valid JSON.
            # KeyError / TypeError: JSON decoded, but not to the expected object.
            return StorageApiError(
                f"Unable to parse error message: {response.text}",
                "InternalError",
                response.status_code,
            )

    async def _request(
        self,
        method: RequestMethod,
        url: str,
        json: Optional[dict[Any, Any]] = None,
    ) -> Response:
        """Send a request and turn HTTP error statuses into ``StorageApiError``.

        Parameters
        ----------
        method
            The HTTP method to use.
        url
            The path of the endpoint to call.
        json
            Optional JSON body to send with the request.

        Returns
        -------
        Response
            The response, once ``raise_for_status()`` has accepted it.

        Raises
        ------
        StorageApiError
            If the response carries an HTTP error status.
        """
        try:
            response = await self._client.request(method, url, json=json)
            response.raise_for_status()
        except HTTPStatusError as exc:
            # Chain explicitly so the underlying HTTP error stays attached.
            raise self._error_from_response(exc.response) from exc
        return response

    async def list_buckets(self) -> list[AsyncBucket]:
        """Retrieves the details of all storage buckets."""
        # if the request doesn't error, it is assured to return a list
        res = await self._request("GET", "/bucket")
        return [AsyncBucket(**bucket) for bucket in res.json()]

    async def get_bucket(self, id: str) -> AsyncBucket:
        """Retrieves the details of an existing storage bucket.

        Parameters
        ----------
        id
            The unique identifier of the bucket you would like to retrieve.
        """
        res = await self._request("GET", f"/bucket/{id}")
        return AsyncBucket(**res.json())

    async def create_bucket(
        self,
        id: str,
        name: Optional[str] = None,
        options: Optional[CreateOrUpdateBucketOptions] = None,
    ) -> dict[str, str]:
        """Creates a new storage bucket.

        Parameters
        ----------
        id
            A unique identifier for the bucket you are creating.
        name
            A name for the bucket you are creating. If not passed, the id is used as the name as well.
        options
            Extra options to send while creating the bucket. Valid options are `public`, `file_size_limit` and
            `allowed_mime_types`.

        Returns
        -------
        dict
            The decoded JSON body of the response.
        """
        payload: dict[str, Any] = {"id": id, "name": name or id}
        if options:
            # Options are merged last, so they override `id`/`name` on a clash.
            payload.update(options)
        res = await self._request("POST", "/bucket", json=payload)
        return res.json()

    async def update_bucket(
        self, id: str, options: CreateOrUpdateBucketOptions
    ) -> dict[str, str]:
        """Update a storage bucket.

        Parameters
        ----------
        id
            The unique identifier of the bucket you would like to update.
        options
            The properties you want to update. Valid options are `public`, `file_size_limit` and
            `allowed_mime_types`.

        Returns
        -------
        dict
            The decoded JSON body of the response.
        """
        # The id is sent as the name too; keys in `options` take precedence.
        payload = {"id": id, "name": id, **options}
        res = await self._request("PUT", f"/bucket/{id}", json=payload)
        return res.json()

    async def empty_bucket(self, id: str) -> dict[str, str]:
        """Removes all objects inside a single bucket.

        Parameters
        ----------
        id
            The unique identifier of the bucket you would like to empty.

        Returns
        -------
        dict
            The decoded JSON body of the response.
        """
        res = await self._request("POST", f"/bucket/{id}/empty", json={})
        return res.json()

    async def delete_bucket(self, id: str) -> dict[str, str]:
        """Deletes an existing bucket. Note that you cannot delete buckets with existing objects inside. You must first
        empty the bucket (see `empty_bucket`).

        Parameters
        ----------
        id
            The unique identifier of the bucket you would like to delete.

        Returns
        -------
        dict
            The decoded JSON body of the response.
        """
        res = await self._request("DELETE", f"/bucket/{id}", json={})
        return res.json()
|