aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/supafunc/_sync/functions_client.py
blob: 99780eb73928bac25513b90cb8abacb93b96215e (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import Any, Dict, Literal, Optional, Union
from warnings import warn

from httpx import HTTPError, Response

from ..errors import FunctionsHttpError, FunctionsRelayError
from ..utils import (
    FunctionRegion,
    SyncClient,
    is_http_url,
    is_valid_jwt,
    is_valid_str_arg,
)
from ..version import __version__


class SyncFunctionsClient:
    """Synchronous client for invoking edge functions over HTTP.

    The client keeps a dictionary of default headers (``self.headers``) that
    is sent with every invocation, and a ``SyncClient`` instance used to
    perform the actual requests.
    """

    def __init__(
        self,
        url: str,
        headers: Dict,
        timeout: int,
        verify: bool = True,
        proxy: Optional[str] = None,
    ):
        """Create a client bound to a functions endpoint.

        Parameters
        ----------
        url : str
            base URL of the functions endpoint; must pass ``is_http_url``
        headers : Dict
            default headers; they override the built-in ``User-Agent``
        timeout : int
            request timeout; the absolute value, truncated to an int, is used
        verify : bool
            forwarded to the HTTP client as ``verify`` (coerced to ``bool``)
        proxy : Optional[str]
            forwarded to the HTTP client as ``proxy``

        Raises
        ------
        ValueError
            if ``url`` is not a valid HTTP URL string
        """
        if not is_http_url(url):
            raise ValueError("url must be a valid HTTP URL string")
        self.url = url
        # Caller-supplied headers come last so they can override User-Agent.
        self.headers = {
            "User-Agent": f"supabase-py/functions-py v{__version__}",
            **headers,
        }
        self._client = SyncClient(
            base_url=self.url,
            headers=self.headers,
            verify=bool(verify),
            timeout=int(abs(timeout)),
            proxy=proxy,
            follow_redirects=True,
            http2=True,
        )

    @staticmethod
    def _extract_error(response: Response) -> Optional[Any]:
        """Return the ``error`` field of a JSON object body, or ``None``.

        Never raises for a body that is not JSON or not a JSON object, so it
        is safe to call while building another exception.
        """
        try:
            payload = response.json()
        except ValueError:
            # Covers JSON decoding errors (and undecodable bytes).
            return None
        if isinstance(payload, dict):
            return payload.get("error")
        return None

    def _request(
        self,
        method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json: Optional[Union[Dict[Any, Any], str]] = None,
    ) -> Response:
        """Send a request and convert HTTP failures to ``FunctionsHttpError``.

        Parameters
        ----------
        method : HTTP verb to use
        url : target URL
        headers : headers sent with this request
        json : request body; a ``str`` is sent as-is through ``data``, any
            other value (including ``None``) is passed through ``json``

        Raises
        ------
        FunctionsHttpError
            if the response status indicates an error; the message is the
            ``error`` field of the response body when available, otherwise a
            generic message containing the request URL
        """
        user_data = {"data": json} if isinstance(json, str) else {"json": json}
        response = self._client.request(method, url, **user_data, headers=headers)

        try:
            response.raise_for_status()
        except HTTPError as exc:
            # The error body is not guaranteed to be a JSON object; extract
            # the message defensively so the HTTP error is never masked by a
            # decoding failure.
            raise FunctionsHttpError(
                self._extract_error(response)
                or f"An error occurred while requesting your edge function at {exc.request.url!r}."
            ) from exc

        return response

    def set_auth(self, token: str) -> None:
        """Updates the authorization header

        Parameters
        ----------
        token : str
            the new jwt token sent in the authorization header

        Raises
        ------
        ValueError
            if ``token`` is not a valid JWT string
        """

        if not is_valid_jwt(token):
            raise ValueError("token must be a valid JWT authorization token string.")

        self.headers["Authorization"] = f"Bearer {token}"

    def invoke(
        self, function_name: str, invoke_options: Optional[Dict] = None
    ) -> Union[Dict, bytes]:
        """Invokes a function

        Parameters
        ----------
        function_name : the name of the function to invoke
        invoke_options : object with the following properties
            `headers`: object representing the headers to send with the request
            `body`: the body of the request; a `str` sets `Content-Type` to
                `text/plain`, a `dict` sets it to `application/json`
            `region`: a `FunctionRegion` (or a value convertible to one); any
                region other than `any` is sent as the `x-region` header
            `responseType`: how the response should be parsed. `json` returns
                the decoded JSON body; any other value returns the raw bytes.
                The default is `text/plain`

        Raises
        ------
        ValueError
            if ``function_name`` is not a valid string
        FunctionsHttpError
            if the function responds with an HTTP error status
        FunctionsRelayError
            if the response is flagged as a relay error
        """
        if not is_valid_str_arg(function_name):
            raise ValueError("function_name must a valid string value.")

        # Work on a copy: per-call headers, region and content type must not
        # leak into the client's default headers and affect later calls.
        headers = dict(self.headers)
        body = None
        response_type = "text/plain"
        if invoke_options is not None:
            # ``or {}`` also tolerates an explicit ``"headers": None``.
            headers.update(invoke_options.get("headers") or {})
            response_type = invoke_options.get("responseType", "text/plain")

            region = invoke_options.get("region")
            if region:
                if not isinstance(region, FunctionRegion):
                    warn(f"Use FunctionRegion({region})")
                    region = FunctionRegion(region)

                if region.value != "any":
                    headers["x-region"] = region.value

            body = invoke_options.get("body")
            if isinstance(body, str):
                headers["Content-Type"] = "text/plain"
            elif isinstance(body, dict):
                headers["Content-Type"] = "application/json"

        response = self._request(
            "POST", f"{self.url}/{function_name}", headers=headers, json=body
        )

        # NOTE(review): supabase functions-js appears to check the
        # "x-relay-error" header for this purpose — confirm the header name.
        if response.headers.get("x-relay-header") == "true":
            raise FunctionsRelayError(self._extract_error(response))

        if response_type == "json":
            return response.json()
        return response.content