1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
from typing import Any, Dict, Literal, Optional, Union
from warnings import warn
from httpx import HTTPError, Response
from ..errors import FunctionsHttpError, FunctionsRelayError
from ..utils import (
AsyncClient,
FunctionRegion,
is_http_url,
is_valid_jwt,
is_valid_str_arg,
)
from ..version import __version__
class AsyncFunctionsClient:
def __init__(
self,
url: str,
headers: Dict,
timeout: int,
verify: bool = True,
proxy: Optional[str] = None,
):
if not is_http_url(url):
raise ValueError("url must be a valid HTTP URL string")
self.url = url
self.headers = {
"User-Agent": f"supabase-py/functions-py v{__version__}",
**headers,
}
self._client = AsyncClient(
base_url=self.url,
headers=self.headers,
verify=bool(verify),
timeout=int(abs(timeout)),
proxy=proxy,
follow_redirects=True,
http2=True,
)
async def _request(
self,
method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
url: str,
headers: Optional[Dict[str, str]] = None,
json: Optional[Dict[Any, Any]] = None,
) -> Response:
user_data = {"data": json} if isinstance(json, str) else {"json": json}
response = await self._client.request(method, url, **user_data, headers=headers)
try:
response.raise_for_status()
except HTTPError as exc:
raise FunctionsHttpError(
response.json().get("error")
or f"An error occurred while requesting your edge function at {exc.request.url!r}."
) from exc
return response
def set_auth(self, token: str) -> None:
"""Updates the authorization header
Parameters
----------
token : str
the new jwt token sent in the authorization header
"""
if not is_valid_jwt(token):
raise ValueError("token must be a valid JWT authorization token string.")
self.headers["Authorization"] = f"Bearer {token}"
async def invoke(
self, function_name: str, invoke_options: Optional[Dict] = None
) -> Union[Dict, bytes]:
"""Invokes a function
Parameters
----------
function_name : the name of the function to invoke
invoke_options : object with the following properties
`headers`: object representing the headers to send with the request
`body`: the body of the request
`responseType`: how the response should be parsed. The default is `json`
"""
if not is_valid_str_arg(function_name):
raise ValueError("function_name must a valid string value.")
headers = self.headers
body = None
response_type = "text/plain"
if invoke_options is not None:
headers.update(invoke_options.get("headers", {}))
response_type = invoke_options.get("responseType", "text/plain")
region = invoke_options.get("region")
if region:
if not isinstance(region, FunctionRegion):
warn(f"Use FunctionRegion({region})")
region = FunctionRegion(region)
if region.value != "any":
headers["x-region"] = region.value
body = invoke_options.get("body")
if isinstance(body, str):
headers["Content-Type"] = "text/plain"
elif isinstance(body, dict):
headers["Content-Type"] = "application/json"
response = await self._request(
"POST", f"{self.url}/{function_name}", headers=headers, json=body
)
is_relay_error = response.headers.get("x-relay-header")
if is_relay_error and is_relay_error == "true":
raise FunctionsRelayError(response.json().get("error"))
if response_type == "json":
data = response.json()
else:
data = response.content
return data
|