about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py
diff options
context:
space:
mode:
author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py')
-rw-r--r--.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py559
1 file changed, 559 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py b/.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py
new file mode 100644
index 00000000..5ee5e9f6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/storage3/_sync/file_api.py
@@ -0,0 +1,559 @@
+from __future__ import annotations
+
+import base64
+import json
+import urllib.parse
+from dataclasses import dataclass, field
+from io import BufferedReader, FileIO
+from pathlib import Path
+from typing import Any, Literal, Optional, Union, cast
+
+from httpx import HTTPStatusError, Response
+
+from ..constants import DEFAULT_FILE_OPTIONS, DEFAULT_SEARCH_OPTIONS
+from ..exceptions import StorageApiError
+from ..types import (
+    BaseBucket,
+    CreateSignedUrlResponse,
+    CreateSignedURLsOptions,
+    DownloadOptions,
+    FileOptions,
+    ListBucketFilesOptions,
+    RequestMethod,
+    SignedUploadURL,
+    SignedUrlResponse,
+    UploadData,
+    UploadResponse,
+    URLOptions,
+)
+from ..utils import StorageException, SyncClient
+
+__all__ = ["SyncBucket"]
+
+
+class SyncBucketActionsMixin:
+    """Functions needed to access the file API."""
+
+    id: str
+    _client: SyncClient
+
+    def _request(
+        self,
+        method: RequestMethod,
+        url: str,
+        headers: Optional[dict[str, Any]] = None,
+        json: Optional[dict[Any, Any]] = None,
+        files: Optional[Any] = None,
+        **kwargs: Any,
+    ) -> Response:
+        try:
+            response = self._client.request(
+                method, url, headers=headers or {}, json=json, files=files, **kwargs
+            )
+            response.raise_for_status()
+        except HTTPStatusError as exc:
+            resp = exc.response.json()
+            raise StorageApiError(resp["message"], resp["error"], resp["statusCode"])
+
+        # close the resource before returning the response
+        if files and "file" in files and isinstance(files["file"][1], BufferedReader):
+            files["file"][1].close()
+
+        return response
+
+    def create_signed_upload_url(self, path: str) -> SignedUploadURL:
+        """
+        Creates a signed upload URL.
+
+        Parameters
+        ----------
+        path
+            The file path, including the file name. For example `folder/image.png`.
+        """
+        _path = self._get_final_path(path)
+        response = self._request("POST", f"/object/upload/sign/{_path}")
+        data = response.json()
+        full_url: urllib.parse.ParseResult = urllib.parse.urlparse(
+            str(self._client.base_url) + cast(str, data["url"]).lstrip("/")
+        )
+        query_params = urllib.parse.parse_qs(full_url.query)
+        if not query_params.get("token"):
+            raise StorageException("No token sent by the API")
+        return {
+            "signed_url": full_url.geturl(),
+            "signedUrl": full_url.geturl(),
+            "token": query_params["token"][0],
+            "path": path,
+        }
+
+    def upload_to_signed_url(
+        self,
+        path: str,
+        token: str,
+        file: Union[BufferedReader, bytes, FileIO, str, Path],
+        file_options: Optional[FileOptions] = None,
+    ) -> UploadResponse:
+        """
+        Upload a file with a token generated from :meth:`.create_signed_url`
+
+        Parameters
+        ----------
+        path
+            The file path, including the file name
+        token
+            The token generated from :meth:`.create_signed_url`
+        file
+            The file contents or a file-like object to upload
+        file_options
+            Additional options for the uploaded file
+        """
+        _path = self._get_final_path(path)
+        _url = urllib.parse.urlparse(f"/object/upload/sign/{_path}")
+        query_params = urllib.parse.urlencode({"token": token})
+        final_url = f"{_url.geturl()}?{query_params}"
+
+        if file_options is None:
+            file_options = {}
+
+        cache_control = file_options.get("cache-control")
+        # cacheControl is also passed as form data
+        # https://github.com/supabase/storage-js/blob/fa44be8156295ba6320ffeff96bdf91016536a46/src/packages/StorageFileApi.ts#L89
+        _data = {}
+        if cache_control:
+            file_options["cache-control"] = f"max-age={cache_control}"
+            _data = {"cacheControl": cache_control}
+        headers = {
+            **self._client.headers,
+            **DEFAULT_FILE_OPTIONS,
+            **file_options,
+        }
+        filename = path.rsplit("/", maxsplit=1)[-1]
+
+        if (
+            isinstance(file, BufferedReader)
+            or isinstance(file, bytes)
+            or isinstance(file, FileIO)
+        ):
+            # bytes or byte-stream-like object received
+            _file = {"file": (filename, file, headers.pop("content-type"))}
+        else:
+            # str or pathlib.path received
+            _file = {
+                "file": (
+                    filename,
+                    open(file, "rb"),
+                    headers.pop("content-type"),
+                )
+            }
+        response = self._request(
+            "PUT", final_url, files=_file, headers=headers, data=_data
+        )
+        data: UploadData = response.json()
+
+        return UploadResponse(path=path, Key=data.get("Key"))
+
+    def create_signed_url(
+        self, path: str, expires_in: int, options: URLOptions = {}
+    ) -> SignedUrlResponse:
+        """
+        Parameters
+        ----------
+        path
+            file path to be downloaded, including the current file name.
+        expires_in
+            number of seconds until the signed URL expires.
+        options
+            options to be passed for downloading or transforming the file.
+        """
+        json = {"expiresIn": str(expires_in)}
+        download_query = ""
+        if options.get("download"):
+            json.update({"download": options["download"]})
+
+            download_query = (
+                "&download="
+                if options.get("download") is True
+                else f"&download={options.get('download')}"
+            )
+        if options.get("transform"):
+            json.update({"transform": options["transform"]})
+
+        path = self._get_final_path(path)
+        response = self._request(
+            "POST",
+            f"/object/sign/{path}",
+            json=json,
+        )
+        data = response.json()
+
+        # Prepare URL
+        url = urllib.parse.urlparse(data["signedURL"])
+        url = urllib.parse.quote(url.path) + f"?{url.query}"
+
+        signedURL = (
+            f"{self._client.base_url}{cast(str, url).lstrip('/')}{download_query}"
+        )
+        data: SignedUrlResponse = {"signedURL": signedURL, "signedUrl": signedURL}
+        return data
+
+    def create_signed_urls(
+        self, paths: list[str], expires_in: int, options: CreateSignedURLsOptions = {}
+    ) -> list[CreateSignedUrlResponse]:
+        """
+        Parameters
+        ----------
+        path
+            file path to be downloaded, including the current file name.
+        expires_in
+            number of seconds until the signed URL expires.
+        options
+            options to be passed for downloading the file.
+        """
+        json = {"paths": paths, "expiresIn": str(expires_in)}
+        download_query = ""
+        if options.get("download"):
+            json.update({"download": options.get("download")})
+
+            download_query = (
+                "&download="
+                if options.get("download") is True
+                else f"&download={options.get('download')}"
+            )
+
+        response = self._request(
+            "POST",
+            f"/object/sign/{self.id}",
+            json=json,
+        )
+        data = response.json()
+        signed_urls = []
+        for item in data:
+
+            # Prepare URL
+            url = urllib.parse.urlparse(item["signedURL"])
+            url = urllib.parse.quote(url.path) + f"?{url.query}"
+
+            signedURL = (
+                f"{self._client.base_url}{cast(str, url).lstrip('/')}{download_query}"
+            )
+            signed_item: CreateSignedUrlResponse = {
+                "error": item["error"],
+                "path": item["path"],
+                "signedURL": signedURL,
+                "signedUrl": signedURL,
+            }
+            signed_urls.append(signed_item)
+        return signed_urls
+
+    def get_public_url(self, path: str, options: URLOptions = {}) -> str:
+        """
+        Parameters
+        ----------
+        path
+            file path, including the path and file name. For example `folder/image.png`.
+        """
+        _query_string = []
+        download_query = ""
+        if options.get("download"):
+            download_query = (
+                "&download="
+                if options.get("download") is True
+                else f"&download={options.get('download')}"
+            )
+
+        if download_query:
+            _query_string.append(download_query)
+
+        render_path = "render/image" if options.get("transform") else "object"
+        transformation_query = (
+            urllib.parse.urlencode(options.get("transform"))
+            if options.get("transform")
+            else None
+        )
+
+        if transformation_query:
+            _query_string.append(transformation_query)
+
+        query_string = "&".join(_query_string)
+        query_string = f"?{query_string}"
+        _path = self._get_final_path(path)
+        return f"{self._client.base_url}{render_path}/public/{_path}{query_string}"
+
+    def move(self, from_path: str, to_path: str) -> dict[str, str]:
+        """
+        Moves an existing file, optionally renaming it at the same time.
+
+        Parameters
+        ----------
+        from_path
+            The original file path, including the current file name. For example `folder/image.png`.
+        to_path
+            The new file path, including the new file name. For example `folder/image-copy.png`.
+        """
+        res = self._request(
+            "POST",
+            "/object/move",
+            json={
+                "bucketId": self.id,
+                "sourceKey": from_path,
+                "destinationKey": to_path,
+            },
+        )
+        return res.json()
+
+    def copy(self, from_path: str, to_path: str) -> dict[str, str]:
+        """
+        Copies an existing file to a new path in the same bucket.
+
+        Parameters
+        ----------
+        from_path
+            The original file path, including the current file name. For example `folder/image.png`.
+        to_path
+            The new file path, including the new file name. For example `folder/image-copy.png`.
+        """
+        res = self._request(
+            "POST",
+            "/object/copy",
+            json={
+                "bucketId": self.id,
+                "sourceKey": from_path,
+                "destinationKey": to_path,
+            },
+        )
+        return res.json()
+
+    def remove(self, paths: list) -> list[dict[str, Any]]:
+        """
+        Deletes files within the same bucket
+
+        Parameters
+        ----------
+        paths
+            An array or list of files to be deletes, including the path and file name. For example [`folder/image.png`].
+        """
+        response = self._request(
+            "DELETE",
+            f"/object/{self.id}",
+            json={"prefixes": paths},
+        )
+        return response.json()
+
+    def info(
+        self,
+        path: str,
+    ) -> list[dict[str, str]]:
+        """
+        Lists info for a particular file.
+
+        Parameters
+        ----------
+        path
+            The path to the file.
+        """
+        response = self._request(
+            "GET",
+            f"/object/info/{self.id}/{path}",
+        )
+        return response.json()
+
+    def exists(
+        self,
+        path: str,
+    ) -> bool:
+        """
+        Returns True if the file exists, False otherwise.
+
+        Parameters
+        ----------
+        path
+            The path to the file.
+        """
+        try:
+            response = self._request(
+                "HEAD",
+                f"/object/{self.id}/{path}",
+            )
+            return response.status_code == 200
+        except json.JSONDecodeError:
+            return False
+
+    def list(
+        self,
+        path: Optional[str] = None,
+        options: Optional[ListBucketFilesOptions] = None,
+    ) -> list[dict[str, str]]:
+        """
+        Lists all the files within a bucket.
+
+        Parameters
+        ----------
+        path
+            The folder path.
+        options
+            Search options, including `limit`, `offset`, `sortBy` and `search`.
+        """
+        extra_options = options or {}
+        extra_headers = {"Content-Type": "application/json"}
+        body = {
+            **DEFAULT_SEARCH_OPTIONS,
+            **extra_options,
+            "prefix": path or "",
+        }
+        response = self._request(
+            "POST",
+            f"/object/list/{self.id}",
+            json=body,
+            headers=extra_headers,
+        )
+        return response.json()
+
+    def download(self, path: str, options: DownloadOptions = {}) -> bytes:
+        """
+        Downloads a file.
+
+        Parameters
+        ----------
+        path
+            The file path to be downloaded, including the path and file name. For example `folder/image.png`.
+        """
+        render_path = (
+            "render/image/authenticated" if options.get("transform") else "object"
+        )
+        transformation_query = urllib.parse.urlencode(options.get("transform") or {})
+        query_string = f"?{transformation_query}" if transformation_query else ""
+
+        _path = self._get_final_path(path)
+        response = self._request(
+            "GET",
+            f"{render_path}/{_path}{query_string}",
+        )
+        return response.content
+
    def _upload_or_update(
        self,
        method: Literal["POST", "PUT"],
        path: str,
        file: Union[BufferedReader, bytes, FileIO, str, Path],
        file_options: Optional[FileOptions] = None,
    ) -> UploadResponse:
        """
        Uploads a file to an existing bucket.

        Shared implementation behind :meth:`.upload` (POST) and
        :meth:`.update` (PUT). The file is sent as a multipart form upload;
        selected options travel as headers and/or extra form fields.

        Parameters
        ----------
        method
            "POST" to create the object, "PUT" to replace it.
        path
            The relative file path including the bucket ID. Should be of the format `bucket/folder/subfolder/filename.png`.
            The bucket must already exist before attempting to upload.
        file
            The File object to be stored in the bucket. or a async generator of chunks
        file_options
            HTTP headers.
        """
        if file_options is None:
            file_options = {}
        # NOTE: the pops below mutate the caller's file_options dict in place.
        cache_control = file_options.pop("cache-control", None)
        _data = {}

        # "upsert" is exposed as a plain option but travels as the x-upsert header.
        upsert = file_options.pop("upsert", None)
        if upsert:
            file_options.update({"x-upsert": upsert})

        metadata = file_options.pop("metadata", None)
        file_opts_headers = file_options.pop("headers", None)

        # Remaining file_options entries override the defaults as raw headers.
        headers = {
            **self._client.headers,
            **DEFAULT_FILE_OPTIONS,
            **file_options,
        }

        if metadata:
            # Metadata is sent twice: base64-encoded in a header and as a raw
            # JSON form field.
            metadata_str = json.dumps(metadata)
            headers["x-metadata"] = base64.b64encode(metadata_str.encode())
            _data.update({"metadata": metadata_str})

        if file_opts_headers:
            headers.update({**file_opts_headers})

        # Only include x-upsert on a POST method
        # (assumes DEFAULT_FILE_OPTIONS always provides x-upsert — confirm)
        if method != "POST":
            del headers["x-upsert"]

        filename = path.rsplit("/", maxsplit=1)[-1]

        if cache_control:
            # cache-control doubles as a header (max-age form) and a form field.
            headers["cache-control"] = f"max-age={cache_control}"
            _data.update({"cacheControl": cache_control})

        if (
            isinstance(file, BufferedReader)
            or isinstance(file, bytes)
            or isinstance(file, FileIO)
        ):
            # bytes or byte-stream-like object received
            files = {"file": (filename, file, headers.pop("content-type"))}
        else:
            # str or pathlib.path received; _request closes the opened handle
            # after the upload (it is a BufferedReader).
            files = {
                "file": (
                    filename,
                    open(file, "rb"),
                    headers.pop("content-type"),
                )
            }

        _path = self._get_final_path(path)

        response = self._request(
            method, f"/object/{_path}", files=files, headers=headers, data=_data
        )

        data: UploadData = response.json()

        return UploadResponse(path=path, Key=data.get("Key"))
+
+    def upload(
+        self,
+        path: str,
+        file: Union[BufferedReader, bytes, FileIO, str, Path],
+        file_options: Optional[FileOptions] = None,
+    ) -> UploadResponse:
+        """
+        Uploads a file to an existing bucket.
+
+        Parameters
+        ----------
+        path
+            The relative file path including the bucket ID. Should be of the format `bucket/folder/subfolder/filename.png`.
+            The bucket must already exist before attempting to upload.
+        file
+            The File object to be stored in the bucket. or a async generator of chunks
+        file_options
+            HTTP headers.
+        """
+        return self._upload_or_update("POST", path, file, file_options)
+
+    def update(
+        self,
+        path: str,
+        file: Union[BufferedReader, bytes, FileIO, str, Path],
+        file_options: Optional[FileOptions] = None,
+    ) -> UploadResponse:
+        return self._upload_or_update("PUT", path, file, file_options)
+
+    def _get_final_path(self, path: str) -> str:
+        return f"{self.id}/{path}"
+
+
@dataclass(repr=False)
class SyncBucket(BaseBucket):
    """Represents a storage bucket.

    Adds no fields of its own; everything is inherited from
    :class:`BaseBucket`. ``repr=False`` keeps the parent's representation.
    """
+
+
@dataclass
class SyncBucketProxy(SyncBucketActionsMixin):
    """A bucket proxy, this contains the minimum required fields to query the File API."""

    # Bucket identifier; prefixed onto object paths (see _get_final_path).
    id: str
    # HTTP client used to perform requests; excluded from the dataclass repr.
    _client: SyncClient = field(repr=False)