diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/storage3/_async/file_api.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/storage3/_async/file_api.py | 559 |
1 files changed, 559 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/storage3/_async/file_api.py b/.venv/lib/python3.12/site-packages/storage3/_async/file_api.py new file mode 100644 index 00000000..029a0330 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/storage3/_async/file_api.py @@ -0,0 +1,559 @@ +from __future__ import annotations + +import base64 +import json +import urllib.parse +from dataclasses import dataclass, field +from io import BufferedReader, FileIO +from pathlib import Path +from typing import Any, Literal, Optional, Union, cast + +from httpx import HTTPStatusError, Response + +from ..constants import DEFAULT_FILE_OPTIONS, DEFAULT_SEARCH_OPTIONS +from ..exceptions import StorageApiError +from ..types import ( + BaseBucket, + CreateSignedUrlResponse, + CreateSignedURLsOptions, + DownloadOptions, + FileOptions, + ListBucketFilesOptions, + RequestMethod, + SignedUploadURL, + SignedUrlResponse, + UploadData, + UploadResponse, + URLOptions, +) +from ..utils import AsyncClient, StorageException + +__all__ = ["AsyncBucket"] + + +class AsyncBucketActionsMixin: + """Functions needed to access the file API.""" + + id: str + _client: AsyncClient + + async def _request( + self, + method: RequestMethod, + url: str, + headers: Optional[dict[str, Any]] = None, + json: Optional[dict[Any, Any]] = None, + files: Optional[Any] = None, + **kwargs: Any, + ) -> Response: + try: + response = await self._client.request( + method, url, headers=headers or {}, json=json, files=files, **kwargs + ) + response.raise_for_status() + except HTTPStatusError as exc: + resp = exc.response.json() + raise StorageApiError(resp["message"], resp["error"], resp["statusCode"]) + + # close the resource before returning the response + if files and "file" in files and isinstance(files["file"][1], BufferedReader): + files["file"][1].close() + + return response + + async def create_signed_upload_url(self, path: str) -> SignedUploadURL: + """ + Creates a signed upload URL. + + Parameters + ---------- + path + The file path, including the file name. For example `folder/image.png`. + """ + _path = self._get_final_path(path) + response = await self._request("POST", f"/object/upload/sign/{_path}") + data = response.json() + full_url: urllib.parse.ParseResult = urllib.parse.urlparse( + str(self._client.base_url) + cast(str, data["url"]).lstrip("/") + ) + query_params = urllib.parse.parse_qs(full_url.query) + if not query_params.get("token"): + raise StorageException("No token sent by the API") + return { + "signed_url": full_url.geturl(), + "signedUrl": full_url.geturl(), + "token": query_params["token"][0], + "path": path, + } + + async def upload_to_signed_url( + self, + path: str, + token: str, + file: Union[BufferedReader, bytes, FileIO, str, Path], + file_options: Optional[FileOptions] = None, + ) -> UploadResponse: + """ + Upload a file with a token generated from :meth:`.create_signed_url` + + Parameters + ---------- + path + The file path, including the file name + token + The token generated from :meth:`.create_signed_url` + file + The file contents or a file-like object to upload + file_options + Additional options for the uploaded file + """ + _path = self._get_final_path(path) + _url = urllib.parse.urlparse(f"/object/upload/sign/{_path}") + query_params = urllib.parse.urlencode({"token": token}) + final_url = f"{_url.geturl()}?{query_params}" + + if file_options is None: + file_options = {} + + cache_control = file_options.get("cache-control") + # cacheControl is also passed as form data + # https://github.com/supabase/storage-js/blob/fa44be8156295ba6320ffeff96bdf91016536a46/src/packages/StorageFileApi.ts#L89 + _data = {} + if cache_control: + file_options["cache-control"] = f"max-age={cache_control}" + _data = {"cacheControl": cache_control} + headers = { + **self._client.headers, + **DEFAULT_FILE_OPTIONS, + **file_options, + } + filename = path.rsplit("/", maxsplit=1)[-1] + + if ( + isinstance(file, BufferedReader) + or isinstance(file, bytes) + or isinstance(file, FileIO) + ): + # bytes or byte-stream-like object received + _file = {"file": (filename, file, headers.pop("content-type"))} + else: + # str or pathlib.path received + _file = { + "file": ( + filename, + open(file, "rb"), + headers.pop("content-type"), + ) + } + response = await self._request( + "PUT", final_url, files=_file, headers=headers, data=_data + ) + data: UploadData = response.json() + + return UploadResponse(path=path, Key=data.get("Key")) + + async def create_signed_url( + self, path: str, expires_in: int, options: URLOptions = {} + ) -> SignedUrlResponse: + """ + Parameters + ---------- + path + file path to be downloaded, including the current file name. + expires_in + number of seconds until the signed URL expires. + options + options to be passed for downloading or transforming the file. + """ + json = {"expiresIn": str(expires_in)} + download_query = "" + if options.get("download"): + json.update({"download": options["download"]}) + + download_query = ( + "&download=" + if options.get("download") is True + else f"&download={options.get('download')}" + ) + if options.get("transform"): + json.update({"transform": options["transform"]}) + + path = self._get_final_path(path) + response = await self._request( + "POST", + f"/object/sign/{path}", + json=json, + ) + data = response.json() + + # Prepare URL + url = urllib.parse.urlparse(data["signedURL"]) + url = urllib.parse.quote(url.path) + f"?{url.query}" + + signedURL = ( + f"{self._client.base_url}{cast(str, url).lstrip('/')}{download_query}" + ) + data: SignedUrlResponse = {"signedURL": signedURL, "signedUrl": signedURL} + return data + + async def create_signed_urls( + self, paths: list[str], expires_in: int, options: CreateSignedURLsOptions = {} + ) -> list[CreateSignedUrlResponse]: + """ + Parameters + ---------- + path + file path to be downloaded, including the current file name. + expires_in + number of seconds until the signed URL expires. + options + options to be passed for downloading the file. + """ + json = {"paths": paths, "expiresIn": str(expires_in)} + download_query = "" + if options.get("download"): + json.update({"download": options.get("download")}) + + download_query = ( + "&download=" + if options.get("download") is True + else f"&download={options.get('download')}" + ) + + response = await self._request( + "POST", + f"/object/sign/{self.id}", + json=json, + ) + data = response.json() + signed_urls = [] + for item in data: + + # Prepare URL + url = urllib.parse.urlparse(item["signedURL"]) + url = urllib.parse.quote(url.path) + f"?{url.query}" + + signedURL = ( + f"{self._client.base_url}{cast(str, url).lstrip('/')}{download_query}" + ) + signed_item: CreateSignedUrlResponse = { + "error": item["error"], + "path": item["path"], + "signedURL": signedURL, + "signedUrl": signedURL, + } + signed_urls.append(signed_item) + return signed_urls + + async def get_public_url(self, path: str, options: URLOptions = {}) -> str: + """ + Parameters + ---------- + path + file path, including the path and file name. For example `folder/image.png`. + """ + _query_string = [] + download_query = "" + if options.get("download"): + download_query = ( + "&download=" + if options.get("download") is True + else f"&download={options.get('download')}" + ) + + if download_query: + _query_string.append(download_query) + + render_path = "render/image" if options.get("transform") else "object" + transformation_query = ( + urllib.parse.urlencode(options.get("transform")) + if options.get("transform") + else None + ) + + if transformation_query: + _query_string.append(transformation_query) + + query_string = "&".join(_query_string) + query_string = f"?{query_string}" + _path = self._get_final_path(path) + return f"{self._client.base_url}{render_path}/public/{_path}{query_string}" + + async def move(self, from_path: str, to_path: str) -> dict[str, str]: + """ + Moves an existing file, optionally renaming it at the same time. + + Parameters + ---------- + from_path + The original file path, including the current file name. For example `folder/image.png`. + to_path + The new file path, including the new file name. For example `folder/image-copy.png`. + """ + res = await self._request( + "POST", + "/object/move", + json={ + "bucketId": self.id, + "sourceKey": from_path, + "destinationKey": to_path, + }, + ) + return res.json() + + async def copy(self, from_path: str, to_path: str) -> dict[str, str]: + """ + Copies an existing file to a new path in the same bucket. + + Parameters + ---------- + from_path + The original file path, including the current file name. For example `folder/image.png`. + to_path + The new file path, including the new file name. For example `folder/image-copy.png`. + """ + res = await self._request( + "POST", + "/object/copy", + json={ + "bucketId": self.id, + "sourceKey": from_path, + "destinationKey": to_path, + }, + ) + return res.json() + + async def remove(self, paths: list) -> list[dict[str, Any]]: + """ + Deletes files within the same bucket + + Parameters + ---------- + paths + An array or list of files to be deletes, including the path and file name. For example [`folder/image.png`]. + """ + response = await self._request( + "DELETE", + f"/object/{self.id}", + json={"prefixes": paths}, + ) + return response.json() + + async def info( + self, + path: str, + ) -> list[dict[str, str]]: + """ + Lists info for a particular file. + + Parameters + ---------- + path + The path to the file. + """ + response = await self._request( + "GET", + f"/object/info/{self.id}/{path}", + ) + return response.json() + + async def exists( + self, + path: str, + ) -> bool: + """ + Returns True if the file exists, False otherwise. + + Parameters + ---------- + path + The path to the file. + """ + try: + response = await self._request( + "HEAD", + f"/object/{self.id}/{path}", + ) + return response.status_code == 200 + except json.JSONDecodeError: + return False + + async def list( + self, + path: Optional[str] = None, + options: Optional[ListBucketFilesOptions] = None, + ) -> list[dict[str, str]]: + """ + Lists all the files within a bucket. + + Parameters + ---------- + path + The folder path. + options + Search options, including `limit`, `offset`, `sortBy` and `search`. + """ + extra_options = options or {} + extra_headers = {"Content-Type": "application/json"} + body = { + **DEFAULT_SEARCH_OPTIONS, + **extra_options, + "prefix": path or "", + } + response = await self._request( + "POST", + f"/object/list/{self.id}", + json=body, + headers=extra_headers, + ) + return response.json() + + async def download(self, path: str, options: DownloadOptions = {}) -> bytes: + """ + Downloads a file. + + Parameters + ---------- + path + The file path to be downloaded, including the path and file name. For example `folder/image.png`. + """ + render_path = ( + "render/image/authenticated" if options.get("transform") else "object" + ) + transformation_query = urllib.parse.urlencode(options.get("transform") or {}) + query_string = f"?{transformation_query}" if transformation_query else "" + + _path = self._get_final_path(path) + response = await self._request( + "GET", + f"{render_path}/{_path}{query_string}", + ) + return response.content + + async def _upload_or_update( + self, + method: Literal["POST", "PUT"], + path: str, + file: Union[BufferedReader, bytes, FileIO, str, Path], + file_options: Optional[FileOptions] = None, + ) -> UploadResponse: + """ + Uploads a file to an existing bucket. + + Parameters + ---------- + path + The relative file path including the bucket ID. Should be of the format `bucket/folder/subfolder/filename.png`. + The bucket must already exist before attempting to upload. + file + The File object to be stored in the bucket. or a async generator of chunks + file_options + HTTP headers. + """ + if file_options is None: + file_options = {} + cache_control = file_options.pop("cache-control", None) + _data = {} + + upsert = file_options.pop("upsert", None) + if upsert: + file_options.update({"x-upsert": upsert}) + + metadata = file_options.pop("metadata", None) + file_opts_headers = file_options.pop("headers", None) + + headers = { + **self._client.headers, + **DEFAULT_FILE_OPTIONS, + **file_options, + } + + if metadata: + metadata_str = json.dumps(metadata) + headers["x-metadata"] = base64.b64encode(metadata_str.encode()) + _data.update({"metadata": metadata_str}) + + if file_opts_headers: + headers.update({**file_opts_headers}) + + # Only include x-upsert on a POST method + if method != "POST": + del headers["x-upsert"] + + filename = path.rsplit("/", maxsplit=1)[-1] + + if cache_control: + headers["cache-control"] = f"max-age={cache_control}" + _data.update({"cacheControl": cache_control}) + + if ( + isinstance(file, BufferedReader) + or isinstance(file, bytes) + or isinstance(file, FileIO) + ): + # bytes or byte-stream-like object received + files = {"file": (filename, file, headers.pop("content-type"))} + else: + # str or pathlib.path received + files = { + "file": ( + filename, + open(file, "rb"), + headers.pop("content-type"), + ) + } + + _path = self._get_final_path(path) + + response = await self._request( + method, f"/object/{_path}", files=files, headers=headers, data=_data + ) + + data: UploadData = response.json() + + return UploadResponse(path=path, Key=data.get("Key")) + + async def upload( + self, + path: str, + file: Union[BufferedReader, bytes, FileIO, str, Path], + file_options: Optional[FileOptions] = None, + ) -> UploadResponse: + """ + Uploads a file to an existing bucket. + + Parameters + ---------- + path + The relative file path including the bucket ID. Should be of the format `bucket/folder/subfolder/filename.png`. + The bucket must already exist before attempting to upload. + file + The File object to be stored in the bucket. or a async generator of chunks + file_options + HTTP headers. + """ + return await self._upload_or_update("POST", path, file, file_options) + + async def update( + self, + path: str, + file: Union[BufferedReader, bytes, FileIO, str, Path], + file_options: Optional[FileOptions] = None, + ) -> UploadResponse: + return await self._upload_or_update("PUT", path, file, file_options) + + def _get_final_path(self, path: str) -> str: + return f"{self.id}/{path}" + + +@dataclass(repr=False) +class AsyncBucket(BaseBucket): + """Represents a storage bucket.""" + + +@dataclass +class AsyncBucketProxy(AsyncBucketActionsMixin): + """A bucket proxy, this contains the minimum required fields to query the File API.""" + + id: str + _client: AsyncClient = field(repr=False) |