diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/gotrue')
22 files changed, 4640 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/gotrue/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/__init__.py new file mode 100644 index 00000000..2a13526f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/__init__.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from ._async.gotrue_admin_api import AsyncGoTrueAdminAPI # type: ignore # noqa: F401 +from ._async.gotrue_client import AsyncGoTrueClient # type: ignore # noqa: F401 +from ._async.storage import AsyncMemoryStorage # type: ignore # noqa: F401 +from ._async.storage import AsyncSupportedStorage # type: ignore # noqa: F401 +from ._sync.gotrue_admin_api import SyncGoTrueAdminAPI # type: ignore # noqa: F401 +from ._sync.gotrue_client import SyncGoTrueClient # type: ignore # noqa: F401 +from ._sync.storage import SyncMemoryStorage # type: ignore # noqa: F401 +from ._sync.storage import SyncSupportedStorage # type: ignore # noqa: F401 +from .types import * # type: ignore # noqa: F401, F403 +from .version import __version__ diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py new file mode 100644 index 00000000..9d48db4f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py new file mode 100644 index 00000000..54d48739 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from functools import partial +from typing import Dict, List, Optional + +from ..helpers import model_validate, parse_link_response, parse_user_response +from ..http_clients import AsyncClient +from ..types import ( + AdminUserAttributes, + AuthMFAAdminDeleteFactorParams, + AuthMFAAdminDeleteFactorResponse, + AuthMFAAdminListFactorsParams, + AuthMFAAdminListFactorsResponse, + GenerateLinkParams, + GenerateLinkResponse, + InviteUserByEmailOptions, + SignOutScope, + User, + UserResponse, +) +from .gotrue_admin_mfa_api import AsyncGoTrueAdminMFAAPI +from .gotrue_base_api import AsyncGoTrueBaseAPI + + +class AsyncGoTrueAdminAPI(AsyncGoTrueBaseAPI): + def __init__( + self, + *, + url: str = "", + headers: Dict[str, str] = {}, + http_client: Optional[AsyncClient] = None, + verify: bool = True, + proxy: Optional[str] = None, + ) -> None: + AsyncGoTrueBaseAPI.__init__( + self, + url=url, + headers=headers, + http_client=http_client, + verify=verify, + proxy=proxy, + ) + self.mfa = AsyncGoTrueAdminMFAAPI() + self.mfa.list_factors = self._list_factors + self.mfa.delete_factor = self._delete_factor + + async def sign_out(self, jwt: str, scope: SignOutScope = "global") -> None: + """ + Removes a logged-in session. + """ + return await self._request( + "POST", + "logout", + query={"scope": scope}, + jwt=jwt, + no_resolve_json=True, + ) + + async def invite_user_by_email( + self, + email: str, + options: InviteUserByEmailOptions = {}, + ) -> UserResponse: + """ + Sends an invite link to an email address. + """ + return await self._request( + "POST", + "invite", + body={"email": email, "data": options.get("data")}, + redirect_to=options.get("redirect_to"), + xform=parse_user_response, + ) + + async def generate_link(self, params: GenerateLinkParams) -> GenerateLinkResponse: + """ + Generates email links and OTPs to be sent via a custom email provider. + """ + return await self._request( + "POST", + "admin/generate_link", + body={ + "type": params.get("type"), + "email": params.get("email"), + "password": params.get("password"), + "new_email": params.get("new_email"), + "data": params.get("options", {}).get("data"), + }, + redirect_to=params.get("options", {}).get("redirect_to"), + xform=parse_link_response, + ) + + # User Admin API + + async def create_user(self, attributes: AdminUserAttributes) -> UserResponse: + """ + Creates a new user. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return await self._request( + "POST", + "admin/users", + body=attributes, + xform=parse_user_response, + ) + + async def list_users(self, page: int = None, per_page: int = None) -> List[User]: + """ + Get a list of users. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return await self._request( + "GET", + "admin/users", + query={"page": page, "per_page": per_page}, + xform=lambda data: ( + [model_validate(User, user) for user in data["users"]] + if "users" in data + else [] + ), + ) + + async def get_user_by_id(self, uid: str) -> UserResponse: + """ + Get user by id. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return await self._request( + "GET", + f"admin/users/{uid}", + xform=parse_user_response, + ) + + async def update_user_by_id( + self, + uid: str, + attributes: AdminUserAttributes, + ) -> UserResponse: + """ + Updates the user data. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return await self._request( + "PUT", + f"admin/users/{uid}", + body=attributes, + xform=parse_user_response, + ) + + async def delete_user(self, id: str, should_soft_delete: bool = False) -> None: + """ + Delete a user. Requires a `service_role` key. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + body = {"should_soft_delete": should_soft_delete} + return await self._request("DELETE", f"admin/users/{id}", body=body) + + async def _list_factors( + self, + params: AuthMFAAdminListFactorsParams, + ) -> AuthMFAAdminListFactorsResponse: + return await self._request( + "GET", + f"admin/users/{params.get('user_id')}/factors", + xform=partial(model_validate, AuthMFAAdminListFactorsResponse), + ) + + async def _delete_factor( + self, + params: AuthMFAAdminDeleteFactorParams, + ) -> AuthMFAAdminDeleteFactorResponse: + return await self._request( + "DELETE", + f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}", + xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse), + ) diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py new file mode 100644 index 00000000..ca812fcd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py @@ -0,0 +1,32 @@ +from ..types import ( + AuthMFAAdminDeleteFactorParams, + AuthMFAAdminDeleteFactorResponse, + AuthMFAAdminListFactorsParams, + AuthMFAAdminListFactorsResponse, +) + + +class AsyncGoTrueAdminMFAAPI: + """ + Contains the full multi-factor authentication administration API. + """ + + async def list_factors( + self, + params: AuthMFAAdminListFactorsParams, + ) -> AuthMFAAdminListFactorsResponse: + """ + Lists all factors attached to a user. + """ + raise NotImplementedError() # pragma: no cover + + async def delete_factor( + self, + params: AuthMFAAdminDeleteFactorParams, + ) -> AuthMFAAdminDeleteFactorResponse: + """ + Deletes a factor on a user. This will log the user out of all active + sessions (if the deleted factor was verified). There's no need to delete + unverified factors. + """ + raise NotImplementedError() # pragma: no cover diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py new file mode 100644 index 00000000..21d0b444 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from typing import Any, Callable, Dict, Optional, TypeVar, overload + +from httpx import Response +from pydantic import BaseModel +from typing_extensions import Literal, Self + +from ..constants import API_VERSION_HEADER_NAME, API_VERSIONS +from ..helpers import handle_exception, model_dump +from ..http_clients import AsyncClient + +T = TypeVar("T") + + +class AsyncGoTrueBaseAPI: + def __init__( + self, + *, + url: str, + headers: Dict[str, str], + http_client: Optional[AsyncClient], + verify: bool = True, + proxy: Optional[str] = None, + ): + self._url = url + self._headers = headers + self._http_client = http_client or AsyncClient( + verify=bool(verify), + proxy=proxy, + follow_redirects=True, + http2=True, + ) + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, exc_t, exc_v, exc_tb) -> None: + await self.close() + + async def close(self) -> None: + await self._http_client.aclose() + + @overload + async def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: Literal[False] = False, + xform: Callable[[Any], T], + ) -> T: ... # pragma: no cover + + @overload + async def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: Literal[True], + xform: Callable[[Response], T], + ) -> T: ... # pragma: no cover + + @overload + async def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: bool = False, + ) -> None: ... # pragma: no cover + + async def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: bool = False, + xform: Optional[Callable[[Any], T]] = None, + ) -> Optional[T]: + url = f"{self._url}/{path}" + headers = {**self._headers, **(headers or {})} + if API_VERSION_HEADER_NAME not in headers: + headers[API_VERSION_HEADER_NAME] = API_VERSIONS["2024-01-01"].get("name") + if "Content-Type" not in headers: + headers["Content-Type"] = "application/json;charset=UTF-8" + if jwt: + headers["Authorization"] = f"Bearer {jwt}" + query = query or {} + if redirect_to: + query["redirect_to"] = redirect_to + try: + response = await self._http_client.request( + method, + url, + headers=headers, + params=query, + json=model_dump(body) if isinstance(body, BaseModel) else body, + ) + response.raise_for_status() + result = response if no_resolve_json else response.json() + if xform: + return xform(result) + except Exception as e: + raise handle_exception(e) diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py new file mode 100644 index 00000000..2a2a5564 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py @@ -0,0 +1,1152 @@ +from __future__ import annotations + +from contextlib import suppress +from functools import partial +from json import loads +from time import time +from typing import Callable, Dict, List, Optional, Tuple +from urllib.parse import parse_qs, urlencode, urlparse +from uuid import uuid4 + +from ..constants import ( + DEFAULT_HEADERS, + EXPIRY_MARGIN, + GOTRUE_URL, + MAX_RETRIES, + RETRY_INTERVAL, + STORAGE_KEY, +) +from ..errors import ( + AuthApiError, + AuthImplicitGrantRedirectError, + AuthInvalidCredentialsError, + AuthRetryableError, + AuthSessionMissingError, +) +from ..helpers import ( + decode_jwt_payload, + generate_pkce_challenge, + generate_pkce_verifier, + model_dump, + model_dump_json, + model_validate, + parse_auth_otp_response, + parse_auth_response, + parse_link_identity_response, + parse_sso_response, + parse_user_response, +) +from ..http_clients import AsyncClient +from ..timer import Timer +from ..types import ( + AuthChangeEvent, + AuthenticatorAssuranceLevels, + AuthFlowType, + AuthMFAChallengeResponse, + AuthMFAEnrollResponse, + AuthMFAGetAuthenticatorAssuranceLevelResponse, + AuthMFAListFactorsResponse, + AuthMFAUnenrollResponse, + AuthMFAVerifyResponse, + AuthOtpResponse, + AuthResponse, + CodeExchangeParams, + DecodedJWTDict, + IdentitiesResponse, + MFAChallengeAndVerifyParams, + MFAChallengeParams, + MFAEnrollParams, + MFAUnenrollParams, + MFAVerifyParams, + OAuthResponse, + Options, + Provider, + ResendCredentials, + Session, + SignInAnonymouslyCredentials, + SignInWithIdTokenCredentials, + SignInWithOAuthCredentials, + SignInWithPasswordCredentials, + SignInWithPasswordlessCredentials, + SignInWithSSOCredentials, + SignOutOptions, + SignUpWithPasswordCredentials, + Subscription, + UserAttributes, + UserIdentity, + UserResponse, + VerifyOtpParams, +) +from .gotrue_admin_api import AsyncGoTrueAdminAPI +from .gotrue_base_api import AsyncGoTrueBaseAPI +from .gotrue_mfa_api import AsyncGoTrueMFAAPI +from .storage import AsyncMemoryStorage, AsyncSupportedStorage + + +class AsyncGoTrueClient(AsyncGoTrueBaseAPI): + def __init__( + self, + *, + url: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + storage_key: Optional[str] = None, + auto_refresh_token: bool = True, + persist_session: bool = True, + storage: Optional[AsyncSupportedStorage] = None, + http_client: Optional[AsyncClient] = None, + flow_type: AuthFlowType = "implicit", + verify: bool = True, + proxy: Optional[str] = None, + ) -> None: + AsyncGoTrueBaseAPI.__init__( + self, + url=url or GOTRUE_URL, + headers=headers or DEFAULT_HEADERS, + http_client=http_client, + verify=verify, + proxy=proxy, + ) + self._storage_key = storage_key or STORAGE_KEY + self._auto_refresh_token = auto_refresh_token + self._persist_session = persist_session + self._storage = storage or AsyncMemoryStorage() + self._in_memory_session: Optional[Session] = None + self._refresh_token_timer: Optional[Timer] = None + self._network_retries = 0 + self._state_change_emitters: Dict[str, Subscription] = {} + self._flow_type = flow_type + + self.admin = AsyncGoTrueAdminAPI( + url=self._url, + headers=self._headers, + http_client=self._http_client, + ) + self.mfa = AsyncGoTrueMFAAPI() + self.mfa.challenge = self._challenge + self.mfa.challenge_and_verify = self._challenge_and_verify + self.mfa.enroll = self._enroll + self.mfa.get_authenticator_assurance_level = ( + self._get_authenticator_assurance_level + ) + self.mfa.list_factors = self._list_factors + self.mfa.unenroll = self._unenroll + self.mfa.verify = self._verify + + # Initializations + + async def initialize(self, *, url: Optional[str] = None) -> None: + if url and self._is_implicit_grant_flow(url): + await self.initialize_from_url(url) + else: + await self.initialize_from_storage() + + async def initialize_from_storage(self) -> None: + return await self._recover_and_refresh() + + async def initialize_from_url(self, url: str) -> None: + try: + if self._is_implicit_grant_flow(url): + session, redirect_type = await self._get_session_from_url(url) + await self._save_session(session) + self._notify_all_subscribers("SIGNED_IN", session) + if redirect_type == "recovery": + self._notify_all_subscribers("PASSWORD_RECOVERY", session) + except Exception as e: + await self._remove_session() + raise e + + # Public methods + + async def sign_in_anonymously( + self, credentials: Optional[SignInAnonymouslyCredentials] = None + ) -> AuthResponse: + """ + Creates a new anonymous user. + """ + await self._remove_session() + if credentials is None: + credentials = {"options": {}} + options = credentials.get("options", {}) + data = options.get("data") or {} + captcha_token = options.get("captcha_token") + response = await self._request( + "POST", + "signup", + body={ + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_response, + ) + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + async def sign_up( + self, + credentials: SignUpWithPasswordCredentials, + ) -> AuthResponse: + """ + Creates a new user. + """ + await self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + password = credentials.get("password") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") or options.get("email_redirect_to") + data = options.get("data") or {} + channel = options.get("channel", "sms") + captcha_token = options.get("captcha_token") + if email: + response = await self._request( + "POST", + "signup", + body={ + "email": email, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + redirect_to=redirect_to, + xform=parse_auth_response, + ) + elif phone: + response = await self._request( + "POST", + "signup", + body={ + "phone": phone, + "password": password, + "data": data, + "channel": channel, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_response, + ) + else: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number and a password" + ) + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + async def sign_in_with_password( + self, + credentials: SignInWithPasswordCredentials, + ) -> AuthResponse: + """ + Log in an existing user with an email or phone and password. + """ + await self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + password = credentials.get("password") + options = credentials.get("options", {}) + data = options.get("data") or {} + captcha_token = options.get("captcha_token") + if email: + response = await self._request( + "POST", + "token", + body={ + "email": email, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "password", + }, + xform=parse_auth_response, + ) + elif phone: + response = await self._request( + "POST", + "token", + body={ + "phone": phone, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "password", + }, + xform=parse_auth_response, + ) + else: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number and a password" + ) + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + async def sign_in_with_id_token( + self, + credentials: SignInWithIdTokenCredentials, + ) -> AuthResponse: + """ + Allows signing in with an OIDC ID token. The authentication provider used should be enabled and configured. + """ + await self._remove_session() + provider = credentials.get("provider") + token = credentials.get("token") + access_token = credentials.get("access_token") + nonce = credentials.get("nonce") + options = credentials.get("options", {}) + captcha_token = options.get("captcha_token") + + response = await self._request( + "POST", + "token", + body={ + "provider": provider, + "id_token": token, + "access_token": access_token, + "nonce": nonce, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "id_token", + }, + xform=parse_auth_response, + ) + + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + async def sign_in_with_sso(self, credentials: SignInWithSSOCredentials): + """ + Attempts a single-sign on using an enterprise Identity Provider. A + successful SSO attempt will redirect the current page to the identity + provider authorization page. The redirect URL is implementation and SSO + protocol specific. + + You can use it by providing a SSO domain. Typically you can extract this + domain by asking users for their email address. If this domain is + registered on the Auth instance the redirect will use that organization's + currently active SSO Identity Provider for the login. + If you have built an organization-specific login page, you can use the + organization's SSO Identity Provider UUID directly instead. + """ + await self._remove_session() + provider_id = credentials.get("provider_id") + domain = credentials.get("domain") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + captcha_token = options.get("captcha_token") + # HTTPX currently does not follow redirects: https://www.python-httpx.org/compatibility/ + # Additionally, unlike the JS client, Python is a server side language and it's not possible + # to automatically redirect in browser for hte user + skip_http_redirect = options.get("skip_http_redirect", True) + + if domain: + return await self._request( + "POST", + "sso", + body={ + "domain": domain, + "skip_http_redirect": skip_http_redirect, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + "redirect_to": redirect_to, + }, + xform=parse_sso_response, + ) + if provider_id: + return await self._request( + "POST", + "sso", + body={ + "provider_id": provider_id, + "skip_http_redirect": skip_http_redirect, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + "redirect_to": redirect_to, + }, + xform=parse_sso_response, + ) + raise AuthInvalidCredentialsError( + "You must provide either a domain or provider_id" + ) + + async def sign_in_with_oauth( + self, + credentials: SignInWithOAuthCredentials, + ) -> OAuthResponse: + """ + Log in an existing user via a third-party provider. + """ + await self._remove_session() + + provider = credentials.get("provider") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + scopes = options.get("scopes") + params = options.get("query_params", {}) + if redirect_to: + params["redirect_to"] = redirect_to + if scopes: + params["scopes"] = scopes + url_with_qs, _ = await self._get_url_for_provider( + f"{self._url}/authorize", provider, params + ) + return OAuthResponse(provider=provider, url=url_with_qs) + + async def link_identity( + self, credentials: SignInWithOAuthCredentials + ) -> OAuthResponse: + provider = credentials.get("provider") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + scopes = options.get("scopes") + params = options.get("query_params", {}) + if redirect_to: + params["redirect_to"] = redirect_to + if scopes: + params["scopes"] = scopes + params["skip_http_redirect"] = "true" + url = "user/identities/authorize" + _, query = await self._get_url_for_provider(url, provider, params) + + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + + response = await self._request( + method="GET", + path=url, + query=query, + jwt=session.access_token, + xform=parse_link_identity_response, + ) + return OAuthResponse(provider=provider, url=response.url) + + async def get_user_identities(self): + response = await self.get_user() + return ( + IdentitiesResponse(identities=response.user.identities) + if response.user + else AuthSessionMissingError() + ) + + async def unlink_identity(self, identity: UserIdentity): + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + + return await self._request( + "DELETE", + f"user/identities/{identity.identity_id}", + jwt=session.access_token, + ) + + async def sign_in_with_otp( + self, + credentials: SignInWithPasswordlessCredentials, + ) -> AuthOtpResponse: + """ + Log in a user using magiclink or a one-time password (OTP). + + If the `{{ .ConfirmationURL }}` variable is specified in + the email template, a magiclink will be sent. + + If the `{{ .Token }}` variable is specified in the email + template, an OTP will be sent. + + If you're using phone sign-ins, only an OTP will be sent. + You won't be able to send a magiclink for phone sign-ins. + """ + await self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + options = credentials.get("options", {}) + email_redirect_to = options.get("email_redirect_to") + should_create_user = options.get("should_create_user", True) + data = options.get("data") + channel = options.get("channel", "sms") + captcha_token = options.get("captcha_token") + if email: + return await self._request( + "POST", + "otp", + body={ + "email": email, + "data": data, + "create_user": should_create_user, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + redirect_to=email_redirect_to, + xform=parse_auth_otp_response, + ) + if phone: + return await self._request( + "POST", + "otp", + body={ + "phone": phone, + "data": data, + "create_user": should_create_user, + "channel": channel, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_otp_response, + ) + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number" + ) + + async def resend( + self, + credentials: ResendCredentials, + ) -> AuthOtpResponse: + """ + Resends an existing signup confirmation email, email change email, SMS OTP or phone change OTP. + """ + email = credentials.get("email") + phone = credentials.get("phone") + type = credentials.get("type") + options = credentials.get("options", {}) + email_redirect_to = options.get("email_redirect_to") + captcha_token = options.get("captcha_token") + body = { + "type": type, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + } + + if email is None and phone is None: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number" + ) + + body.update({"email": email} if email else {"phone": phone}) + + return await self._request( + "POST", + "resend", + body=body, + redirect_to=email_redirect_to if email else None, + xform=parse_auth_otp_response, + ) + + async def verify_otp(self, params: VerifyOtpParams) -> AuthResponse: + """ + Log in a user given a User supplied OTP received via mobile. + """ + await self._remove_session() + response = await self._request( + "POST", + "verify", + body={ + "gotrue_meta_security": { + "captcha_token": params.get("options", {}).get("captcha_token"), + }, + **params, + }, + redirect_to=params.get("options", {}).get("redirect_to"), + xform=parse_auth_response, + ) + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + async def reauthenticate(self) -> AuthResponse: + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + + return await self._request( + "GET", + "reauthenticate", + jwt=session.access_token, + xform=parse_auth_response, + ) + + async def get_session(self) -> Optional[Session]: + """ + Returns the session, refreshing it if necessary. + + The session returned can be null if the session is not detected which + can happen in the event a user is not signed-in or has logged out. + """ + current_session: Optional[Session] = None + if self._persist_session: + maybe_session = await self._storage.get_item(self._storage_key) + current_session = self._get_valid_session(maybe_session) + if not current_session: + await self._remove_session() + else: + current_session = self._in_memory_session + if not current_session: + return None + time_now = round(time()) + has_expired = ( + current_session.expires_at <= time_now + EXPIRY_MARGIN + if current_session.expires_at + else False + ) + return ( + await self._call_refresh_token(current_session.refresh_token) + if has_expired + else current_session + ) + + async def get_user(self, jwt: Optional[str] = None) -> Optional[UserResponse]: + """ + Gets the current user details if there is an existing session. + + Takes in an optional access token `jwt`. If no `jwt` is provided, + `get_user()` will attempt to get the `jwt` from the current session. + """ + if not jwt: + session = await self.get_session() + if session: + jwt = session.access_token + else: + return None + return await self._request("GET", "user", jwt=jwt, xform=parse_user_response) + + async def update_user(self, attributes: UserAttributes) -> UserResponse: + """ + Updates user data, if there is a logged in user. + """ + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + response = await self._request( + "PUT", + "user", + body=attributes, + jwt=session.access_token, + xform=parse_user_response, + ) + session.user = response.user + await self._save_session(session) + self._notify_all_subscribers("USER_UPDATED", session) + return response + + async def set_session(self, access_token: str, refresh_token: str) -> AuthResponse: + """ + Sets the session data from the current session. If the current session + is expired, `set_session` will take care of refreshing it to obtain a + new session. + + If the refresh token in the current session is invalid and the current + session has expired, an error will be thrown. + + If the current session does not contain at `expires_at` field, + `set_session` will use the exp claim defined in the access token. + + The current session that minimally contains an access token, + refresh token and a user. + """ + time_now = round(time()) + expires_at = time_now + has_expired = True + session: Optional[Session] = None + if access_token and access_token.split(".")[1]: + payload = self._decode_jwt(access_token) + exp = payload.get("exp") + if exp: + expires_at = int(exp) + has_expired = expires_at <= time_now + if has_expired: + if not refresh_token: + raise AuthSessionMissingError() + response = await self._refresh_access_token(refresh_token) + if not response.session: + return AuthResponse() + session = response.session + else: + response = await self.get_user(access_token) + session = Session( + access_token=access_token, + refresh_token=refresh_token, + user=response.user, + token_type="bearer", + expires_in=expires_at - time_now, + expires_at=expires_at, + ) + await self._save_session(session) + self._notify_all_subscribers("TOKEN_REFRESHED", session) + return AuthResponse(session=session, user=response.user) + + async def refresh_session( + self, refresh_token: Optional[str] = None + ) -> AuthResponse: + """ + Returns a new session, regardless of expiry status. + + Takes in an optional current session. If not passed in, then refreshSession() + will attempt to retrieve it from getSession(). If the current session's + refresh token is invalid, an error will be thrown. + """ + if not refresh_token: + session = await self.get_session() + if session: + refresh_token = session.refresh_token + if not refresh_token: + raise AuthSessionMissingError() + session = await self._call_refresh_token(refresh_token) + return AuthResponse(session=session, user=session.user) + + async def sign_out(self, options: SignOutOptions = {"scope": "global"}) -> None: + """ + `sign_out` will remove the logged in user from the + current session and log them out - removing all items from storage and then trigger a `"SIGNED_OUT"` event. + + For advanced use cases, you can revoke all refresh tokens for a user by passing a user's JWT through to `admin.sign_out`. + + There is no way to revoke a user's access token jwt until it expires. + It is recommended to set a shorter expiry on the jwt for this reason. + """ + with suppress(AuthApiError): + session = await self.get_session() + access_token = session.access_token if session else None + if access_token: + await self.admin.sign_out(access_token, options["scope"]) + + if options["scope"] != "others": + await self._remove_session() + self._notify_all_subscribers("SIGNED_OUT", None) + + def on_auth_state_change( + self, + callback: Callable[[AuthChangeEvent, Optional[Session]], None], + ) -> Subscription: + """ + Receive a notification every time an auth event happens. + """ + unique_id = str(uuid4()) + + def _unsubscribe() -> None: + self._state_change_emitters.pop(unique_id) + + subscription = Subscription( + id=unique_id, + callback=callback, + unsubscribe=_unsubscribe, + ) + self._state_change_emitters[unique_id] = subscription + return subscription + + async def reset_password_for_email(self, email: str, options: Options = {}) -> None: + """ + Sends a password reset request to an email address. + """ + await self._request( + "POST", + "recover", + body={ + "email": email, + "gotrue_meta_security": { + "captcha_token": options.get("captcha_token"), + }, + }, + redirect_to=options.get("redirect_to"), + ) + + async def reset_password_email( + self, + email: str, + options: Options = {}, + ) -> None: + """ + Sends a password reset request to an email address. + """ + await self.reset_password_for_email(email, options) + + # MFA methods + + async def _enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse: + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + + body = { + "friendly_name": params["friendly_name"], + "factor_type": params["factor_type"], + } + + if params["factor_type"] == "phone": + body["phone"] = params["phone"] + else: + body["issuer"] = params["issuer"] + + response = await self._request( + "POST", + "factors", + body=body, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAEnrollResponse), + ) + if params["factor_type"] == "totp" and response.totp.qr_code: + response.totp.qr_code = f"data:image/svg+xml;utf-8,{response.totp.qr_code}" + return response + + async def _challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse: + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + return await self._request( + "POST", + f"factors/{params.get('factor_id')}/challenge", + body={"channel": params.get("channel")}, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAChallengeResponse), + ) + + async def _challenge_and_verify( + self, + params: MFAChallengeAndVerifyParams, + ) -> AuthMFAVerifyResponse: + response = await self._challenge( + { + "factor_id": params.get("factor_id"), + } + ) + return await self._verify( + { + "factor_id": params.get("factor_id"), + "challenge_id": response.id, + "code": params.get("code"), + } + ) + + async def _verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse: + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + response = await self._request( + "POST", + f"factors/{params.get('factor_id')}/verify", + body=params, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAVerifyResponse), + ) + session = model_validate(Session, model_dump(response)) + await self._save_session(session) + self._notify_all_subscribers("MFA_CHALLENGE_VERIFIED", session) + return response + + async def _unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse: + session = await self.get_session() + if not session: + raise AuthSessionMissingError() + return await self._request( + "DELETE", + f"factors/{params.get('factor_id')}", + jwt=session.access_token, + xform=partial(AuthMFAUnenrollResponse, model_validate), + ) + + async def _list_factors(self) -> AuthMFAListFactorsResponse: + response = await self.get_user() + all = response.user.factors or [] + totp = [f for f in all if f.factor_type == "totp" and f.status == "verified"] + phone = [f for f in all if f.factor_type == "phone" and f.status == "verified"] + return AuthMFAListFactorsResponse(all=all, totp=totp, phone=phone) + + async def _get_authenticator_assurance_level( + self, + ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse: + session = await self.get_session() + if not session: + return AuthMFAGetAuthenticatorAssuranceLevelResponse( + current_level=None, + next_level=None, + current_authentication_methods=[], + ) + payload = self._decode_jwt(session.access_token) + current_level: Optional[AuthenticatorAssuranceLevels] = None + if payload.get("aal"): + current_level = payload.get("aal") + verified_factors = [ + f for f in session.user.factors or [] if f.status == "verified" + ] + next_level = "aal2" if verified_factors else current_level + current_authentication_methods = payload.get("amr") or [] + return AuthMFAGetAuthenticatorAssuranceLevelResponse( + current_level=current_level, + next_level=next_level, + current_authentication_methods=current_authentication_methods, + ) + + # Private methods + + async def _remove_session(self) -> None: + if self._persist_session: + await self._storage.remove_item(self._storage_key) + else: + self._in_memory_session = None + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = None + + async def _get_session_from_url( + self, + url: str, + ) -> Tuple[Session, Optional[str]]: + if not self._is_implicit_grant_flow(url): + raise AuthImplicitGrantRedirectError("Not a valid implicit grant flow url.") + result = urlparse(url) + params = parse_qs(result.query) + error_description = self._get_param(params, "error_description") + if error_description: + error_code = self._get_param(params, "error_code") + error = self._get_param(params, "error") + if not error_code: + raise AuthImplicitGrantRedirectError("No error_code detected.") + if not error: + raise AuthImplicitGrantRedirectError("No error detected.") + raise AuthImplicitGrantRedirectError( + error_description, + {"code": error_code, "error": error}, + ) + provider_token = self._get_param(params, "provider_token") + provider_refresh_token = self._get_param(params, "provider_refresh_token") + access_token = self._get_param(params, "access_token") + if not access_token: + raise AuthImplicitGrantRedirectError("No access_token detected.") + expires_in = self._get_param(params, "expires_in") + if not expires_in: + raise AuthImplicitGrantRedirectError("No expires_in detected.") + refresh_token = self._get_param(params, "refresh_token") + if not refresh_token: + raise AuthImplicitGrantRedirectError("No refresh_token detected.") + token_type = self._get_param(params, "token_type") + if not token_type: + raise AuthImplicitGrantRedirectError("No token_type detected.") + time_now = round(time()) + expires_at = time_now + int(expires_in) + user = await self.get_user(access_token) + session = Session( + provider_token=provider_token, + provider_refresh_token=provider_refresh_token, + access_token=access_token, + expires_in=int(expires_in), + expires_at=expires_at, + refresh_token=refresh_token, + token_type=token_type, + user=user.user, + ) + redirect_type = self._get_param(params, "type") + return session, redirect_type + + async def _recover_and_refresh(self) -> None: + raw_session = await self._storage.get_item(self._storage_key) + current_session = self._get_valid_session(raw_session) + if not current_session: + if raw_session: + await self._remove_session() + return + time_now = round(time()) + expires_at = current_session.expires_at + if expires_at and expires_at < time_now + EXPIRY_MARGIN: + refresh_token = current_session.refresh_token + if self._auto_refresh_token and refresh_token: + self._network_retries += 1 + try: + await self._call_refresh_token(refresh_token) + self._network_retries = 0 + except Exception as e: + if ( + isinstance(e, AuthRetryableError) + and self._network_retries < MAX_RETRIES + ): + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = Timer( + (RETRY_INTERVAL ** (self._network_retries * 100)), + self._recover_and_refresh, + ) + self._refresh_token_timer.start() + return + await self._remove_session() + return + if self._persist_session: + await self._save_session(current_session) + self._notify_all_subscribers("SIGNED_IN", current_session) + + async def _call_refresh_token(self, refresh_token: str) -> Session: + if not refresh_token: + raise AuthSessionMissingError() + response = await self._refresh_access_token(refresh_token) + if not response.session: + raise AuthSessionMissingError() + await self._save_session(response.session) + self._notify_all_subscribers("TOKEN_REFRESHED", response.session) + return response.session + + async def _refresh_access_token(self, refresh_token: str) -> AuthResponse: + return await self._request( + "POST", + "token", + query={"grant_type": "refresh_token"}, + body={"refresh_token": refresh_token}, + xform=parse_auth_response, + ) + + async def _save_session(self, session: Session) -> None: + if not self._persist_session: + self._in_memory_session = session + expire_at = session.expires_at + if expire_at: + time_now = round(time()) + expire_in = expire_at - time_now + refresh_duration_before_expires = ( + EXPIRY_MARGIN if expire_in > EXPIRY_MARGIN else 0.5 + ) + value = (expire_in - refresh_duration_before_expires) * 1000 + await self._start_auto_refresh_token(value) + if self._persist_session and session.expires_at: + await self._storage.set_item(self._storage_key, model_dump_json(session)) + + async def _start_auto_refresh_token(self, value: float) -> None: + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = None + if value <= 0 or not self._auto_refresh_token: + return + + async def refresh_token_function(): + self._network_retries += 1 + try: + session = await self.get_session() + if session: + await self._call_refresh_token(session.refresh_token) + self._network_retries = 0 + except Exception as e: + if ( + isinstance(e, AuthRetryableError) + and self._network_retries < MAX_RETRIES + ): + await self._start_auto_refresh_token( + RETRY_INTERVAL ** (self._network_retries * 100) + ) + + self._refresh_token_timer = Timer(value, refresh_token_function) + self._refresh_token_timer.start() + + def _notify_all_subscribers( + self, + event: AuthChangeEvent, + session: Optional[Session], + ) -> None: + for subscription in self._state_change_emitters.values(): + subscription.callback(event, session) + + def _get_valid_session( + self, + raw_session: Optional[str], + ) -> Optional[Session]: + if not raw_session: + return None + data = loads(raw_session) + if not data: + return None + if not data.get("access_token"): + return None + if not data.get("refresh_token"): + return None + if not data.get("expires_at"): + return None + try: + expires_at = int(data["expires_at"]) + data["expires_at"] = expires_at + except ValueError: + return None + try: + return model_validate(Session, data) + except Exception: + return None + + def _get_param( + self, + query_params: Dict[str, List[str]], + name: str, + ) -> Optional[str]: + return query_params[name][0] if name in query_params else None + + def _is_implicit_grant_flow(self, url: str) -> bool: + result = urlparse(url) + params = parse_qs(result.query) + return "access_token" in params or "error_description" in params + + async def _get_url_for_provider( + self, + url: str, + provider: Provider, + params: Dict[str, str], + ) -> Tuple[str, Dict[str, str]]: + if self._flow_type == "pkce": + code_verifier = generate_pkce_verifier() + code_challenge = generate_pkce_challenge(code_verifier) + await self._storage.set_item( + f"{self._storage_key}-code-verifier", code_verifier + ) + code_challenge_method = ( + "plain" if code_verifier == code_challenge else "s256" + ) + params["code_challenge"] = code_challenge + params["code_challenge_method"] = code_challenge_method + + params["provider"] = provider + query = urlencode(params) + return f"{url}?{query}", params + + def _decode_jwt(self, jwt: str) -> DecodedJWTDict: + """ + Decodes a JWT (without performing any validation). + """ + return decode_jwt_payload(jwt) + + async def exchange_code_for_session(self, params: CodeExchangeParams): + code_verifier = params.get("code_verifier") or await self._storage.get_item( + f"{self._storage_key}-code-verifier" + ) + response = await self._request( + "POST", + "token", + query={"grant_type": "pkce"}, + body={ + "auth_code": params.get("auth_code"), + "code_verifier": code_verifier, + }, + redirect_to=params.get("redirect_to"), + xform=parse_auth_response, + ) + await self._storage.remove_item(f"{self._storage_key}-code-verifier") + if response.session: + await self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py new file mode 100644 index 00000000..a30c4c73 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py @@ -0,0 +1,94 @@ +from ..types import ( + AuthMFAChallengeResponse, + AuthMFAEnrollResponse, + AuthMFAGetAuthenticatorAssuranceLevelResponse, + AuthMFAListFactorsResponse, + AuthMFAUnenrollResponse, + AuthMFAVerifyResponse, + MFAChallengeAndVerifyParams, + MFAChallengeParams, + MFAEnrollParams, + MFAUnenrollParams, + MFAVerifyParams, +) + + +class AsyncGoTrueMFAAPI: + """ + Contains the full multi-factor authentication API. + """ + + async def enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse: + """ + Starts the enrollment process for a new Multi-Factor Authentication + factor. This method creates a new factor in the 'unverified' state. + Present the QR code or secret to the user and ask them to add it to their + authenticator app. Ask the user to provide you with an authenticator code + from their app and verify it by calling challenge and then verify. + + The first successful verification of an unverified factor activates the + factor. All other sessions are logged out and the current one gets an + `aal2` authenticator level. + """ + raise NotImplementedError() # pragma: no cover + + async def challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse: + """ + Prepares a challenge used to verify that a user has access to a MFA + factor. Provide the challenge ID and verification code by calling `verify`. + """ + raise NotImplementedError() # pragma: no cover + + async def challenge_and_verify( + self, + params: MFAChallengeAndVerifyParams, + ) -> AuthMFAVerifyResponse: + """ + Helper method which creates a challenge and immediately uses the given code + to verify against it thereafter. The verification code is provided by the + user by entering a code seen in their authenticator app. + """ + raise NotImplementedError() # pragma: no cover + + async def verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse: + """ + Verifies a verification code against a challenge. The verification code is + provided by the user by entering a code seen in their authenticator app. + """ + raise NotImplementedError() # pragma: no cover + + async def unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse: + """ + Unenroll removes a MFA factor. Unverified factors can safely be ignored + and it's not necessary to unenroll them. Unenrolling a verified MFA factor + cannot be done from a session with an `aal1` authenticator level. + """ + raise NotImplementedError() # pragma: no cover + + async def list_factors(self) -> AuthMFAListFactorsResponse: + """ + Returns the list of MFA factors enabled for this user. For most use cases + you should consider using `get_authenticator_assurance_level`. + + This uses a cached version of the factors and avoids incurring a network call. + If you need to update this list, call `get_user` first. + """ + raise NotImplementedError() # pragma: no cover + + async def get_authenticator_assurance_level( + self, + ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse: + """ + Returns the Authenticator Assurance Level (AAL) for the active session. + + - `aal1` (or `null`) means that the user's identity has been verified only + with a conventional login (email+password, OTP, magic link, social login, + etc.). + - `aal2` means that the user's identity has been verified both with a + conventional login and at least one MFA factor. + + Although this method returns a promise, it's fairly quick (microseconds) + and rarely uses the network. You can use this to check whether the current + user needs to be shown a screen to verify their MFA factors. + """ + raise NotImplementedError() # pragma: no cover diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py b/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py new file mode 100644 index 00000000..5239dd9d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Dict, Optional + + +class AsyncSupportedStorage(ABC): + @abstractmethod + async def get_item(self, key: str) -> Optional[str]: ... # pragma: no cover + + @abstractmethod + async def set_item(self, key: str, value: str) -> None: ... # pragma: no cover + + @abstractmethod + async def remove_item(self, key: str) -> None: ... # pragma: no cover + + +class AsyncMemoryStorage(AsyncSupportedStorage): + def __init__(self): + self.storage: Dict[str, str] = {} + + async def get_item(self, key: str) -> Optional[str]: + if key in self.storage: + return self.storage[key] + + async def set_item(self, key: str, value: str) -> None: + self.storage[key] = value + + async def remove_item(self, key: str) -> None: + if key in self.storage: + del self.storage[key] diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py new file mode 100644 index 00000000..9d48db4f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py new file mode 100644 index 00000000..3997c53d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from functools import partial +from typing import Dict, List, Optional + +from ..helpers import model_validate, parse_link_response, parse_user_response +from ..http_clients import SyncClient +from ..types import ( + AdminUserAttributes, + AuthMFAAdminDeleteFactorParams, + AuthMFAAdminDeleteFactorResponse, + AuthMFAAdminListFactorsParams, + AuthMFAAdminListFactorsResponse, + GenerateLinkParams, + GenerateLinkResponse, + InviteUserByEmailOptions, + SignOutScope, + User, + UserResponse, +) +from .gotrue_admin_mfa_api import SyncGoTrueAdminMFAAPI +from .gotrue_base_api import SyncGoTrueBaseAPI + + +class SyncGoTrueAdminAPI(SyncGoTrueBaseAPI): + def __init__( + self, + *, + url: str = "", + headers: Dict[str, str] = {}, + http_client: Optional[SyncClient] = None, + verify: bool = True, + proxy: Optional[str] = None, + ) -> None: + SyncGoTrueBaseAPI.__init__( + self, + url=url, + headers=headers, + http_client=http_client, + verify=verify, + proxy=proxy, + ) + self.mfa = SyncGoTrueAdminMFAAPI() + self.mfa.list_factors = self._list_factors + self.mfa.delete_factor = self._delete_factor + + def sign_out(self, jwt: str, scope: SignOutScope = "global") -> None: + """ + Removes a logged-in session. + """ + return self._request( + "POST", + "logout", + query={"scope": scope}, + jwt=jwt, + no_resolve_json=True, + ) + + def invite_user_by_email( + self, + email: str, + options: InviteUserByEmailOptions = {}, + ) -> UserResponse: + """ + Sends an invite link to an email address. + """ + return self._request( + "POST", + "invite", + body={"email": email, "data": options.get("data")}, + redirect_to=options.get("redirect_to"), + xform=parse_user_response, + ) + + def generate_link(self, params: GenerateLinkParams) -> GenerateLinkResponse: + """ + Generates email links and OTPs to be sent via a custom email provider. + """ + return self._request( + "POST", + "admin/generate_link", + body={ + "type": params.get("type"), + "email": params.get("email"), + "password": params.get("password"), + "new_email": params.get("new_email"), + "data": params.get("options", {}).get("data"), + }, + redirect_to=params.get("options", {}).get("redirect_to"), + xform=parse_link_response, + ) + + # User Admin API + + def create_user(self, attributes: AdminUserAttributes) -> UserResponse: + """ + Creates a new user. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return self._request( + "POST", + "admin/users", + body=attributes, + xform=parse_user_response, + ) + + def list_users(self, page: int = None, per_page: int = None) -> List[User]: + """ + Get a list of users. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return self._request( + "GET", + "admin/users", + query={"page": page, "per_page": per_page}, + xform=lambda data: ( + [model_validate(User, user) for user in data["users"]] + if "users" in data + else [] + ), + ) + + def get_user_by_id(self, uid: str) -> UserResponse: + """ + Get user by id. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return self._request( + "GET", + f"admin/users/{uid}", + xform=parse_user_response, + ) + + def update_user_by_id( + self, + uid: str, + attributes: AdminUserAttributes, + ) -> UserResponse: + """ + Updates the user data. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + return self._request( + "PUT", + f"admin/users/{uid}", + body=attributes, + xform=parse_user_response, + ) + + def delete_user(self, id: str, should_soft_delete: bool = False) -> None: + """ + Delete a user. Requires a `service_role` key. + + This function should only be called on a server. + Never expose your `service_role` key in the browser. + """ + body = {"should_soft_delete": should_soft_delete} + return self._request("DELETE", f"admin/users/{id}", body=body) + + def _list_factors( + self, + params: AuthMFAAdminListFactorsParams, + ) -> AuthMFAAdminListFactorsResponse: + return self._request( + "GET", + f"admin/users/{params.get('user_id')}/factors", + xform=partial(model_validate, AuthMFAAdminListFactorsResponse), + ) + + def _delete_factor( + self, + params: AuthMFAAdminDeleteFactorParams, + ) -> AuthMFAAdminDeleteFactorResponse: + return self._request( + "DELETE", + f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}", + xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse), + ) diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py new file mode 100644 index 00000000..c3fcfc8e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py @@ -0,0 +1,32 @@ +from ..types import ( + AuthMFAAdminDeleteFactorParams, + AuthMFAAdminDeleteFactorResponse, + AuthMFAAdminListFactorsParams, + AuthMFAAdminListFactorsResponse, +) + + +class SyncGoTrueAdminMFAAPI: + """ + Contains the full multi-factor authentication administration API. + """ + + def list_factors( + self, + params: AuthMFAAdminListFactorsParams, + ) -> AuthMFAAdminListFactorsResponse: + """ + Lists all factors attached to a user. + """ + raise NotImplementedError() # pragma: no cover + + def delete_factor( + self, + params: AuthMFAAdminDeleteFactorParams, + ) -> AuthMFAAdminDeleteFactorResponse: + """ + Deletes a factor on a user. This will log the user out of all active + sessions (if the deleted factor was verified). There's no need to delete + unverified factors. + """ + raise NotImplementedError() # pragma: no cover diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py new file mode 100644 index 00000000..c6c2b7b0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from typing import Any, Callable, Dict, Optional, TypeVar, overload + +from httpx import Response +from pydantic import BaseModel +from typing_extensions import Literal, Self + +from ..constants import API_VERSION_HEADER_NAME, API_VERSIONS +from ..helpers import handle_exception, model_dump +from ..http_clients import SyncClient + +T = TypeVar("T") + + +class SyncGoTrueBaseAPI: + def __init__( + self, + *, + url: str, + headers: Dict[str, str], + http_client: Optional[SyncClient], + verify: bool = True, + proxy: Optional[str] = None, + ): + self._url = url + self._headers = headers + self._http_client = http_client or SyncClient( + verify=bool(verify), + proxy=proxy, + follow_redirects=True, + http2=True, + ) + + def __enter__(self) -> Self: + return self + + def __exit__(self, exc_t, exc_v, exc_tb) -> None: + self.close() + + def close(self) -> None: + self._http_client.aclose() + + @overload + def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: Literal[False] = False, + xform: Callable[[Any], T], + ) -> T: ... # pragma: no cover + + @overload + def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: Literal[True], + xform: Callable[[Response], T], + ) -> T: ... # pragma: no cover + + @overload + def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: bool = False, + ) -> None: ... # pragma: no cover + + def _request( + self, + method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"], + path: str, + *, + jwt: Optional[str] = None, + redirect_to: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + query: Optional[Dict[str, str]] = None, + body: Optional[Any] = None, + no_resolve_json: bool = False, + xform: Optional[Callable[[Any], T]] = None, + ) -> Optional[T]: + url = f"{self._url}/{path}" + headers = {**self._headers, **(headers or {})} + if API_VERSION_HEADER_NAME not in headers: + headers[API_VERSION_HEADER_NAME] = API_VERSIONS["2024-01-01"].get("name") + if "Content-Type" not in headers: + headers["Content-Type"] = "application/json;charset=UTF-8" + if jwt: + headers["Authorization"] = f"Bearer {jwt}" + query = query or {} + if redirect_to: + query["redirect_to"] = redirect_to + try: + response = self._http_client.request( + method, + url, + headers=headers, + params=query, + json=model_dump(body) if isinstance(body, BaseModel) else body, + ) + response.raise_for_status() + result = response if no_resolve_json else response.json() + if xform: + return xform(result) + except Exception as e: + raise handle_exception(e) diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py new file mode 100644 index 00000000..e4e821ef --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py @@ -0,0 +1,1146 @@ +from __future__ import annotations + +from contextlib import suppress +from functools import partial +from json import loads +from time import time +from typing import Callable, Dict, List, Optional, Tuple +from urllib.parse import parse_qs, urlencode, urlparse +from uuid import uuid4 + +from ..constants import ( + DEFAULT_HEADERS, + EXPIRY_MARGIN, + GOTRUE_URL, + MAX_RETRIES, + RETRY_INTERVAL, + STORAGE_KEY, +) +from ..errors import ( + AuthApiError, + AuthImplicitGrantRedirectError, + AuthInvalidCredentialsError, + AuthRetryableError, + AuthSessionMissingError, +) +from ..helpers import ( + decode_jwt_payload, + generate_pkce_challenge, + generate_pkce_verifier, + model_dump, + model_dump_json, + model_validate, + parse_auth_otp_response, + parse_auth_response, + parse_link_identity_response, + parse_sso_response, + parse_user_response, +) +from ..http_clients import SyncClient +from ..timer import Timer +from ..types import ( + AuthChangeEvent, + AuthenticatorAssuranceLevels, + AuthFlowType, + AuthMFAChallengeResponse, + AuthMFAEnrollResponse, + AuthMFAGetAuthenticatorAssuranceLevelResponse, + AuthMFAListFactorsResponse, + AuthMFAUnenrollResponse, + AuthMFAVerifyResponse, + AuthOtpResponse, + AuthResponse, + CodeExchangeParams, + DecodedJWTDict, + IdentitiesResponse, + MFAChallengeAndVerifyParams, + MFAChallengeParams, + MFAEnrollParams, + MFAUnenrollParams, + MFAVerifyParams, + OAuthResponse, + Options, + Provider, + ResendCredentials, + Session, + SignInAnonymouslyCredentials, + SignInWithIdTokenCredentials, + SignInWithOAuthCredentials, + SignInWithPasswordCredentials, + SignInWithPasswordlessCredentials, + SignInWithSSOCredentials, + SignOutOptions, + SignUpWithPasswordCredentials, + Subscription, + UserAttributes, + UserIdentity, + UserResponse, + VerifyOtpParams, +) +from .gotrue_admin_api import SyncGoTrueAdminAPI +from .gotrue_base_api import SyncGoTrueBaseAPI +from .gotrue_mfa_api import SyncGoTrueMFAAPI +from .storage import SyncMemoryStorage, SyncSupportedStorage + + +class SyncGoTrueClient(SyncGoTrueBaseAPI): + def __init__( + self, + *, + url: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + storage_key: Optional[str] = None, + auto_refresh_token: bool = True, + persist_session: bool = True, + storage: Optional[SyncSupportedStorage] = None, + http_client: Optional[SyncClient] = None, + flow_type: AuthFlowType = "implicit", + verify: bool = True, + proxy: Optional[str] = None, + ) -> None: + SyncGoTrueBaseAPI.__init__( + self, + url=url or GOTRUE_URL, + headers=headers or DEFAULT_HEADERS, + http_client=http_client, + verify=verify, + proxy=proxy, + ) + self._storage_key = storage_key or STORAGE_KEY + self._auto_refresh_token = auto_refresh_token + self._persist_session = persist_session + self._storage = storage or SyncMemoryStorage() + self._in_memory_session: Optional[Session] = None + self._refresh_token_timer: Optional[Timer] = None + self._network_retries = 0 + self._state_change_emitters: Dict[str, Subscription] = {} + self._flow_type = flow_type + + self.admin = SyncGoTrueAdminAPI( + url=self._url, + headers=self._headers, + http_client=self._http_client, + ) + self.mfa = SyncGoTrueMFAAPI() + self.mfa.challenge = self._challenge + self.mfa.challenge_and_verify = self._challenge_and_verify + self.mfa.enroll = self._enroll + self.mfa.get_authenticator_assurance_level = ( + self._get_authenticator_assurance_level + ) + self.mfa.list_factors = self._list_factors + self.mfa.unenroll = self._unenroll + self.mfa.verify = self._verify + + # Initializations + + def initialize(self, *, url: Optional[str] = None) -> None: + if url and self._is_implicit_grant_flow(url): + self.initialize_from_url(url) + else: + self.initialize_from_storage() + + def initialize_from_storage(self) -> None: + return self._recover_and_refresh() + + def initialize_from_url(self, url: str) -> None: + try: + if self._is_implicit_grant_flow(url): + session, redirect_type = self._get_session_from_url(url) + self._save_session(session) + self._notify_all_subscribers("SIGNED_IN", session) + if redirect_type == "recovery": + self._notify_all_subscribers("PASSWORD_RECOVERY", session) + except Exception as e: + self._remove_session() + raise e + + # Public methods + + def sign_in_anonymously( + self, credentials: Optional[SignInAnonymouslyCredentials] = None + ) -> AuthResponse: + """ + Creates a new anonymous user. + """ + self._remove_session() + if credentials is None: + credentials = {"options": {}} + options = credentials.get("options", {}) + data = options.get("data") or {} + captcha_token = options.get("captcha_token") + response = self._request( + "POST", + "signup", + body={ + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_response, + ) + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + def sign_up( + self, + credentials: SignUpWithPasswordCredentials, + ) -> AuthResponse: + """ + Creates a new user. + """ + self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + password = credentials.get("password") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") or options.get("email_redirect_to") + data = options.get("data") or {} + channel = options.get("channel", "sms") + captcha_token = options.get("captcha_token") + if email: + response = self._request( + "POST", + "signup", + body={ + "email": email, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + redirect_to=redirect_to, + xform=parse_auth_response, + ) + elif phone: + response = self._request( + "POST", + "signup", + body={ + "phone": phone, + "password": password, + "data": data, + "channel": channel, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_response, + ) + else: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number and a password" + ) + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + def sign_in_with_password( + self, + credentials: SignInWithPasswordCredentials, + ) -> AuthResponse: + """ + Log in an existing user with an email or phone and password. + """ + self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + password = credentials.get("password") + options = credentials.get("options", {}) + data = options.get("data") or {} + captcha_token = options.get("captcha_token") + if email: + response = self._request( + "POST", + "token", + body={ + "email": email, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "password", + }, + xform=parse_auth_response, + ) + elif phone: + response = self._request( + "POST", + "token", + body={ + "phone": phone, + "password": password, + "data": data, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "password", + }, + xform=parse_auth_response, + ) + else: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number and a password" + ) + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + def sign_in_with_id_token( + self, + credentials: SignInWithIdTokenCredentials, + ) -> AuthResponse: + """ + Allows signing in with an OIDC ID token. The authentication provider used should be enabled and configured. + """ + self._remove_session() + provider = credentials.get("provider") + token = credentials.get("token") + access_token = credentials.get("access_token") + nonce = credentials.get("nonce") + options = credentials.get("options", {}) + captcha_token = options.get("captcha_token") + + response = self._request( + "POST", + "token", + body={ + "provider": provider, + "id_token": token, + "access_token": access_token, + "nonce": nonce, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + query={ + "grant_type": "id_token", + }, + xform=parse_auth_response, + ) + + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + def sign_in_with_sso(self, credentials: SignInWithSSOCredentials): + """ + Attempts a single-sign on using an enterprise Identity Provider. A + successful SSO attempt will redirect the current page to the identity + provider authorization page. The redirect URL is implementation and SSO + protocol specific. + + You can use it by providing a SSO domain. Typically you can extract this + domain by asking users for their email address. If this domain is + registered on the Auth instance the redirect will use that organization's + currently active SSO Identity Provider for the login. + If you have built an organization-specific login page, you can use the + organization's SSO Identity Provider UUID directly instead. + """ + self._remove_session() + provider_id = credentials.get("provider_id") + domain = credentials.get("domain") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + captcha_token = options.get("captcha_token") + # HTTPX currently does not follow redirects: https://www.python-httpx.org/compatibility/ + # Additionally, unlike the JS client, Python is a server side language and it's not possible + # to automatically redirect in browser for hte user + skip_http_redirect = options.get("skip_http_redirect", True) + + if domain: + return self._request( + "POST", + "sso", + body={ + "domain": domain, + "skip_http_redirect": skip_http_redirect, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + "redirect_to": redirect_to, + }, + xform=parse_sso_response, + ) + if provider_id: + return self._request( + "POST", + "sso", + body={ + "provider_id": provider_id, + "skip_http_redirect": skip_http_redirect, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + "redirect_to": redirect_to, + }, + xform=parse_sso_response, + ) + raise AuthInvalidCredentialsError( + "You must provide either a domain or provider_id" + ) + + def sign_in_with_oauth( + self, + credentials: SignInWithOAuthCredentials, + ) -> OAuthResponse: + """ + Log in an existing user via a third-party provider. + """ + self._remove_session() + + provider = credentials.get("provider") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + scopes = options.get("scopes") + params = options.get("query_params", {}) + if redirect_to: + params["redirect_to"] = redirect_to + if scopes: + params["scopes"] = scopes + url_with_qs, _ = self._get_url_for_provider( + f"{self._url}/authorize", provider, params + ) + return OAuthResponse(provider=provider, url=url_with_qs) + + def link_identity(self, credentials: SignInWithOAuthCredentials) -> OAuthResponse: + provider = credentials.get("provider") + options = credentials.get("options", {}) + redirect_to = options.get("redirect_to") + scopes = options.get("scopes") + params = options.get("query_params", {}) + if redirect_to: + params["redirect_to"] = redirect_to + if scopes: + params["scopes"] = scopes + params["skip_http_redirect"] = "true" + url = "user/identities/authorize" + _, query = self._get_url_for_provider(url, provider, params) + + session = self.get_session() + if not session: + raise AuthSessionMissingError() + + response = self._request( + method="GET", + path=url, + query=query, + jwt=session.access_token, + xform=parse_link_identity_response, + ) + return OAuthResponse(provider=provider, url=response.url) + + def get_user_identities(self): + response = self.get_user() + return ( + IdentitiesResponse(identities=response.user.identities) + if response.user + else AuthSessionMissingError() + ) + + def unlink_identity(self, identity: UserIdentity): + session = self.get_session() + if not session: + raise AuthSessionMissingError() + + return self._request( + "DELETE", + f"user/identities/{identity.identity_id}", + jwt=session.access_token, + ) + + def sign_in_with_otp( + self, + credentials: SignInWithPasswordlessCredentials, + ) -> AuthOtpResponse: + """ + Log in a user using magiclink or a one-time password (OTP). + + If the `{{ .ConfirmationURL }}` variable is specified in + the email template, a magiclink will be sent. + + If the `{{ .Token }}` variable is specified in the email + template, an OTP will be sent. + + If you're using phone sign-ins, only an OTP will be sent. + You won't be able to send a magiclink for phone sign-ins. + """ + self._remove_session() + email = credentials.get("email") + phone = credentials.get("phone") + options = credentials.get("options", {}) + email_redirect_to = options.get("email_redirect_to") + should_create_user = options.get("should_create_user", True) + data = options.get("data") + channel = options.get("channel", "sms") + captcha_token = options.get("captcha_token") + if email: + return self._request( + "POST", + "otp", + body={ + "email": email, + "data": data, + "create_user": should_create_user, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + redirect_to=email_redirect_to, + xform=parse_auth_otp_response, + ) + if phone: + return self._request( + "POST", + "otp", + body={ + "phone": phone, + "data": data, + "create_user": should_create_user, + "channel": channel, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + }, + xform=parse_auth_otp_response, + ) + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number" + ) + + def resend( + self, + credentials: ResendCredentials, + ) -> AuthOtpResponse: + """ + Resends an existing signup confirmation email, email change email, SMS OTP or phone change OTP. + """ + email = credentials.get("email") + phone = credentials.get("phone") + type = credentials.get("type") + options = credentials.get("options", {}) + email_redirect_to = options.get("email_redirect_to") + captcha_token = options.get("captcha_token") + body = { + "type": type, + "gotrue_meta_security": { + "captcha_token": captcha_token, + }, + } + + if email is None and phone is None: + raise AuthInvalidCredentialsError( + "You must provide either an email or phone number" + ) + + body.update({"email": email} if email else {"phone": phone}) + + return self._request( + "POST", + "resend", + body=body, + redirect_to=email_redirect_to if email else None, + xform=parse_auth_otp_response, + ) + + def verify_otp(self, params: VerifyOtpParams) -> AuthResponse: + """ + Log in a user given a User supplied OTP received via mobile. + """ + self._remove_session() + response = self._request( + "POST", + "verify", + body={ + "gotrue_meta_security": { + "captcha_token": params.get("options", {}).get("captcha_token"), + }, + **params, + }, + redirect_to=params.get("options", {}).get("redirect_to"), + xform=parse_auth_response, + ) + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response + + def reauthenticate(self) -> AuthResponse: + session = self.get_session() + if not session: + raise AuthSessionMissingError() + + return self._request( + "GET", + "reauthenticate", + jwt=session.access_token, + xform=parse_auth_response, + ) + + def get_session(self) -> Optional[Session]: + """ + Returns the session, refreshing it if necessary. + + The session returned can be null if the session is not detected which + can happen in the event a user is not signed-in or has logged out. + """ + current_session: Optional[Session] = None + if self._persist_session: + maybe_session = self._storage.get_item(self._storage_key) + current_session = self._get_valid_session(maybe_session) + if not current_session: + self._remove_session() + else: + current_session = self._in_memory_session + if not current_session: + return None + time_now = round(time()) + has_expired = ( + current_session.expires_at <= time_now + EXPIRY_MARGIN + if current_session.expires_at + else False + ) + return ( + self._call_refresh_token(current_session.refresh_token) + if has_expired + else current_session + ) + + def get_user(self, jwt: Optional[str] = None) -> Optional[UserResponse]: + """ + Gets the current user details if there is an existing session. + + Takes in an optional access token `jwt`. If no `jwt` is provided, + `get_user()` will attempt to get the `jwt` from the current session. + """ + if not jwt: + session = self.get_session() + if session: + jwt = session.access_token + else: + return None + return self._request("GET", "user", jwt=jwt, xform=parse_user_response) + + def update_user(self, attributes: UserAttributes) -> UserResponse: + """ + Updates user data, if there is a logged in user. + """ + session = self.get_session() + if not session: + raise AuthSessionMissingError() + response = self._request( + "PUT", + "user", + body=attributes, + jwt=session.access_token, + xform=parse_user_response, + ) + session.user = response.user + self._save_session(session) + self._notify_all_subscribers("USER_UPDATED", session) + return response + + def set_session(self, access_token: str, refresh_token: str) -> AuthResponse: + """ + Sets the session data from the current session. If the current session + is expired, `set_session` will take care of refreshing it to obtain a + new session. + + If the refresh token in the current session is invalid and the current + session has expired, an error will be thrown. + + If the current session does not contain at `expires_at` field, + `set_session` will use the exp claim defined in the access token. + + The current session that minimally contains an access token, + refresh token and a user. + """ + time_now = round(time()) + expires_at = time_now + has_expired = True + session: Optional[Session] = None + if access_token and access_token.split(".")[1]: + payload = self._decode_jwt(access_token) + exp = payload.get("exp") + if exp: + expires_at = int(exp) + has_expired = expires_at <= time_now + if has_expired: + if not refresh_token: + raise AuthSessionMissingError() + response = self._refresh_access_token(refresh_token) + if not response.session: + return AuthResponse() + session = response.session + else: + response = self.get_user(access_token) + session = Session( + access_token=access_token, + refresh_token=refresh_token, + user=response.user, + token_type="bearer", + expires_in=expires_at - time_now, + expires_at=expires_at, + ) + self._save_session(session) + self._notify_all_subscribers("TOKEN_REFRESHED", session) + return AuthResponse(session=session, user=response.user) + + def refresh_session(self, refresh_token: Optional[str] = None) -> AuthResponse: + """ + Returns a new session, regardless of expiry status. + + Takes in an optional current session. If not passed in, then refreshSession() + will attempt to retrieve it from getSession(). If the current session's + refresh token is invalid, an error will be thrown. + """ + if not refresh_token: + session = self.get_session() + if session: + refresh_token = session.refresh_token + if not refresh_token: + raise AuthSessionMissingError() + session = self._call_refresh_token(refresh_token) + return AuthResponse(session=session, user=session.user) + + def sign_out(self, options: SignOutOptions = {"scope": "global"}) -> None: + """ + `sign_out` will remove the logged in user from the + current session and log them out - removing all items from storage and then trigger a `"SIGNED_OUT"` event. + + For advanced use cases, you can revoke all refresh tokens for a user by passing a user's JWT through to `admin.sign_out`. + + There is no way to revoke a user's access token jwt until it expires. + It is recommended to set a shorter expiry on the jwt for this reason. + """ + with suppress(AuthApiError): + session = self.get_session() + access_token = session.access_token if session else None + if access_token: + self.admin.sign_out(access_token, options["scope"]) + + if options["scope"] != "others": + self._remove_session() + self._notify_all_subscribers("SIGNED_OUT", None) + + def on_auth_state_change( + self, + callback: Callable[[AuthChangeEvent, Optional[Session]], None], + ) -> Subscription: + """ + Receive a notification every time an auth event happens. + """ + unique_id = str(uuid4()) + + def _unsubscribe() -> None: + self._state_change_emitters.pop(unique_id) + + subscription = Subscription( + id=unique_id, + callback=callback, + unsubscribe=_unsubscribe, + ) + self._state_change_emitters[unique_id] = subscription + return subscription + + def reset_password_for_email(self, email: str, options: Options = {}) -> None: + """ + Sends a password reset request to an email address. + """ + self._request( + "POST", + "recover", + body={ + "email": email, + "gotrue_meta_security": { + "captcha_token": options.get("captcha_token"), + }, + }, + redirect_to=options.get("redirect_to"), + ) + + def reset_password_email( + self, + email: str, + options: Options = {}, + ) -> None: + """ + Sends a password reset request to an email address. + """ + self.reset_password_for_email(email, options) + + # MFA methods + + def _enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse: + session = self.get_session() + if not session: + raise AuthSessionMissingError() + + body = { + "friendly_name": params["friendly_name"], + "factor_type": params["factor_type"], + } + + if params["factor_type"] == "phone": + body["phone"] = params["phone"] + else: + body["issuer"] = params["issuer"] + + response = self._request( + "POST", + "factors", + body=body, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAEnrollResponse), + ) + if params["factor_type"] == "totp" and response.totp.qr_code: + response.totp.qr_code = f"data:image/svg+xml;utf-8,{response.totp.qr_code}" + return response + + def _challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse: + session = self.get_session() + if not session: + raise AuthSessionMissingError() + return self._request( + "POST", + f"factors/{params.get('factor_id')}/challenge", + body={"channel": params.get("channel")}, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAChallengeResponse), + ) + + def _challenge_and_verify( + self, + params: MFAChallengeAndVerifyParams, + ) -> AuthMFAVerifyResponse: + response = self._challenge( + { + "factor_id": params.get("factor_id"), + } + ) + return self._verify( + { + "factor_id": params.get("factor_id"), + "challenge_id": response.id, + "code": params.get("code"), + } + ) + + def _verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse: + session = self.get_session() + if not session: + raise AuthSessionMissingError() + response = self._request( + "POST", + f"factors/{params.get('factor_id')}/verify", + body=params, + jwt=session.access_token, + xform=partial(model_validate, AuthMFAVerifyResponse), + ) + session = model_validate(Session, model_dump(response)) + self._save_session(session) + self._notify_all_subscribers("MFA_CHALLENGE_VERIFIED", session) + return response + + def _unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse: + session = self.get_session() + if not session: + raise AuthSessionMissingError() + return self._request( + "DELETE", + f"factors/{params.get('factor_id')}", + jwt=session.access_token, + xform=partial(AuthMFAUnenrollResponse, model_validate), + ) + + def _list_factors(self) -> AuthMFAListFactorsResponse: + response = self.get_user() + all = response.user.factors or [] + totp = [f for f in all if f.factor_type == "totp" and f.status == "verified"] + phone = [f for f in all if f.factor_type == "phone" and f.status == "verified"] + return AuthMFAListFactorsResponse(all=all, totp=totp, phone=phone) + + def _get_authenticator_assurance_level( + self, + ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse: + session = self.get_session() + if not session: + return AuthMFAGetAuthenticatorAssuranceLevelResponse( + current_level=None, + next_level=None, + current_authentication_methods=[], + ) + payload = self._decode_jwt(session.access_token) + current_level: Optional[AuthenticatorAssuranceLevels] = None + if payload.get("aal"): + current_level = payload.get("aal") + verified_factors = [ + f for f in session.user.factors or [] if f.status == "verified" + ] + next_level = "aal2" if verified_factors else current_level + current_authentication_methods = payload.get("amr") or [] + return AuthMFAGetAuthenticatorAssuranceLevelResponse( + current_level=current_level, + next_level=next_level, + current_authentication_methods=current_authentication_methods, + ) + + # Private methods + + def _remove_session(self) -> None: + if self._persist_session: + self._storage.remove_item(self._storage_key) + else: + self._in_memory_session = None + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = None + + def _get_session_from_url( + self, + url: str, + ) -> Tuple[Session, Optional[str]]: + if not self._is_implicit_grant_flow(url): + raise AuthImplicitGrantRedirectError("Not a valid implicit grant flow url.") + result = urlparse(url) + params = parse_qs(result.query) + error_description = self._get_param(params, "error_description") + if error_description: + error_code = self._get_param(params, "error_code") + error = self._get_param(params, "error") + if not error_code: + raise AuthImplicitGrantRedirectError("No error_code detected.") + if not error: + raise AuthImplicitGrantRedirectError("No error detected.") + raise AuthImplicitGrantRedirectError( + error_description, + {"code": error_code, "error": error}, + ) + provider_token = self._get_param(params, "provider_token") + provider_refresh_token = self._get_param(params, "provider_refresh_token") + access_token = self._get_param(params, "access_token") + if not access_token: + raise AuthImplicitGrantRedirectError("No access_token detected.") + expires_in = self._get_param(params, "expires_in") + if not expires_in: + raise AuthImplicitGrantRedirectError("No expires_in detected.") + refresh_token = self._get_param(params, "refresh_token") + if not refresh_token: + raise AuthImplicitGrantRedirectError("No refresh_token detected.") + token_type = self._get_param(params, "token_type") + if not token_type: + raise AuthImplicitGrantRedirectError("No token_type detected.") + time_now = round(time()) + expires_at = time_now + int(expires_in) + user = self.get_user(access_token) + session = Session( + provider_token=provider_token, + provider_refresh_token=provider_refresh_token, + access_token=access_token, + expires_in=int(expires_in), + expires_at=expires_at, + refresh_token=refresh_token, + token_type=token_type, + user=user.user, + ) + redirect_type = self._get_param(params, "type") + return session, redirect_type + + def _recover_and_refresh(self) -> None: + raw_session = self._storage.get_item(self._storage_key) + current_session = self._get_valid_session(raw_session) + if not current_session: + if raw_session: + self._remove_session() + return + time_now = round(time()) + expires_at = current_session.expires_at + if expires_at and expires_at < time_now + EXPIRY_MARGIN: + refresh_token = current_session.refresh_token + if self._auto_refresh_token and refresh_token: + self._network_retries += 1 + try: + self._call_refresh_token(refresh_token) + self._network_retries = 0 + except Exception as e: + if ( + isinstance(e, AuthRetryableError) + and self._network_retries < MAX_RETRIES + ): + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = Timer( + (RETRY_INTERVAL ** (self._network_retries * 100)), + self._recover_and_refresh, + ) + self._refresh_token_timer.start() + return + self._remove_session() + return + if self._persist_session: + self._save_session(current_session) + self._notify_all_subscribers("SIGNED_IN", current_session) + + def _call_refresh_token(self, refresh_token: str) -> Session: + if not refresh_token: + raise AuthSessionMissingError() + response = self._refresh_access_token(refresh_token) + if not response.session: + raise AuthSessionMissingError() + self._save_session(response.session) + self._notify_all_subscribers("TOKEN_REFRESHED", response.session) + return response.session + + def _refresh_access_token(self, refresh_token: str) -> AuthResponse: + return self._request( + "POST", + "token", + query={"grant_type": "refresh_token"}, + body={"refresh_token": refresh_token}, + xform=parse_auth_response, + ) + + def _save_session(self, session: Session) -> None: + if not self._persist_session: + self._in_memory_session = session + expire_at = session.expires_at + if expire_at: + time_now = round(time()) + expire_in = expire_at - time_now + refresh_duration_before_expires = ( + EXPIRY_MARGIN if expire_in > EXPIRY_MARGIN else 0.5 + ) + value = (expire_in - refresh_duration_before_expires) * 1000 + self._start_auto_refresh_token(value) + if self._persist_session and session.expires_at: + self._storage.set_item(self._storage_key, model_dump_json(session)) + + def _start_auto_refresh_token(self, value: float) -> None: + if self._refresh_token_timer: + self._refresh_token_timer.cancel() + self._refresh_token_timer = None + if value <= 0 or not self._auto_refresh_token: + return + + def refresh_token_function(): + self._network_retries += 1 + try: + session = self.get_session() + if session: + self._call_refresh_token(session.refresh_token) + self._network_retries = 0 + except Exception as e: + if ( + isinstance(e, AuthRetryableError) + and self._network_retries < MAX_RETRIES + ): + self._start_auto_refresh_token( + RETRY_INTERVAL ** (self._network_retries * 100) + ) + + self._refresh_token_timer = Timer(value, refresh_token_function) + self._refresh_token_timer.start() + + def _notify_all_subscribers( + self, + event: AuthChangeEvent, + session: Optional[Session], + ) -> None: + for subscription in self._state_change_emitters.values(): + subscription.callback(event, session) + + def _get_valid_session( + self, + raw_session: Optional[str], + ) -> Optional[Session]: + if not raw_session: + return None + data = loads(raw_session) + if not data: + return None + if not data.get("access_token"): + return None + if not data.get("refresh_token"): + return None + if not data.get("expires_at"): + return None + try: + expires_at = int(data["expires_at"]) + data["expires_at"] = expires_at + except ValueError: + return None + try: + return model_validate(Session, data) + except Exception: + return None + + def _get_param( + self, + query_params: Dict[str, List[str]], + name: str, + ) -> Optional[str]: + return query_params[name][0] if name in query_params else None + + def _is_implicit_grant_flow(self, url: str) -> bool: + result = urlparse(url) + params = parse_qs(result.query) + return "access_token" in params or "error_description" in params + + def _get_url_for_provider( + self, + url: str, + provider: Provider, + params: Dict[str, str], + ) -> Tuple[str, Dict[str, str]]: + if self._flow_type == "pkce": + code_verifier = generate_pkce_verifier() + code_challenge = generate_pkce_challenge(code_verifier) + self._storage.set_item(f"{self._storage_key}-code-verifier", code_verifier) + code_challenge_method = ( + "plain" if code_verifier == code_challenge else "s256" + ) + params["code_challenge"] = code_challenge + params["code_challenge_method"] = code_challenge_method + + params["provider"] = provider + query = urlencode(params) + return f"{url}?{query}", params + + def _decode_jwt(self, jwt: str) -> DecodedJWTDict: + """ + Decodes a JWT (without performing any validation). + """ + return decode_jwt_payload(jwt) + + def exchange_code_for_session(self, params: CodeExchangeParams): + code_verifier = params.get("code_verifier") or self._storage.get_item( + f"{self._storage_key}-code-verifier" + ) + response = self._request( + "POST", + "token", + query={"grant_type": "pkce"}, + body={ + "auth_code": params.get("auth_code"), + "code_verifier": code_verifier, + }, + redirect_to=params.get("redirect_to"), + xform=parse_auth_response, + ) + self._storage.remove_item(f"{self._storage_key}-code-verifier") + if response.session: + self._save_session(response.session) + self._notify_all_subscribers("SIGNED_IN", response.session) + return response diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py new file mode 100644 index 00000000..16bec8d5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py @@ -0,0 +1,94 @@ +from ..types import ( + AuthMFAChallengeResponse, + AuthMFAEnrollResponse, + AuthMFAGetAuthenticatorAssuranceLevelResponse, + AuthMFAListFactorsResponse, + AuthMFAUnenrollResponse, + AuthMFAVerifyResponse, + MFAChallengeAndVerifyParams, + MFAChallengeParams, + MFAEnrollParams, + MFAUnenrollParams, + MFAVerifyParams, +) + + +class SyncGoTrueMFAAPI: + """ + Contains the full multi-factor authentication API. + """ + + def enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse: + """ + Starts the enrollment process for a new Multi-Factor Authentication + factor. This method creates a new factor in the 'unverified' state. + Present the QR code or secret to the user and ask them to add it to their + authenticator app. Ask the user to provide you with an authenticator code + from their app and verify it by calling challenge and then verify. + + The first successful verification of an unverified factor activates the + factor. All other sessions are logged out and the current one gets an + `aal2` authenticator level. + """ + raise NotImplementedError() # pragma: no cover + + def challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse: + """ + Prepares a challenge used to verify that a user has access to a MFA + factor. Provide the challenge ID and verification code by calling `verify`. + """ + raise NotImplementedError() # pragma: no cover + + def challenge_and_verify( + self, + params: MFAChallengeAndVerifyParams, + ) -> AuthMFAVerifyResponse: + """ + Helper method which creates a challenge and immediately uses the given code + to verify against it thereafter. The verification code is provided by the + user by entering a code seen in their authenticator app. + """ + raise NotImplementedError() # pragma: no cover + + def verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse: + """ + Verifies a verification code against a challenge. The verification code is + provided by the user by entering a code seen in their authenticator app. + """ + raise NotImplementedError() # pragma: no cover + + def unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse: + """ + Unenroll removes a MFA factor. Unverified factors can safely be ignored + and it's not necessary to unenroll them. Unenrolling a verified MFA factor + cannot be done from a session with an `aal1` authenticator level. + """ + raise NotImplementedError() # pragma: no cover + + def list_factors(self) -> AuthMFAListFactorsResponse: + """ + Returns the list of MFA factors enabled for this user. For most use cases + you should consider using `get_authenticator_assurance_level`. + + This uses a cached version of the factors and avoids incurring a network call. + If you need to update this list, call `get_user` first. + """ + raise NotImplementedError() # pragma: no cover + + def get_authenticator_assurance_level( + self, + ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse: + """ + Returns the Authenticator Assurance Level (AAL) for the active session. + + - `aal1` (or `null`) means that the user's identity has been verified only + with a conventional login (email+password, OTP, magic link, social login, + etc.). + - `aal2` means that the user's identity has been verified both with a + conventional login and at least one MFA factor. + + Although this method returns a promise, it's fairly quick (microseconds) + and rarely uses the network. You can use this to check whether the current + user needs to be shown a screen to verify their MFA factors. + """ + raise NotImplementedError() # pragma: no cover diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py new file mode 100644 index 00000000..03ede0c1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Dict, Optional + + +class SyncSupportedStorage(ABC): + @abstractmethod + def get_item(self, key: str) -> Optional[str]: ... # pragma: no cover + + @abstractmethod + def set_item(self, key: str, value: str) -> None: ... # pragma: no cover + + @abstractmethod + def remove_item(self, key: str) -> None: ... # pragma: no cover + + +class SyncMemoryStorage(SyncSupportedStorage): + def __init__(self): + self.storage: Dict[str, str] = {} + + def get_item(self, key: str) -> Optional[str]: + if key in self.storage: + return self.storage[key] + + def set_item(self, key: str, value: str) -> None: + self.storage[key] = value + + def remove_item(self, key: str) -> None: + if key in self.storage: + del self.storage[key] diff --git a/.venv/lib/python3.12/site-packages/gotrue/constants.py b/.venv/lib/python3.12/site-packages/gotrue/constants.py new file mode 100644 index 00000000..d16c0f31 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/constants.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Dict + +from .version import __version__ + +GOTRUE_URL = "http://localhost:9999" +DEFAULT_HEADERS: Dict[str, str] = { + "X-Client-Info": f"gotrue-py/{__version__}", +} +EXPIRY_MARGIN = 10 # seconds +MAX_RETRIES = 10 +RETRY_INTERVAL = 2 # deciseconds +STORAGE_KEY = "supabase.auth.token" + +API_VERSION_HEADER_NAME = "X-Supabase-Api-Version" +API_VERSIONS = { + "2024-01-01": { + "timestamp": datetime.timestamp(datetime.strptime("2024-01-01", "%Y-%m-%d")), + "name": "2024-01-01", + }, +} diff --git a/.venv/lib/python3.12/site-packages/gotrue/errors.py b/.venv/lib/python3.12/site-packages/gotrue/errors.py new file mode 100644 index 00000000..fa693894 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/errors.py @@ -0,0 +1,227 @@ +from __future__ import annotations + +from typing import List, Literal, Optional + +from typing_extensions import TypedDict + +ErrorCode = Literal[ + "unexpected_failure", + "validation_failed", + "bad_json", + "email_exists", + "phone_exists", + "bad_jwt", + "not_admin", + "no_authorization", + "user_not_found", + "session_not_found", + "flow_state_not_found", + "flow_state_expired", + "signup_disabled", + "user_banned", + "provider_email_needs_verification", + "invite_not_found", + "bad_oauth_state", + "bad_oauth_callback", + "oauth_provider_not_supported", + "unexpected_audience", + "single_identity_not_deletable", + "email_conflict_identity_not_deletable", + "identity_already_exists", + "email_provider_disabled", + "phone_provider_disabled", + "too_many_enrolled_mfa_factors", + "mfa_factor_name_conflict", + "mfa_factor_not_found", + "mfa_ip_address_mismatch", + "mfa_challenge_expired", + "mfa_verification_failed", + "mfa_verification_rejected", + "insufficient_aal", + "captcha_failed", + "saml_provider_disabled", + "manual_linking_disabled", + "sms_send_failed", + "email_not_confirmed", + "phone_not_confirmed", + "reauth_nonce_missing", + "saml_relay_state_not_found", + "saml_relay_state_expired", + "saml_idp_not_found", + "saml_assertion_no_user_id", + "saml_assertion_no_email", + "user_already_exists", + "sso_provider_not_found", + "saml_metadata_fetch_failed", + "saml_idp_already_exists", + "sso_domain_already_exists", + "saml_entity_id_mismatch", + "conflict", + "provider_disabled", + "user_sso_managed", + "reauthentication_needed", + "same_password", + "reauthentication_not_valid", + "otp_expired", + "otp_disabled", + "identity_not_found", + "weak_password", + "over_request_rate_limit", + "over_email_send_rate_limit", + "over_sms_send_rate_limit", + "bad_code_verifier", + "anonymous_provider_disabled", + "hook_timeout", + "hook_timeout_after_retry", + "hook_payload_over_size_limit", + "hook_payload_invalid_content_type", + "request_timeout", + "mfa_phone_enroll_not_enabled", + "mfa_phone_verify_not_enabled", + "mfa_totp_enroll_not_enabled", + "mfa_totp_verify_not_enabled", + "mfa_webauthn_enroll_not_enabled", + "mfa_webauthn_verify_not_enabled", + "mfa_verified_factor_exists", + "invalid_credentials", + "email_address_not_authorized", + "email_address_invalid", +] + + +class AuthError(Exception): + def __init__(self, message: str, code: ErrorCode) -> None: + Exception.__init__(self, message) + self.message = message + self.name = "AuthError" + self.code = code + + +class AuthApiErrorDict(TypedDict): + name: str + message: str + status: int + code: ErrorCode + + +class AuthApiError(AuthError): + def __init__(self, message: str, status: int, code: ErrorCode) -> None: + AuthError.__init__(self, message, code) + self.name = "AuthApiError" + self.status = status + self.code = code + + def to_dict(self) -> AuthApiErrorDict: + return { + "name": self.name, + "message": self.message, + "status": self.status, + "code": self.code, + } + + +class AuthUnknownError(AuthError): + def __init__(self, message: str, original_error: Exception) -> None: + AuthError.__init__(self, message, None) + self.name = "AuthUnknownError" + self.original_error = original_error + + +class CustomAuthError(AuthError): + def __init__(self, message: str, name: str, status: int, code: ErrorCode) -> None: + AuthError.__init__(self, message, code) + self.name = name + self.status = status + + def to_dict(self) -> AuthApiErrorDict: + return { + "name": self.name, + "message": self.message, + "status": self.status, + } + + +class AuthSessionMissingError(CustomAuthError): + def __init__(self) -> None: + CustomAuthError.__init__( + self, + "Auth session missing!", + "AuthSessionMissingError", + 400, + None, + ) + + +class AuthInvalidCredentialsError(CustomAuthError): + def __init__(self, message: str) -> None: + CustomAuthError.__init__( + self, + message, + "AuthInvalidCredentialsError", + 400, + None, + ) + + +class AuthImplicitGrantRedirectErrorDetails(TypedDict): + error: str + code: str + + +class AuthImplicitGrantRedirectErrorDict(AuthApiErrorDict): + details: Optional[AuthImplicitGrantRedirectErrorDetails] + + +class AuthImplicitGrantRedirectError(CustomAuthError): + def __init__( + self, + message: str, + details: Optional[AuthImplicitGrantRedirectErrorDetails] = None, + ) -> None: + CustomAuthError.__init__( + self, + message, + "AuthImplicitGrantRedirectError", + 500, + None, + ) + self.details = details + + def to_dict(self) -> AuthImplicitGrantRedirectErrorDict: + return { + "name": self.name, + "message": self.message, + "status": self.status, + "details": self.details, + } + + +class AuthRetryableError(CustomAuthError): + def __init__(self, message: str, status: int) -> None: + CustomAuthError.__init__( + self, + message, + "AuthRetryableError", + status, + None, + ) + + +class AuthWeakPasswordError(CustomAuthError): + def __init__(self, message: str, status: int, reasons: List[str]) -> None: + CustomAuthError.__init__( + self, + message, + "AuthWeakPasswordError", + status, + "weak_password", + ) + self.reasons = reasons + + def to_dict(self) -> AuthApiErrorDict: + return { + "name": self.name, + "message": self.message, + "status": self.status, + "reasons": self.reasons, + } diff --git a/.venv/lib/python3.12/site-packages/gotrue/helpers.py b/.venv/lib/python3.12/site-packages/gotrue/helpers.py new file mode 100644 index 00000000..ae9dc7c5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/helpers.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +import base64 +import hashlib +import re +import secrets +import string +from base64 import urlsafe_b64decode +from datetime import datetime +from json import loads +from typing import Any, Dict, Optional, Type, TypeVar, cast +from urllib.parse import urlparse + +from httpx import HTTPStatusError, Response +from pydantic import BaseModel + +from .constants import API_VERSION_HEADER_NAME, API_VERSIONS +from .errors import ( + AuthApiError, + AuthError, + AuthRetryableError, + AuthUnknownError, + AuthWeakPasswordError, +) +from .types import ( + AuthOtpResponse, + AuthResponse, + GenerateLinkProperties, + GenerateLinkResponse, + LinkIdentityResponse, + Session, + SSOResponse, + User, + UserResponse, +) + +TBaseModel = TypeVar("TBaseModel", bound=BaseModel) +BASE64URL_REGEX = r"^([a-z0-9_-]{4})*($|[a-z0-9_-]{3}$|[a-z0-9_-]{2}$)$" + + +def model_validate(model: Type[TBaseModel], contents) -> TBaseModel: + """Compatibility layer between pydantic 1 and 2 for parsing an instance + of a BaseModel from varied""" + try: + # pydantic > 2 + return model.model_validate(contents) + except AttributeError: + # pydantic < 2 + return model.parse_obj(contents) + + +def model_dump(model: BaseModel) -> Dict[str, Any]: + """Compatibility layer between pydantic 1 and 2 for dumping a model's contents as a dict""" + try: + # pydantic > 2 + return model.model_dump() + except AttributeError: + # pydantic < 2 + return model.dict() + + +def model_dump_json(model: BaseModel) -> str: + """Compatibility layer between pydantic 1 and 2 for dumping a model's contents as json""" + try: + # pydantic > 2 + return model.model_dump_json() + except AttributeError: + # pydantic < 2 + return model.json() + + +def parse_auth_response(data: Any) -> AuthResponse: + session: Optional[Session] = None + if ( + "access_token" in data + and "refresh_token" in data + and "expires_in" in data + and data["access_token"] + and data["refresh_token"] + and data["expires_in"] + ): + session = model_validate(Session, data) + user_data = data.get("user", data) + user = model_validate(User, user_data) if user_data else None + return AuthResponse(session=session, user=user) + + +def parse_auth_otp_response(data: Any) -> AuthOtpResponse: + return model_validate(AuthOtpResponse, data) + + +def parse_link_identity_response(data: Any) -> LinkIdentityResponse: + return model_validate(LinkIdentityResponse, data) + + +def parse_link_response(data: Any) -> GenerateLinkResponse: + properties = GenerateLinkProperties( + action_link=data.get("action_link"), + email_otp=data.get("email_otp"), + hashed_token=data.get("hashed_token"), + redirect_to=data.get("redirect_to"), + verification_type=data.get("verification_type"), + ) + user = model_validate( + User, {k: v for k, v in data.items() if k not in model_dump(properties)} + ) + return GenerateLinkResponse(properties=properties, user=user) + + +def parse_user_response(data: Any) -> UserResponse: + if "user" not in data: + data = {"user": data} + return model_validate(UserResponse, data) + + +def parse_sso_response(data: Any) -> SSOResponse: + return model_validate(SSOResponse, data) + + +def get_error_message(error: Any) -> str: + props = ["msg", "message", "error_description", "error"] + filter = lambda prop: ( + prop in error if isinstance(error, dict) else hasattr(error, prop) + ) + return next((error[prop] for prop in props if filter(prop)), str(error)) + + +def get_error_code(error: Any) -> str: + return error.get("error_code", None) if isinstance(error, dict) else None + + +def looks_like_http_status_error(exception: Exception) -> bool: + return isinstance(exception, HTTPStatusError) + + +def handle_exception(exception: Exception) -> AuthError: + if not looks_like_http_status_error(exception): + return AuthRetryableError(get_error_message(exception), 0) + error = cast(HTTPStatusError, exception) + try: + network_error_codes = [502, 503, 504] + if error.response.status_code in network_error_codes: + return AuthRetryableError( + get_error_message(error), error.response.status_code + ) + data = error.response.json() + + error_code = None + response_api_version = parse_response_api_version(error.response) + + if ( + response_api_version + and datetime.timestamp(response_api_version) + >= API_VERSIONS.get("2024-01-01").get("timestamp") + and isinstance(data, dict) + and data + and isinstance(data.get("code"), str) + ): + error_code = data.get("code") + elif ( + isinstance(data, dict) and data and isinstance(data.get("error_code"), str) + ): + error_code = data.get("error_code") + + if error_code is None: + if ( + isinstance(data, dict) + and data + and isinstance(data.get("weak_password"), dict) + and data.get("weak_password") + and isinstance(data.get("weak_password"), list) + and len(data.get("weak_password")) + ): + return AuthWeakPasswordError( + get_error_message(data), + error.response.status_code, + data.get("weak_password").get("reasons"), + ) + elif error_code == "weak_password": + return AuthWeakPasswordError( + get_error_message(data), + error.response.status_code, + data.get("weak_password", {}).get("reasons", {}), + ) + + return AuthApiError( + get_error_message(data), + error.response.status_code or 500, + error_code, + ) + except Exception as e: + return AuthUnknownError(get_error_message(error), e) + + +def decode_jwt_payload(token: str) -> Any: + parts = token.split(".") + if len(parts) != 3: + raise ValueError("JWT is not valid: not a JWT structure") + base64url = parts[1] + # Addding padding otherwise the following error happens: + # binascii.Error: Incorrect padding + base64url_with_padding = base64url + "=" * (-len(base64url) % 4) + return loads(urlsafe_b64decode(base64url_with_padding).decode("utf-8")) + + +def generate_pkce_verifier(length=64): + """Generate a random PKCE verifier of the specified length.""" + if length < 43 or length > 128: + raise ValueError("PKCE verifier length must be between 43 and 128 characters") + + # Define characters that can be used in the PKCE verifier + charset = string.ascii_letters + string.digits + "-._~" + + return "".join(secrets.choice(charset) for _ in range(length)) + + +def generate_pkce_challenge(code_verifier): + """Generate a code challenge from a PKCE verifier.""" + # Hash the verifier using SHA-256 + verifier_bytes = code_verifier.encode("utf-8") + sha256_hash = hashlib.sha256(verifier_bytes).digest() + + return base64.urlsafe_b64encode(sha256_hash).rstrip(b"=").decode("utf-8") + + +API_VERSION_REGEX = r"^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])$" + + +def parse_response_api_version(response: Response): + api_version = response.headers.get(API_VERSION_HEADER_NAME) + + if not api_version: + return None + + if re.search(API_VERSION_REGEX, api_version) is None: + return None + + try: + dt = datetime.strptime(api_version, "%Y-%m-%d") + return dt + except Exception as e: + return None + + +def is_http_url(url: str) -> bool: + return urlparse(url).scheme in {"https", "http"} + + +def is_valid_jwt(value: str) -> bool: + """Checks if value looks like a JWT, does not do any extra parsing.""" + if not isinstance(value, str): + return False + + # Remove trailing whitespaces if any. + value = value.strip() + + # Remove "Bearer " prefix if any. + if value.startswith("Bearer "): + value = value[7:] + + # Valid JWT must have 2 dots (Header.Paylod.Signature) + if value.count(".") != 2: + return False + + for part in value.split("."): + if not re.search(BASE64URL_REGEX, part, re.IGNORECASE): + return False + + return True diff --git a/.venv/lib/python3.12/site-packages/gotrue/http_clients.py b/.venv/lib/python3.12/site-packages/gotrue/http_clients.py new file mode 100644 index 00000000..6dbd91d9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/http_clients.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from httpx import AsyncClient # noqa: F401 +from httpx import Client as BaseClient + + +class SyncClient(BaseClient): + def aclose(self) -> None: + self.close() diff --git a/.venv/lib/python3.12/site-packages/gotrue/timer.py b/.venv/lib/python3.12/site-packages/gotrue/timer.py new file mode 100644 index 00000000..11f11536 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/timer.py @@ -0,0 +1,45 @@ +import asyncio +from threading import Timer as _Timer +from typing import Any, Callable, Coroutine, Optional, cast + + +class Timer: + def __init__( + self, + seconds: float, + function: Callable[[], Optional[Coroutine[Any, Any, None]]], + ) -> None: + self._milliseconds = seconds + self._function = function + self._task: Optional[asyncio.Task] = None + self._timer: Optional[_Timer] = None + + def start(self) -> None: + if asyncio.iscoroutinefunction(self._function): + + async def schedule(): + await asyncio.sleep(self._milliseconds / 1000) + await cast(Coroutine[Any, Any, None], self._function()) + + def cleanup(_): + self._task = None + + self._task = asyncio.create_task(schedule()) + self._task.add_done_callback(cleanup) + else: + self._timer = _Timer(self._milliseconds / 1000, self._function) + self._timer.daemon = True + self._timer.start() + + def cancel(self) -> None: + if self._task is not None: + self._task.cancel() + self._task = None + if self._timer is not None: + self._timer.cancel() + self._timer = None + + def is_alive(self) -> bool: + return self._task is not None or ( + self._timer is not None and self._timer.is_alive() + ) diff --git a/.venv/lib/python3.12/site-packages/gotrue/types.py b/.venv/lib/python3.12/site-packages/gotrue/types.py new file mode 100644 index 00000000..86bda3e2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/types.py @@ -0,0 +1,818 @@ +from __future__ import annotations + +from datetime import datetime +from time import time +from typing import Any, Callable, Dict, List, Optional, Union + +from pydantic import BaseModel, ConfigDict + +try: + # > 2 + from pydantic import model_validator + + model_validator_v1_v2_compat = model_validator(mode="before") +except ImportError: + # < 2 + from pydantic import root_validator + + model_validator_v1_v2_compat = root_validator + +from typing_extensions import Literal, NotRequired, TypedDict + +Provider = Literal[ + "apple", + "azure", + "bitbucket", + "discord", + "facebook", + "figma", + "fly", + "github", + "gitlab", + "google", + "kakao", + "keycloak", + "linkedin", + "linkedin_oidc", + "notion", + "slack", + "slack_oidc", + "spotify", + "twitch", + "twitter", + "workos", + "zoom", +] + +EmailOtpType = Literal[ + "signup", "invite", "magiclink", "recovery", "email_change", "email" +] + +AuthChangeEventMFA = Literal["MFA_CHALLENGE_VERIFIED"] + +AuthFlowType = Literal["pkce", "implicit"] + +AuthChangeEvent = Literal[ + "PASSWORD_RECOVERY", + "SIGNED_IN", + "SIGNED_OUT", + "TOKEN_REFRESHED", + "USER_UPDATED", + "USER_DELETED", + AuthChangeEventMFA, +] + + +class AMREntry(BaseModel): + """ + An authentication methord reference (AMR) entry. + + An entry designates what method was used by the user to verify their + identity and at what time. + """ + + method: Union[Literal["password", "otp", "oauth", "mfa/totp"], str] + """ + Authentication method name. + """ + timestamp: int + """ + Timestamp when the method was successfully used. Represents number of + seconds since 1st January 1970 (UNIX epoch) in UTC. + """ + + +class Options(TypedDict): + redirect_to: NotRequired[str] + captcha_token: NotRequired[str] + + +class InviteUserByEmailOptions(TypedDict): + redirect_to: NotRequired[str] + data: NotRequired[Any] + + +class AuthResponse(BaseModel): + user: Optional[User] = None + session: Optional[Session] = None + + +class AuthOtpResponse(BaseModel): + user: None = None + session: None = None + message_id: Optional[str] = None + + +class OAuthResponse(BaseModel): + provider: Provider + url: str + + +class SSOResponse(BaseModel): + url: str + + +class LinkIdentityResponse(BaseModel): + url: str + + +class IdentitiesResponse(BaseModel): + identities: List[UserIdentity] + + +class UserResponse(BaseModel): + user: User + + +class Session(BaseModel): + provider_token: Optional[str] = None + """ + The oauth provider token. If present, this can be used to make external API + requests to the oauth provider used. + """ + provider_refresh_token: Optional[str] = None + """ + The oauth provider refresh token. If present, this can be used to refresh + the provider_token via the oauth provider's API. + + Not all oauth providers return a provider refresh token. If the + provider_refresh_token is missing, please refer to the oauth provider's + documentation for information on how to obtain the provider refresh token. + """ + access_token: str + refresh_token: str + expires_in: int + """ + The number of seconds until the token expires (since it was issued). + Returned when a login is confirmed. + """ + expires_at: Optional[int] = None + """ + A timestamp of when the token will expire. Returned when a login is confirmed. + """ + token_type: str + user: User + + @model_validator_v1_v2_compat + def validator(cls, values: dict) -> dict: + expires_in = values.get("expires_in") + if expires_in and not values.get("expires_at"): + values["expires_at"] = round(time()) + expires_in + return values + + +class UserIdentity(BaseModel): + id: str + identity_id: str + user_id: str + identity_data: Dict[str, Any] + provider: str + created_at: datetime + last_sign_in_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class Factor(BaseModel): + """ + A MFA factor. + """ + + id: str + """ + ID of the factor. + """ + friendly_name: Optional[str] = None + """ + Friendly name of the factor, useful to disambiguate between multiple factors. + """ + factor_type: Union[Literal["totp", "phone"], str] + """ + Type of factor. Only `totp` supported with this version but may change in + future versions. + """ + status: Literal["verified", "unverified"] + """ + Factor's status. + """ + created_at: datetime + updated_at: datetime + + +class User(BaseModel): + id: str + app_metadata: Dict[str, Any] + user_metadata: Dict[str, Any] + aud: str + confirmation_sent_at: Optional[datetime] = None + recovery_sent_at: Optional[datetime] = None + email_change_sent_at: Optional[datetime] = None + new_email: Optional[str] = None + new_phone: Optional[str] = None + invited_at: Optional[datetime] = None + action_link: Optional[str] = None + email: Optional[str] = None + phone: Optional[str] = None + created_at: datetime + confirmed_at: Optional[datetime] = None + email_confirmed_at: Optional[datetime] = None + phone_confirmed_at: Optional[datetime] = None + last_sign_in_at: Optional[datetime] = None + role: Optional[str] = None + updated_at: Optional[datetime] = None + identities: Optional[List[UserIdentity]] = None + is_anonymous: bool = False + factors: Optional[List[Factor]] = None + + +class UserAttributes(TypedDict): + email: NotRequired[str] + phone: NotRequired[str] + password: NotRequired[str] + data: NotRequired[Any] + + +class AdminUserAttributes(UserAttributes, TypedDict): + user_metadata: NotRequired[Any] + app_metadata: NotRequired[Any] + email_confirm: NotRequired[bool] + phone_confirm: NotRequired[bool] + ban_duration: NotRequired[Union[str, Literal["none"]]] + role: NotRequired[str] + """ + The `role` claim set in the user's access token JWT. + + When a user signs up, this role is set to `authenticated` by default. You should only modify the `role` if you need to provision several levels of admin access that have different permissions on individual columns in your database. + + Setting this role to `service_role` is not recommended as it grants the user admin privileges. + """ + password_hash: NotRequired[str] + """ + The `password_hash` for the user's password. + + Allows you to specify a password hash for the user. This is useful for migrating a user's password hash from another service. + + Supports bcrypt and argon2 password hashes. + """ + id: NotRequired[str] + """ + The `id` for the user. + + Allows you to overwrite the default `id` set for the user. + """ + + +class Subscription(BaseModel): + id: str + """ + The subscriber UUID. This will be set by the client. + """ + callback: Callable[[AuthChangeEvent, Optional[Session]], None] + """ + The function to call every time there is an event. + """ + unsubscribe: Callable[[], None] + """ + Call this to remove the listener. + """ + + +class UpdatableFactorAttributes(TypedDict): + friendly_name: str + + +class SignUpWithEmailAndPasswordCredentialsOptions( + TypedDict, +): + email_redirect_to: NotRequired[str] + data: NotRequired[Any] + captcha_token: NotRequired[str] + + +class SignUpWithEmailAndPasswordCredentials(TypedDict): + email: str + password: str + options: NotRequired[SignUpWithEmailAndPasswordCredentialsOptions] + + +class SignUpWithPhoneAndPasswordCredentialsOptions(TypedDict): + data: NotRequired[Any] + captcha_token: NotRequired[str] + channel: NotRequired[Literal["sms", "whatsapp"]] + + +class SignUpWithPhoneAndPasswordCredentials(TypedDict): + phone: str + password: str + options: NotRequired[SignUpWithPhoneAndPasswordCredentialsOptions] + + +SignUpWithPasswordCredentials = Union[ + SignUpWithEmailAndPasswordCredentials, + SignUpWithPhoneAndPasswordCredentials, +] + + +class SignInWithPasswordCredentialsOptions(TypedDict): + data: NotRequired[Any] + captcha_token: NotRequired[str] + + +class SignInWithEmailAndPasswordCredentials(TypedDict): + email: str + password: str + options: NotRequired[SignInWithPasswordCredentialsOptions] + + +class SignInWithPhoneAndPasswordCredentials(TypedDict): + phone: str + password: str + options: NotRequired[SignInWithPasswordCredentialsOptions] + + +SignInWithPasswordCredentials = Union[ + SignInWithEmailAndPasswordCredentials, + SignInWithPhoneAndPasswordCredentials, +] + + +class SignInWithIdTokenCredentials(TypedDict): + """ + Provider name or OIDC `iss` value identifying which provider should be used to verify the provided token. Supported names: `google`, `apple`, `azure`, `facebook`, `kakao`, `keycloak` (deprecated). + """ + + provider: Literal["google", "apple", "azure", "facebook", "kakao"] + token: str + access_token: NotRequired[str] + nonce: NotRequired[str] + options: NotRequired[SignInWithIdTokenCredentialsOptions] + + +class SignInWithIdTokenCredentialsOptions(TypedDict): + captcha_token: NotRequired[str] + + +class SignInWithEmailAndPasswordlessCredentialsOptions(TypedDict): + email_redirect_to: NotRequired[str] + should_create_user: NotRequired[bool] + data: NotRequired[Any] + captcha_token: NotRequired[str] + + +class SignInWithEmailAndPasswordlessCredentials(TypedDict): + email: str + options: NotRequired[SignInWithEmailAndPasswordlessCredentialsOptions] + + +class SignInWithPhoneAndPasswordlessCredentialsOptions(TypedDict): + should_create_user: NotRequired[bool] + data: NotRequired[Any] + captcha_token: NotRequired[str] + channel: NotRequired[Literal["sms", "whatsapp"]] + + +class SignInWithPhoneAndPasswordlessCredentials(TypedDict): + phone: str + options: NotRequired[SignInWithPhoneAndPasswordlessCredentialsOptions] + + +SignInWithPasswordlessCredentials = Union[ + SignInWithEmailAndPasswordlessCredentials, + SignInWithPhoneAndPasswordlessCredentials, +] + + +class ResendEmailCredentialsOptions(TypedDict): + email_redirect_to: NotRequired[str] + captcha_token: NotRequired[str] + + +class ResendEmailCredentials(TypedDict): + type: Literal["signup", "email_change"] + email: str + options: NotRequired[ResendEmailCredentialsOptions] + + +class ResendPhoneCredentialsOptions(TypedDict): + captcha_token: NotRequired[str] + + +class ResendPhoneCredentials(TypedDict): + type: Literal["sms", "phone_change"] + phone: str + options: NotRequired[ResendPhoneCredentialsOptions] + + +ResendCredentials = Union[ResendEmailCredentials, ResendPhoneCredentials] + + +class SignInWithOAuthCredentialsOptions(TypedDict): + redirect_to: NotRequired[str] + scopes: NotRequired[str] + query_params: NotRequired[Dict[str, str]] + + +class SignInWithOAuthCredentials(TypedDict): + provider: Provider + options: NotRequired[SignInWithOAuthCredentialsOptions] + + +class SignInWithSSOCredentials(TypedDict): + provider_id: NotRequired[str] + domain: NotRequired[str] + options: NotRequired[SignInWithSSOOptions] + + +class SignInWithSSOOptions(TypedDict): + redirect_to: NotRequired[str] + skip_http_redirect: NotRequired[bool] + + +class SignInAnonymouslyCredentials(TypedDict): + options: NotRequired[SignInAnonymouslyCredentialsOptions] + + +class SignInAnonymouslyCredentialsOptions(TypedDict): + data: NotRequired[Any] + captcha_token: NotRequired[str] + + +class VerifyOtpParamsOptions(TypedDict): + redirect_to: NotRequired[str] + captcha_token: NotRequired[str] + + +class VerifyEmailOtpParams(TypedDict): + email: str + token: str + type: EmailOtpType + options: NotRequired[VerifyOtpParamsOptions] + + +class VerifyMobileOtpParams(TypedDict): + phone: str + token: str + type: Literal[ + "sms", + "phone_change", + ] + options: NotRequired[VerifyOtpParamsOptions] + + +class VerifyTokenHashParams(TypedDict): + token_hash: str + type: EmailOtpType + options: NotRequired[VerifyOtpParamsOptions] + + +VerifyOtpParams = Union[ + VerifyEmailOtpParams, VerifyMobileOtpParams, VerifyTokenHashParams +] + + +class GenerateLinkParamsOptions(TypedDict): + redirect_to: NotRequired[str] + + +class GenerateLinkParamsWithDataOptions(GenerateLinkParamsOptions, TypedDict): + data: NotRequired[Any] + + +class GenerateSignupLinkParams(TypedDict): + type: Literal["signup"] + email: str + password: str + options: NotRequired[GenerateLinkParamsWithDataOptions] + + +class GenerateInviteOrMagiclinkParams(TypedDict): + type: Literal["invite", "magiclink"] + email: str + options: NotRequired[GenerateLinkParamsWithDataOptions] + + +class GenerateRecoveryLinkParams(TypedDict): + type: Literal["recovery"] + email: str + options: NotRequired[GenerateLinkParamsOptions] + + +class GenerateEmailChangeLinkParams(TypedDict): + type: Literal["email_change_current", "email_change_new"] + email: str + new_email: str + options: NotRequired[GenerateLinkParamsOptions] + + +GenerateLinkParams = Union[ + GenerateSignupLinkParams, + GenerateInviteOrMagiclinkParams, + GenerateRecoveryLinkParams, + GenerateEmailChangeLinkParams, +] + +GenerateLinkType = Literal[ + "signup", + "invite", + "magiclink", + "recovery", + "email_change_current", + "email_change_new", +] + + +class MFAEnrollParams(TypedDict): + factor_type: Literal["totp", "phone"] + issuer: NotRequired[str] + friendly_name: NotRequired[str] + phone: str + + +class MFAUnenrollParams(TypedDict): + factor_id: str + """ + ID of the factor being unenrolled. + """ + + +class CodeExchangeParams(TypedDict): + code_verifier: str + """ + Randomly generated string + """ + auth_code: str + """ + Code returned after completing one of the authorization flows + """ + redirect_to: str + """ + The URL to route to after a session is successfully obtained + """ + + +class MFAVerifyParams(TypedDict): + factor_id: str + """ + ID of the factor being verified. + """ + challenge_id: str + """ + ID of the challenge being verified. + """ + code: str + """ + Verification code provided by the user. + """ + + +class MFAChallengeParams(TypedDict): + factor_id: str + """ + ID of the factor to be challenged. + """ + channel: NotRequired[Literal["sms", "whatsapp"]] + + +class MFAChallengeAndVerifyParams(TypedDict): + factor_id: str + """ + ID of the factor being verified. + """ + code: str + """ + Verification code provided by the user. + """ + + +class AuthMFAVerifyResponse(BaseModel): + access_token: str + """ + New access token (JWT) after successful verification. + """ + token_type: str + """ + Type of token, typically `Bearer`. + """ + expires_in: int + """ + Number of seconds in which the access token will expire. + """ + refresh_token: str + """ + Refresh token you can use to obtain new access tokens when expired. + """ + user: User + """ + Updated user profile. + """ + + +class AuthMFAEnrollResponseTotp(BaseModel): + qr_code: str + """ + Contains a QR code encoding the authenticator URI. You can + convert it to a URL by prepending `data:image/svg+xml;utf-8,` to + the value. Avoid logging this value to the console. + """ + secret: str + """ + The TOTP secret (also encoded in the QR code). Show this secret + in a password-style field to the user, in case they are unable to + scan the QR code. Avoid logging this value to the console. + """ + uri: str + """ + The authenticator URI encoded within the QR code, should you need + to use it. Avoid loggin this value to the console. + """ + + +class AuthMFAEnrollResponse(BaseModel): + id: str + """ + ID of the factor that was just enrolled (in an unverified state). + """ + type: Literal["totp", "phone"] + """ + Type of MFA factor. Only `totp` supported for now. + """ + totp: AuthMFAEnrollResponseTotp + """ + TOTP enrollment information. + """ + model_config = ConfigDict(arbitrary_types_allowed=True) + friendly_name: str + """ + Friendly name of the factor, useful for distinguishing between factors + """ + phone: str + """ + Phone number of the MFA factor in E.164 format. Used to send messages + """ + + +class AuthMFAUnenrollResponse(BaseModel): + id: str + """ + ID of the factor that was successfully unenrolled. + """ + + +class AuthMFAChallengeResponse(BaseModel): + id: str + """ + ID of the newly created challenge. + """ + expires_at: int + """ + Timestamp in UNIX seconds when this challenge will no longer be usable. + """ + factor_type: Literal["totp", "phone"] + """ + Factor Type which generated the challenge + """ + + +class AuthMFAListFactorsResponse(BaseModel): + all: List[Factor] + """ + All available factors (verified and unverified). + """ + totp: List[Factor] + """ + Only verified TOTP factors. (A subset of `all`.) + """ + phone: List[Factor] + """ + Only verified Phone factors. (A subset of `all`.) + """ + + +AuthenticatorAssuranceLevels = Literal["aal1", "aal2"] + + +class AuthMFAGetAuthenticatorAssuranceLevelResponse(BaseModel): + current_level: Optional[AuthenticatorAssuranceLevels] = None + """ + Current AAL level of the session. + """ + next_level: Optional[AuthenticatorAssuranceLevels] = None + """ + Next possible AAL level for the session. If the next level is higher + than the current one, the user should go through MFA. + """ + current_authentication_methods: List[AMREntry] + """ + A list of all authentication methods attached to this session. Use + the information here to detect the last time a user verified a + factor, for example if implementing a step-up scenario. + """ + + +class AuthMFAAdminDeleteFactorResponse(BaseModel): + id: str + """ + ID of the factor that was successfully deleted. + """ + + +class AuthMFAAdminDeleteFactorParams(TypedDict): + id: str + """ + ID of the MFA factor to delete. + """ + user_id: str + """ + ID of the user whose factor is being deleted. + """ + + +class AuthMFAAdminListFactorsResponse(BaseModel): + factors: List[Factor] + """ + All factors attached to the user. + """ + + +class AuthMFAAdminListFactorsParams(TypedDict): + user_id: str + """ + ID of the user for which to list all MFA factors. + """ + + +class GenerateLinkProperties(BaseModel): + """ + The properties related to the email link generated. + """ + + action_link: str + """ + The email link to send to the user. The action_link follows the following format: + + auth/v1/verify?type={verification_type}&token={hashed_token}&redirect_to={redirect_to} + """ + email_otp: str + """ + The raw email OTP. + You should send this in the email if you want your users to verify using an + OTP instead of the action link. + """ + hashed_token: str + """ + The hashed token appended to the action link. + """ + redirect_to: str + """ + The URL appended to the action link. + """ + verification_type: GenerateLinkType + """ + The verification type that the email link is associated to. + """ + + +class GenerateLinkResponse(BaseModel): + properties: GenerateLinkProperties + user: User + + +class DecodedJWTDict(TypedDict): + exp: NotRequired[int] + aal: NotRequired[Optional[AuthenticatorAssuranceLevels]] + amr: NotRequired[Optional[List[AMREntry]]] + + +SignOutScope = Literal["global", "local", "others"] + + +class SignOutOptions(TypedDict): + scope: NotRequired[SignOutScope] + + +for model in [ + AMREntry, + AuthResponse, + OAuthResponse, + UserResponse, + Session, + UserIdentity, + Factor, + User, + Subscription, + AuthMFAVerifyResponse, + AuthMFAEnrollResponseTotp, + AuthMFAEnrollResponse, + AuthMFAUnenrollResponse, + AuthMFAChallengeResponse, + AuthMFAListFactorsResponse, + AuthMFAGetAuthenticatorAssuranceLevelResponse, + AuthMFAAdminDeleteFactorResponse, + AuthMFAAdminListFactorsResponse, + GenerateLinkProperties, +]: + try: + # pydantic > 2 + model.model_rebuild() + except AttributeError: + # pydantic < 2 + model.update_forward_refs() diff --git a/.venv/lib/python3.12/site-packages/gotrue/version.py b/.venv/lib/python3.12/site-packages/gotrue/version.py new file mode 100644 index 00000000..91b91220 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/gotrue/version.py @@ -0,0 +1 @@ +__version__ = "2.11.4" # {x-release-please-version} |