import re
from typing import Any, Dict, List, Optional, Union

from gotrue import SyncMemoryStorage
from gotrue.types import AuthChangeEvent, Session
from httpx import Timeout
from postgrest import (
    SyncPostgrestClient,
    SyncRequestBuilder,
    SyncRPCFilterRequestBuilder,
)
from postgrest.constants import DEFAULT_POSTGREST_CLIENT_TIMEOUT
from realtime import RealtimeChannelOptions, SyncRealtimeChannel, SyncRealtimeClient
from storage3 import SyncStorageClient
from storage3.constants import DEFAULT_TIMEOUT as DEFAULT_STORAGE_CLIENT_TIMEOUT
from supafunc import SyncFunctionsClient

from ..lib.client_options import SyncClientOptions as ClientOptions
from .auth_client import SyncSupabaseAuthClient


class SupabaseException(Exception):
    """Raised when the user does not provide a valid Supabase URL or API key."""

    def __init__(self, message: str):
        self.message = message
        super().__init__(self.message)


class SyncClient:
    """Supabase client class."""

    def __init__(
        self,
        supabase_url: str,
        supabase_key: str,
        options: Optional[ClientOptions] = None,
    ):
        """Instantiate the client.

        Parameters
        ----------
        supabase_url: str
            The URL to the Supabase instance that should be connected to.
        supabase_key: str
            The API key to the Supabase instance that should be connected to.
        options: ClientOptions, optional
            Any extra settings to be optionally specified. When omitted, a
            ``ClientOptions`` backed by ``SyncMemoryStorage`` is created.
            Note that ``options.headers`` is updated in place with the auth
            headers.

        Raises
        ------
        SupabaseException
            If the URL or key is missing, the URL does not start with
            ``http://`` or ``https://``, or the key does not look like a JWT.
        """
        if not supabase_url:
            raise SupabaseException("supabase_url is required")
        if not supabase_key:
            raise SupabaseException("supabase_key is required")

        # Check if the url and key are valid
        if not re.match(r"^(https?)://.+", supabase_url):
            raise SupabaseException("Invalid URL")

        # Check if the key is a valid JWT
        if not re.match(
            r"^[A-Za-z0-9-_=]+\.[A-Za-z0-9-_=]+\.?[A-Za-z0-9-_.+/=]*$", supabase_key
        ):
            raise SupabaseException("Invalid API key")

        if options is None:
            options = ClientOptions(storage=SyncMemoryStorage())

        self.supabase_url = supabase_url
        self.supabase_key = supabase_key
        self.options = options
        options.headers.update(self._get_auth_headers())
        self.rest_url = f"{supabase_url}/rest/v1"
        # Swap only the leading scheme (http -> ws, https -> wss). The URL was
        # validated above to start with "http", so limiting the replacement to
        # the first occurrence leaves hosts/paths containing "http" untouched.
        self.realtime_url = f"{supabase_url}/realtime/v1".replace("http", "ws", 1)
        self.auth_url = f"{supabase_url}/auth/v1"
        self.storage_url = f"{supabase_url}/storage/v1"
        self.functions_url = f"{supabase_url}/functions/v1"

        # Instantiate clients.
        self.auth = self._init_supabase_auth_client(
            auth_url=self.auth_url,
            client_options=options,
        )
        self.realtime = self._init_realtime_client(
            realtime_url=self.realtime_url,
            supabase_key=self.supabase_key,
            options=options.realtime,
        )
        # The remaining sub-clients are created lazily by their properties.
        self._postgrest = None
        self._storage = None
        self._functions = None
        self.auth.on_auth_state_change(self._listen_to_auth_events)

    @classmethod
    def create(
        cls,
        supabase_url: str,
        supabase_key: str,
        options: Optional[ClientOptions] = None,
    ) -> "SyncClient":
        """Build a client and, if possible, authorize it with the stored session.

        When the caller did not supply an ``Authorization`` header in
        ``options.headers``, the current auth session is looked up and its
        access token is used for the header. If no session is available the
        header set by ``__init__`` is kept.

        Parameters
        ----------
        supabase_url: str
            The URL to the Supabase instance that should be connected to.
        supabase_key: str
            The API key to the Supabase instance that should be connected to.
        options: ClientOptions, optional
            Any extra settings to be optionally specified.

        Returns
        -------
        SyncClient
        """
        # Must be read before constructing the client: __init__ always adds
        # an Authorization header to options.headers.
        auth_header = options.headers.get("Authorization") if options else None
        client = cls(supabase_url, supabase_key, options)

        if auth_header is None:
            session_access_token = None
            try:
                session = client.auth.get_session()
            except Exception:
                # Deliberate best effort: failing to load a session must not
                # prevent client creation; fall back to the existing header.
                session = None
            if session is not None:
                session_access_token = client._create_auth_header(
                    session.access_token
                )

            client.options.headers.update(
                client._get_auth_headers(session_access_token)
            )

        return client

    def table(self, table_name: str) -> SyncRequestBuilder:
        """Perform a table operation.

        Note that the supabase client uses the `from` method, but in Python,
        this is a reserved keyword, so we have elected to use the name `table`.
        Alternatively you can use the `.from_()` method.
        """
        return self.from_(table_name)

    def schema(self, schema: str) -> SyncPostgrestClient:
        """Select a schema to query or perform an function (rpc) call.

        The schema needs to be on the list of exposed schemas inside Supabase.
        """
        if self.options.schema != schema:
            self.options.schema = schema
            # Only an already-created Postgrest client needs switching; a
            # lazily created one picks the schema up from the options.
            if self._postgrest:
                self._postgrest.schema(schema)

        return self.postgrest

    def from_(self, table_name: str) -> SyncRequestBuilder:
        """Perform a table operation.

        See the `table` method.
        """
        return self.postgrest.from_(table_name)

    def rpc(
        self, fn: str, params: Optional[Dict[Any, Any]] = None
    ) -> SyncRPCFilterRequestBuilder:
        """Performs a stored procedure call.

        Parameters
        ----------
        fn : str
            The name of the stored procedure to be executed.
        params : dict of any, optional
            Parameters passed into the stored procedure call. Defaults to an
            empty dict.

        Returns
        -------
        SyncRPCFilterRequestBuilder
            Returns a filter builder. This lets you apply filters on the response
            of an RPC.
        """
        if params is None:
            params = {}
        return self.postgrest.rpc(fn, params)

    @property
    def postgrest(self) -> SyncPostgrestClient:
        """Postgrest client, created on first access from the current options."""
        if self._postgrest is None:
            self._postgrest = self._init_postgrest_client(
                rest_url=self.rest_url,
                headers=self.options.headers,
                schema=self.options.schema,
                timeout=self.options.postgrest_client_timeout,
            )

        return self._postgrest

    @property
    def storage(self) -> SyncStorageClient:
        """Storage client, created on first access from the current options."""
        if self._storage is None:
            self._storage = self._init_storage_client(
                storage_url=self.storage_url,
                headers=self.options.headers,
                storage_client_timeout=self.options.storage_client_timeout,
            )
        return self._storage

    @property
    def functions(self) -> SyncFunctionsClient:
        """Functions client, created on first access from the current options."""
        if self._functions is None:
            self._functions = SyncFunctionsClient(
                self.functions_url,
                self.options.headers,
                self.options.function_client_timeout,
            )
        return self._functions

    def channel(
        self, topic: str, params: Optional[RealtimeChannelOptions] = None
    ) -> SyncRealtimeChannel:
        """Creates a Realtime channel with Broadcast, Presence, and Postgres Changes."""
        # Use a fresh dict per call rather than a shared mutable default.
        if params is None:
            params = {}
        return self.realtime.channel(topic, params)

    def get_channels(self) -> List[SyncRealtimeChannel]:
        """Returns all realtime channels."""
        return self.realtime.get_channels()

    def remove_channel(self, channel: SyncRealtimeChannel) -> None:
        """Unsubscribes and removes Realtime channel from Realtime client."""
        self.realtime.remove_channel(channel)

    def remove_all_channels(self) -> None:
        """Unsubscribes and removes all Realtime channels from Realtime client."""
        self.realtime.remove_all_channels()

    @staticmethod
    def _init_realtime_client(
        realtime_url: str, supabase_key: str, options: Optional[Dict[str, Any]] = None
    ) -> SyncRealtimeClient:
        """Private method for creating an instance of the realtime-py client."""
        if options is None:
            options = {}
        return SyncRealtimeClient(realtime_url, token=supabase_key, **options)

    @staticmethod
    def _init_storage_client(
        storage_url: str,
        headers: Dict[str, str],
        storage_client_timeout: int = DEFAULT_STORAGE_CLIENT_TIMEOUT,
        verify: bool = True,
        proxy: Optional[str] = None,
    ) -> SyncStorageClient:
        """Private helper for creating an instance of the storage client."""
        return SyncStorageClient(
            storage_url, headers, storage_client_timeout, verify, proxy
        )

    @staticmethod
    def _init_supabase_auth_client(
        auth_url: str,
        client_options: ClientOptions,
        verify: bool = True,
        proxy: Optional[str] = None,
    ) -> SyncSupabaseAuthClient:
        """Creates a wrapped instance of the GoTrue Client."""
        return SyncSupabaseAuthClient(
            url=auth_url,
            auto_refresh_token=client_options.auto_refresh_token,
            persist_session=client_options.persist_session,
            storage=client_options.storage,
            headers=client_options.headers,
            flow_type=client_options.flow_type,
            verify=verify,
            proxy=proxy,
        )

    @staticmethod
    def _init_postgrest_client(
        rest_url: str,
        headers: Dict[str, str],
        schema: str,
        timeout: Union[int, float, Timeout] = DEFAULT_POSTGREST_CLIENT_TIMEOUT,
        verify: bool = True,
        proxy: Optional[str] = None,
    ) -> SyncPostgrestClient:
        """Private helper for creating an instance of the Postgrest client."""
        return SyncPostgrestClient(
            rest_url,
            headers=headers,
            schema=schema,
            timeout=timeout,
            verify=verify,
            proxy=proxy,
        )

    def _create_auth_header(self, token: str) -> str:
        """Format ``token`` as the value of a Bearer ``Authorization`` header."""
        return f"Bearer {token}"

    def _get_auth_headers(self, authorization: Optional[str] = None) -> Dict[str, str]:
        """Helper method to get auth headers.

        When ``authorization`` is not given, the ``Authorization`` value
        already present in ``self.options.headers`` is reused, falling back
        to a Bearer header built from the API key.
        """
        if authorization is None:
            authorization = self.options.headers.get(
                "Authorization", self._create_auth_header(self.supabase_key)
            )

        return {
            "apiKey": self.supabase_key,
            "Authorization": authorization,
        }

    def _listen_to_auth_events(
        self, event: AuthChangeEvent, session: Optional[Session]
    ):
        """Refresh the Authorization header when the auth state changes.

        Registered with the auth client in ``__init__``. For sign-in,
        token-refresh and sign-out events the lazily created sub-clients are
        dropped so they are rebuilt with the new headers on next access.
        """
        access_token = self.supabase_key
        if event in ["SIGNED_IN", "TOKEN_REFRESHED", "SIGNED_OUT"]:
            # reset postgrest and storage instance on event change
            self._postgrest = None
            self._storage = None
            self._functions = None
            access_token = session.access_token if session else self.supabase_key

        # NOTE(review): for any other event the header falls back to the API
        # key even if a session was passed - confirm this is intended.
        self.options.headers["Authorization"] = self._create_auth_header(access_token)


def create_client(
    supabase_url: str,
    supabase_key: str,
    options: Optional[ClientOptions] = None,
) -> SyncClient:
    """Create client function to instantiate supabase client like JS runtime.

    Parameters
    ----------
    supabase_url: str
        The URL to the Supabase instance that should be connected to.
    supabase_key: str
        The API key to the Supabase instance that should be connected to.
    options: ClientOptions, optional
        Any extra settings to be optionally specified.

    Examples
    --------
    Instantiating the client.
    >>> import os
    >>> from supabase import create_client, Client
    >>>
    >>> url: str = os.environ.get("SUPABASE_TEST_URL")
    >>> key: str = os.environ.get("SUPABASE_TEST_KEY")
    >>> supabase: Client = create_client(url, key)

    Returns
    -------
    Client
    """
    return SyncClient.create(
        supabase_url=supabase_url, supabase_key=supabase_key, options=options
    )