aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/supafunc/_sync
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/supafunc/_sync')
-rw-r--r--.venv/lib/python3.12/site-packages/supafunc/_sync/__init__.py1
-rw-r--r--.venv/lib/python3.12/site-packages/supafunc/_sync/functions_client.py127
2 files changed, 128 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/supafunc/_sync/__init__.py b/.venv/lib/python3.12/site-packages/supafunc/_sync/__init__.py
new file mode 100644
index 00000000..9d48db4f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/supafunc/_sync/__init__.py
@@ -0,0 +1 @@
+from __future__ import annotations
diff --git a/.venv/lib/python3.12/site-packages/supafunc/_sync/functions_client.py b/.venv/lib/python3.12/site-packages/supafunc/_sync/functions_client.py
new file mode 100644
index 00000000..99780eb7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/supafunc/_sync/functions_client.py
@@ -0,0 +1,127 @@
+from typing import Any, Dict, Literal, Optional, Union
+from warnings import warn
+
+from httpx import HTTPError, Response
+
+from ..errors import FunctionsHttpError, FunctionsRelayError
+from ..utils import (
+ FunctionRegion,
+ SyncClient,
+ is_http_url,
+ is_valid_jwt,
+ is_valid_str_arg,
+)
+from ..version import __version__
+
+
+class SyncFunctionsClient:
+ def __init__(
+ self,
+ url: str,
+ headers: Dict,
+ timeout: int,
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ):
+ if not is_http_url(url):
+ raise ValueError("url must be a valid HTTP URL string")
+ self.url = url
+ self.headers = {
+ "User-Agent": f"supabase-py/functions-py v{__version__}",
+ **headers,
+ }
+ self._client = SyncClient(
+ base_url=self.url,
+ headers=self.headers,
+ verify=bool(verify),
+ timeout=int(abs(timeout)),
+ proxy=proxy,
+ follow_redirects=True,
+ http2=True,
+ )
+
+ def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ url: str,
+ headers: Optional[Dict[str, str]] = None,
+ json: Optional[Dict[Any, Any]] = None,
+ ) -> Response:
+
+ user_data = {"data": json} if isinstance(json, str) else {"json": json}
+ response = self._client.request(method, url, **user_data, headers=headers)
+
+ try:
+ response.raise_for_status()
+ except HTTPError as exc:
+ raise FunctionsHttpError(
+ response.json().get("error")
+ or f"An error occurred while requesting your edge function at {exc.request.url!r}."
+ ) from exc
+
+ return response
+
+ def set_auth(self, token: str) -> None:
+ """Updates the authorization header
+
+ Parameters
+ ----------
+ token : str
+ the new jwt token sent in the authorization header
+ """
+
+ if not is_valid_jwt(token):
+ raise ValueError("token must be a valid JWT authorization token string.")
+
+ self.headers["Authorization"] = f"Bearer {token}"
+
+ def invoke(
+ self, function_name: str, invoke_options: Optional[Dict] = None
+ ) -> Union[Dict, bytes]:
+ """Invokes a function
+
+ Parameters
+ ----------
+ function_name : the name of the function to invoke
+ invoke_options : object with the following properties
+ `headers`: object representing the headers to send with the request
+ `body`: the body of the request
+ `responseType`: how the response should be parsed. The default is `json`
+ """
+ if not is_valid_str_arg(function_name):
+ raise ValueError("function_name must a valid string value.")
+ headers = self.headers
+ body = None
+ response_type = "text/plain"
+ if invoke_options is not None:
+ headers.update(invoke_options.get("headers", {}))
+ response_type = invoke_options.get("responseType", "text/plain")
+
+ region = invoke_options.get("region")
+ if region:
+ if not isinstance(region, FunctionRegion):
+ warn(f"Use FunctionRegion({region})")
+ region = FunctionRegion(region)
+
+ if region.value != "any":
+ headers["x-region"] = region.value
+
+ body = invoke_options.get("body")
+ if isinstance(body, str):
+ headers["Content-Type"] = "text/plain"
+ elif isinstance(body, dict):
+ headers["Content-Type"] = "application/json"
+
+ response = self._request(
+ "POST", f"{self.url}/{function_name}", headers=headers, json=body
+ )
+ is_relay_error = response.headers.get("x-relay-header")
+
+ if is_relay_error and is_relay_error == "true":
+ raise FunctionsRelayError(response.json().get("error"))
+
+ if response_type == "json":
+ data = response.json()
+ else:
+ data = response.content
+ return data