aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/storage3/_async/client.py
blob: 95852a02be3d5b041894327fb712e83612f070b8 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from __future__ import annotations

from typing import Optional

from storage3.constants import DEFAULT_TIMEOUT

from ..utils import AsyncClient
from ..version import __version__
from .bucket import AsyncStorageBucketAPI
from .file_api import AsyncBucketProxy

__all__ = [
    "AsyncStorageClient",
]


class AsyncStorageClient(AsyncStorageBucketAPI):
    """Manage storage buckets and files."""

    def __init__(
        self,
        url: str,
        headers: dict[str, str],
        timeout: int = DEFAULT_TIMEOUT,
        verify: bool = True,
        proxy: Optional[str] = None,
    ) -> None:
        headers = {
            "User-Agent": f"supabase-py/storage3 v{__version__}",
            **headers,
        }
        self.session = self._create_session(url, headers, timeout, verify, proxy)
        super().__init__(self.session)

    def _create_session(
        self,
        base_url: str,
        headers: dict[str, str],
        timeout: int,
        verify: bool = True,
        proxy: Optional[str] = None,
    ) -> AsyncClient:
        return AsyncClient(
            base_url=base_url,
            headers=headers,
            timeout=timeout,
            proxy=proxy,
            verify=bool(verify),
            follow_redirects=True,
            http2=True,
        )

    async def __aenter__(self) -> AsyncStorageClient:
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

    async def aclose(self) -> None:
        await self.session.aclose()

    def from_(self, id: str) -> AsyncBucketProxy:
        """Run a storage file operation.

        Parameters
        ----------
        id
            The unique identifier of the bucket
        """
        return AsyncBucketProxy(id, self._client)