aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/gotrue
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/gotrue
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/gotrue')
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/__init__.py12
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py1
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py186
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py32
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py125
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py1152
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py94
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_async/storage.py31
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py1
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py186
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py32
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py125
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py1146
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py94
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py31
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/constants.py23
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/errors.py227
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/helpers.py269
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/http_clients.py9
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/timer.py45
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/types.py818
-rw-r--r--.venv/lib/python3.12/site-packages/gotrue/version.py1
22 files changed, 4640 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/gotrue/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/__init__.py
new file mode 100644
index 00000000..2a13526f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/__init__.py
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+from ._async.gotrue_admin_api import AsyncGoTrueAdminAPI # type: ignore # noqa: F401
+from ._async.gotrue_client import AsyncGoTrueClient # type: ignore # noqa: F401
+from ._async.storage import AsyncMemoryStorage # type: ignore # noqa: F401
+from ._async.storage import AsyncSupportedStorage # type: ignore # noqa: F401
+from ._sync.gotrue_admin_api import SyncGoTrueAdminAPI # type: ignore # noqa: F401
+from ._sync.gotrue_client import SyncGoTrueClient # type: ignore # noqa: F401
+from ._sync.storage import SyncMemoryStorage # type: ignore # noqa: F401
+from ._sync.storage import SyncSupportedStorage # type: ignore # noqa: F401
+from .types import * # type: ignore # noqa: F401, F403
+from .version import __version__
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py
new file mode 100644
index 00000000..9d48db4f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/__init__.py
@@ -0,0 +1 @@
+from __future__ import annotations
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py
new file mode 100644
index 00000000..54d48739
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_api.py
@@ -0,0 +1,186 @@
+from __future__ import annotations
+
+from functools import partial
+from typing import Dict, List, Optional
+
+from ..helpers import model_validate, parse_link_response, parse_user_response
+from ..http_clients import AsyncClient
+from ..types import (
+ AdminUserAttributes,
+ AuthMFAAdminDeleteFactorParams,
+ AuthMFAAdminDeleteFactorResponse,
+ AuthMFAAdminListFactorsParams,
+ AuthMFAAdminListFactorsResponse,
+ GenerateLinkParams,
+ GenerateLinkResponse,
+ InviteUserByEmailOptions,
+ SignOutScope,
+ User,
+ UserResponse,
+)
+from .gotrue_admin_mfa_api import AsyncGoTrueAdminMFAAPI
+from .gotrue_base_api import AsyncGoTrueBaseAPI
+
+
+class AsyncGoTrueAdminAPI(AsyncGoTrueBaseAPI):
+ def __init__(
+ self,
+ *,
+ url: str = "",
+ headers: Dict[str, str] = {},
+ http_client: Optional[AsyncClient] = None,
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ) -> None:
+ AsyncGoTrueBaseAPI.__init__(
+ self,
+ url=url,
+ headers=headers,
+ http_client=http_client,
+ verify=verify,
+ proxy=proxy,
+ )
+ self.mfa = AsyncGoTrueAdminMFAAPI()
+ self.mfa.list_factors = self._list_factors
+ self.mfa.delete_factor = self._delete_factor
+
+ async def sign_out(self, jwt: str, scope: SignOutScope = "global") -> None:
+ """
+ Removes a logged-in session.
+ """
+ return await self._request(
+ "POST",
+ "logout",
+ query={"scope": scope},
+ jwt=jwt,
+ no_resolve_json=True,
+ )
+
+ async def invite_user_by_email(
+ self,
+ email: str,
+ options: InviteUserByEmailOptions = {},
+ ) -> UserResponse:
+ """
+ Sends an invite link to an email address.
+ """
+ return await self._request(
+ "POST",
+ "invite",
+ body={"email": email, "data": options.get("data")},
+ redirect_to=options.get("redirect_to"),
+ xform=parse_user_response,
+ )
+
+ async def generate_link(self, params: GenerateLinkParams) -> GenerateLinkResponse:
+ """
+ Generates email links and OTPs to be sent via a custom email provider.
+ """
+ return await self._request(
+ "POST",
+ "admin/generate_link",
+ body={
+ "type": params.get("type"),
+ "email": params.get("email"),
+ "password": params.get("password"),
+ "new_email": params.get("new_email"),
+ "data": params.get("options", {}).get("data"),
+ },
+ redirect_to=params.get("options", {}).get("redirect_to"),
+ xform=parse_link_response,
+ )
+
+ # User Admin API
+
+ async def create_user(self, attributes: AdminUserAttributes) -> UserResponse:
+ """
+ Creates a new user.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return await self._request(
+ "POST",
+ "admin/users",
+ body=attributes,
+ xform=parse_user_response,
+ )
+
+ async def list_users(self, page: int = None, per_page: int = None) -> List[User]:
+ """
+ Get a list of users.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return await self._request(
+ "GET",
+ "admin/users",
+ query={"page": page, "per_page": per_page},
+ xform=lambda data: (
+ [model_validate(User, user) for user in data["users"]]
+ if "users" in data
+ else []
+ ),
+ )
+
+ async def get_user_by_id(self, uid: str) -> UserResponse:
+ """
+ Get user by id.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return await self._request(
+ "GET",
+ f"admin/users/{uid}",
+ xform=parse_user_response,
+ )
+
+ async def update_user_by_id(
+ self,
+ uid: str,
+ attributes: AdminUserAttributes,
+ ) -> UserResponse:
+ """
+ Updates the user data.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return await self._request(
+ "PUT",
+ f"admin/users/{uid}",
+ body=attributes,
+ xform=parse_user_response,
+ )
+
+ async def delete_user(self, id: str, should_soft_delete: bool = False) -> None:
+ """
+ Delete a user. Requires a `service_role` key.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ body = {"should_soft_delete": should_soft_delete}
+ return await self._request("DELETE", f"admin/users/{id}", body=body)
+
+ async def _list_factors(
+ self,
+ params: AuthMFAAdminListFactorsParams,
+ ) -> AuthMFAAdminListFactorsResponse:
+ return await self._request(
+ "GET",
+ f"admin/users/{params.get('user_id')}/factors",
+ xform=partial(model_validate, AuthMFAAdminListFactorsResponse),
+ )
+
+ async def _delete_factor(
+ self,
+ params: AuthMFAAdminDeleteFactorParams,
+ ) -> AuthMFAAdminDeleteFactorResponse:
+ return await self._request(
+ "DELETE",
+ f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}",
+ xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse),
+ )
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py
new file mode 100644
index 00000000..ca812fcd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_admin_mfa_api.py
@@ -0,0 +1,32 @@
+from ..types import (
+ AuthMFAAdminDeleteFactorParams,
+ AuthMFAAdminDeleteFactorResponse,
+ AuthMFAAdminListFactorsParams,
+ AuthMFAAdminListFactorsResponse,
+)
+
+
+class AsyncGoTrueAdminMFAAPI:
+ """
+ Contains the full multi-factor authentication administration API.
+ """
+
+ async def list_factors(
+ self,
+ params: AuthMFAAdminListFactorsParams,
+ ) -> AuthMFAAdminListFactorsResponse:
+ """
+ Lists all factors attached to a user.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def delete_factor(
+ self,
+ params: AuthMFAAdminDeleteFactorParams,
+ ) -> AuthMFAAdminDeleteFactorResponse:
+ """
+ Deletes a factor on a user. This will log the user out of all active
+ sessions (if the deleted factor was verified). There's no need to delete
+ unverified factors.
+ """
+ raise NotImplementedError() # pragma: no cover
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py
new file mode 100644
index 00000000..21d0b444
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_base_api.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+from typing import Any, Callable, Dict, Optional, TypeVar, overload
+
+from httpx import Response
+from pydantic import BaseModel
+from typing_extensions import Literal, Self
+
+from ..constants import API_VERSION_HEADER_NAME, API_VERSIONS
+from ..helpers import handle_exception, model_dump
+from ..http_clients import AsyncClient
+
+T = TypeVar("T")
+
+
+class AsyncGoTrueBaseAPI:
+ def __init__(
+ self,
+ *,
+ url: str,
+ headers: Dict[str, str],
+ http_client: Optional[AsyncClient],
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ):
+ self._url = url
+ self._headers = headers
+ self._http_client = http_client or AsyncClient(
+ verify=bool(verify),
+ proxy=proxy,
+ follow_redirects=True,
+ http2=True,
+ )
+
+ async def __aenter__(self) -> Self:
+ return self
+
+ async def __aexit__(self, exc_t, exc_v, exc_tb) -> None:
+ await self.close()
+
+ async def close(self) -> None:
+ await self._http_client.aclose()
+
+ @overload
+ async def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: Literal[False] = False,
+ xform: Callable[[Any], T],
+ ) -> T: ... # pragma: no cover
+
+ @overload
+ async def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: Literal[True],
+ xform: Callable[[Response], T],
+ ) -> T: ... # pragma: no cover
+
+ @overload
+ async def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: bool = False,
+ ) -> None: ... # pragma: no cover
+
+ async def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: bool = False,
+ xform: Optional[Callable[[Any], T]] = None,
+ ) -> Optional[T]:
+ url = f"{self._url}/{path}"
+ headers = {**self._headers, **(headers or {})}
+ if API_VERSION_HEADER_NAME not in headers:
+ headers[API_VERSION_HEADER_NAME] = API_VERSIONS["2024-01-01"].get("name")
+ if "Content-Type" not in headers:
+ headers["Content-Type"] = "application/json;charset=UTF-8"
+ if jwt:
+ headers["Authorization"] = f"Bearer {jwt}"
+ query = query or {}
+ if redirect_to:
+ query["redirect_to"] = redirect_to
+ try:
+ response = await self._http_client.request(
+ method,
+ url,
+ headers=headers,
+ params=query,
+ json=model_dump(body) if isinstance(body, BaseModel) else body,
+ )
+ response.raise_for_status()
+ result = response if no_resolve_json else response.json()
+ if xform:
+ return xform(result)
+ except Exception as e:
+ raise handle_exception(e)
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py
new file mode 100644
index 00000000..2a2a5564
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_client.py
@@ -0,0 +1,1152 @@
+from __future__ import annotations
+
+from contextlib import suppress
+from functools import partial
+from json import loads
+from time import time
+from typing import Callable, Dict, List, Optional, Tuple
+from urllib.parse import parse_qs, urlencode, urlparse
+from uuid import uuid4
+
+from ..constants import (
+ DEFAULT_HEADERS,
+ EXPIRY_MARGIN,
+ GOTRUE_URL,
+ MAX_RETRIES,
+ RETRY_INTERVAL,
+ STORAGE_KEY,
+)
+from ..errors import (
+ AuthApiError,
+ AuthImplicitGrantRedirectError,
+ AuthInvalidCredentialsError,
+ AuthRetryableError,
+ AuthSessionMissingError,
+)
+from ..helpers import (
+ decode_jwt_payload,
+ generate_pkce_challenge,
+ generate_pkce_verifier,
+ model_dump,
+ model_dump_json,
+ model_validate,
+ parse_auth_otp_response,
+ parse_auth_response,
+ parse_link_identity_response,
+ parse_sso_response,
+ parse_user_response,
+)
+from ..http_clients import AsyncClient
+from ..timer import Timer
+from ..types import (
+ AuthChangeEvent,
+ AuthenticatorAssuranceLevels,
+ AuthFlowType,
+ AuthMFAChallengeResponse,
+ AuthMFAEnrollResponse,
+ AuthMFAGetAuthenticatorAssuranceLevelResponse,
+ AuthMFAListFactorsResponse,
+ AuthMFAUnenrollResponse,
+ AuthMFAVerifyResponse,
+ AuthOtpResponse,
+ AuthResponse,
+ CodeExchangeParams,
+ DecodedJWTDict,
+ IdentitiesResponse,
+ MFAChallengeAndVerifyParams,
+ MFAChallengeParams,
+ MFAEnrollParams,
+ MFAUnenrollParams,
+ MFAVerifyParams,
+ OAuthResponse,
+ Options,
+ Provider,
+ ResendCredentials,
+ Session,
+ SignInAnonymouslyCredentials,
+ SignInWithIdTokenCredentials,
+ SignInWithOAuthCredentials,
+ SignInWithPasswordCredentials,
+ SignInWithPasswordlessCredentials,
+ SignInWithSSOCredentials,
+ SignOutOptions,
+ SignUpWithPasswordCredentials,
+ Subscription,
+ UserAttributes,
+ UserIdentity,
+ UserResponse,
+ VerifyOtpParams,
+)
+from .gotrue_admin_api import AsyncGoTrueAdminAPI
+from .gotrue_base_api import AsyncGoTrueBaseAPI
+from .gotrue_mfa_api import AsyncGoTrueMFAAPI
+from .storage import AsyncMemoryStorage, AsyncSupportedStorage
+
+
+class AsyncGoTrueClient(AsyncGoTrueBaseAPI):
+ def __init__(
+ self,
+ *,
+ url: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ storage_key: Optional[str] = None,
+ auto_refresh_token: bool = True,
+ persist_session: bool = True,
+ storage: Optional[AsyncSupportedStorage] = None,
+ http_client: Optional[AsyncClient] = None,
+ flow_type: AuthFlowType = "implicit",
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ) -> None:
+ AsyncGoTrueBaseAPI.__init__(
+ self,
+ url=url or GOTRUE_URL,
+ headers=headers or DEFAULT_HEADERS,
+ http_client=http_client,
+ verify=verify,
+ proxy=proxy,
+ )
+ self._storage_key = storage_key or STORAGE_KEY
+ self._auto_refresh_token = auto_refresh_token
+ self._persist_session = persist_session
+ self._storage = storage or AsyncMemoryStorage()
+ self._in_memory_session: Optional[Session] = None
+ self._refresh_token_timer: Optional[Timer] = None
+ self._network_retries = 0
+ self._state_change_emitters: Dict[str, Subscription] = {}
+ self._flow_type = flow_type
+
+ self.admin = AsyncGoTrueAdminAPI(
+ url=self._url,
+ headers=self._headers,
+ http_client=self._http_client,
+ )
+ self.mfa = AsyncGoTrueMFAAPI()
+ self.mfa.challenge = self._challenge
+ self.mfa.challenge_and_verify = self._challenge_and_verify
+ self.mfa.enroll = self._enroll
+ self.mfa.get_authenticator_assurance_level = (
+ self._get_authenticator_assurance_level
+ )
+ self.mfa.list_factors = self._list_factors
+ self.mfa.unenroll = self._unenroll
+ self.mfa.verify = self._verify
+
+ # Initializations
+
+ async def initialize(self, *, url: Optional[str] = None) -> None:
+ if url and self._is_implicit_grant_flow(url):
+ await self.initialize_from_url(url)
+ else:
+ await self.initialize_from_storage()
+
+ async def initialize_from_storage(self) -> None:
+ return await self._recover_and_refresh()
+
+ async def initialize_from_url(self, url: str) -> None:
+ try:
+ if self._is_implicit_grant_flow(url):
+ session, redirect_type = await self._get_session_from_url(url)
+ await self._save_session(session)
+ self._notify_all_subscribers("SIGNED_IN", session)
+ if redirect_type == "recovery":
+ self._notify_all_subscribers("PASSWORD_RECOVERY", session)
+ except Exception as e:
+ await self._remove_session()
+ raise e
+
+ # Public methods
+
+ async def sign_in_anonymously(
+ self, credentials: Optional[SignInAnonymouslyCredentials] = None
+ ) -> AuthResponse:
+ """
+ Creates a new anonymous user.
+ """
+ await self._remove_session()
+ if credentials is None:
+ credentials = {"options": {}}
+ options = credentials.get("options", {})
+ data = options.get("data") or {}
+ captcha_token = options.get("captcha_token")
+ response = await self._request(
+ "POST",
+ "signup",
+ body={
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_response,
+ )
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ async def sign_up(
+ self,
+ credentials: SignUpWithPasswordCredentials,
+ ) -> AuthResponse:
+ """
+ Creates a new user.
+ """
+ await self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ password = credentials.get("password")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to") or options.get("email_redirect_to")
+ data = options.get("data") or {}
+ channel = options.get("channel", "sms")
+ captcha_token = options.get("captcha_token")
+ if email:
+ response = await self._request(
+ "POST",
+ "signup",
+ body={
+ "email": email,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ redirect_to=redirect_to,
+ xform=parse_auth_response,
+ )
+ elif phone:
+ response = await self._request(
+ "POST",
+ "signup",
+ body={
+ "phone": phone,
+ "password": password,
+ "data": data,
+ "channel": channel,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_response,
+ )
+ else:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number and a password"
+ )
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ async def sign_in_with_password(
+ self,
+ credentials: SignInWithPasswordCredentials,
+ ) -> AuthResponse:
+ """
+ Log in an existing user with an email or phone and password.
+ """
+ await self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ password = credentials.get("password")
+ options = credentials.get("options", {})
+ data = options.get("data") or {}
+ captcha_token = options.get("captcha_token")
+ if email:
+ response = await self._request(
+ "POST",
+ "token",
+ body={
+ "email": email,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "password",
+ },
+ xform=parse_auth_response,
+ )
+ elif phone:
+ response = await self._request(
+ "POST",
+ "token",
+ body={
+ "phone": phone,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "password",
+ },
+ xform=parse_auth_response,
+ )
+ else:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number and a password"
+ )
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ async def sign_in_with_id_token(
+ self,
+ credentials: SignInWithIdTokenCredentials,
+ ) -> AuthResponse:
+ """
+ Allows signing in with an OIDC ID token. The authentication provider used should be enabled and configured.
+ """
+ await self._remove_session()
+ provider = credentials.get("provider")
+ token = credentials.get("token")
+ access_token = credentials.get("access_token")
+ nonce = credentials.get("nonce")
+ options = credentials.get("options", {})
+ captcha_token = options.get("captcha_token")
+
+ response = await self._request(
+ "POST",
+ "token",
+ body={
+ "provider": provider,
+ "id_token": token,
+ "access_token": access_token,
+ "nonce": nonce,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "id_token",
+ },
+ xform=parse_auth_response,
+ )
+
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ async def sign_in_with_sso(self, credentials: SignInWithSSOCredentials):
+ """
+ Attempts a single-sign on using an enterprise Identity Provider. A
+ successful SSO attempt will redirect the current page to the identity
+ provider authorization page. The redirect URL is implementation and SSO
+ protocol specific.
+
+ You can use it by providing a SSO domain. Typically you can extract this
+ domain by asking users for their email address. If this domain is
+ registered on the Auth instance the redirect will use that organization's
+ currently active SSO Identity Provider for the login.
+ If you have built an organization-specific login page, you can use the
+ organization's SSO Identity Provider UUID directly instead.
+ """
+ await self._remove_session()
+ provider_id = credentials.get("provider_id")
+ domain = credentials.get("domain")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ captcha_token = options.get("captcha_token")
+ # HTTPX currently does not follow redirects: https://www.python-httpx.org/compatibility/
+ # Additionally, unlike the JS client, Python is a server side language and it's not possible
+ # to automatically redirect in browser for hte user
+ skip_http_redirect = options.get("skip_http_redirect", True)
+
+ if domain:
+ return await self._request(
+ "POST",
+ "sso",
+ body={
+ "domain": domain,
+ "skip_http_redirect": skip_http_redirect,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ "redirect_to": redirect_to,
+ },
+ xform=parse_sso_response,
+ )
+ if provider_id:
+ return await self._request(
+ "POST",
+ "sso",
+ body={
+ "provider_id": provider_id,
+ "skip_http_redirect": skip_http_redirect,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ "redirect_to": redirect_to,
+ },
+ xform=parse_sso_response,
+ )
+ raise AuthInvalidCredentialsError(
+ "You must provide either a domain or provider_id"
+ )
+
+ async def sign_in_with_oauth(
+ self,
+ credentials: SignInWithOAuthCredentials,
+ ) -> OAuthResponse:
+ """
+ Log in an existing user via a third-party provider.
+ """
+ await self._remove_session()
+
+ provider = credentials.get("provider")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ scopes = options.get("scopes")
+ params = options.get("query_params", {})
+ if redirect_to:
+ params["redirect_to"] = redirect_to
+ if scopes:
+ params["scopes"] = scopes
+ url_with_qs, _ = await self._get_url_for_provider(
+ f"{self._url}/authorize", provider, params
+ )
+ return OAuthResponse(provider=provider, url=url_with_qs)
+
+ async def link_identity(
+ self, credentials: SignInWithOAuthCredentials
+ ) -> OAuthResponse:
+ provider = credentials.get("provider")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ scopes = options.get("scopes")
+ params = options.get("query_params", {})
+ if redirect_to:
+ params["redirect_to"] = redirect_to
+ if scopes:
+ params["scopes"] = scopes
+ params["skip_http_redirect"] = "true"
+ url = "user/identities/authorize"
+ _, query = await self._get_url_for_provider(url, provider, params)
+
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ response = await self._request(
+ method="GET",
+ path=url,
+ query=query,
+ jwt=session.access_token,
+ xform=parse_link_identity_response,
+ )
+ return OAuthResponse(provider=provider, url=response.url)
+
+ async def get_user_identities(self):
+ response = await self.get_user()
+ return (
+ IdentitiesResponse(identities=response.user.identities)
+ if response.user
+ else AuthSessionMissingError()
+ )
+
+ async def unlink_identity(self, identity: UserIdentity):
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ return await self._request(
+ "DELETE",
+ f"user/identities/{identity.identity_id}",
+ jwt=session.access_token,
+ )
+
+ async def sign_in_with_otp(
+ self,
+ credentials: SignInWithPasswordlessCredentials,
+ ) -> AuthOtpResponse:
+ """
+ Log in a user using magiclink or a one-time password (OTP).
+
+ If the `{{ .ConfirmationURL }}` variable is specified in
+ the email template, a magiclink will be sent.
+
+ If the `{{ .Token }}` variable is specified in the email
+ template, an OTP will be sent.
+
+ If you're using phone sign-ins, only an OTP will be sent.
+ You won't be able to send a magiclink for phone sign-ins.
+ """
+ await self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ options = credentials.get("options", {})
+ email_redirect_to = options.get("email_redirect_to")
+ should_create_user = options.get("should_create_user", True)
+ data = options.get("data")
+ channel = options.get("channel", "sms")
+ captcha_token = options.get("captcha_token")
+ if email:
+ return await self._request(
+ "POST",
+ "otp",
+ body={
+ "email": email,
+ "data": data,
+ "create_user": should_create_user,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ redirect_to=email_redirect_to,
+ xform=parse_auth_otp_response,
+ )
+ if phone:
+ return await self._request(
+ "POST",
+ "otp",
+ body={
+ "phone": phone,
+ "data": data,
+ "create_user": should_create_user,
+ "channel": channel,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_otp_response,
+ )
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number"
+ )
+
+ async def resend(
+ self,
+ credentials: ResendCredentials,
+ ) -> AuthOtpResponse:
+ """
+ Resends an existing signup confirmation email, email change email, SMS OTP or phone change OTP.
+ """
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ type = credentials.get("type")
+ options = credentials.get("options", {})
+ email_redirect_to = options.get("email_redirect_to")
+ captcha_token = options.get("captcha_token")
+ body = {
+ "type": type,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ }
+
+ if email is None and phone is None:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number"
+ )
+
+ body.update({"email": email} if email else {"phone": phone})
+
+ return await self._request(
+ "POST",
+ "resend",
+ body=body,
+ redirect_to=email_redirect_to if email else None,
+ xform=parse_auth_otp_response,
+ )
+
+ async def verify_otp(self, params: VerifyOtpParams) -> AuthResponse:
+ """
+ Log in a user given a User supplied OTP received via mobile.
+ """
+ await self._remove_session()
+ response = await self._request(
+ "POST",
+ "verify",
+ body={
+ "gotrue_meta_security": {
+ "captcha_token": params.get("options", {}).get("captcha_token"),
+ },
+ **params,
+ },
+ redirect_to=params.get("options", {}).get("redirect_to"),
+ xform=parse_auth_response,
+ )
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ async def reauthenticate(self) -> AuthResponse:
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ return await self._request(
+ "GET",
+ "reauthenticate",
+ jwt=session.access_token,
+ xform=parse_auth_response,
+ )
+
+ async def get_session(self) -> Optional[Session]:
+ """
+ Returns the session, refreshing it if necessary.
+
+ The session returned can be null if the session is not detected which
+ can happen in the event a user is not signed-in or has logged out.
+ """
+ current_session: Optional[Session] = None
+ if self._persist_session:
+ maybe_session = await self._storage.get_item(self._storage_key)
+ current_session = self._get_valid_session(maybe_session)
+ if not current_session:
+ await self._remove_session()
+ else:
+ current_session = self._in_memory_session
+ if not current_session:
+ return None
+ time_now = round(time())
+ has_expired = (
+ current_session.expires_at <= time_now + EXPIRY_MARGIN
+ if current_session.expires_at
+ else False
+ )
+ return (
+ await self._call_refresh_token(current_session.refresh_token)
+ if has_expired
+ else current_session
+ )
+
+ async def get_user(self, jwt: Optional[str] = None) -> Optional[UserResponse]:
+ """
+ Gets the current user details if there is an existing session.
+
+ Takes in an optional access token `jwt`. If no `jwt` is provided,
+ `get_user()` will attempt to get the `jwt` from the current session.
+ """
+ if not jwt:
+ session = await self.get_session()
+ if session:
+ jwt = session.access_token
+ else:
+ return None
+ return await self._request("GET", "user", jwt=jwt, xform=parse_user_response)
+
+ async def update_user(self, attributes: UserAttributes) -> UserResponse:
+ """
+ Updates user data, if there is a logged in user.
+ """
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ response = await self._request(
+ "PUT",
+ "user",
+ body=attributes,
+ jwt=session.access_token,
+ xform=parse_user_response,
+ )
+ session.user = response.user
+ await self._save_session(session)
+ self._notify_all_subscribers("USER_UPDATED", session)
+ return response
+
+ async def set_session(self, access_token: str, refresh_token: str) -> AuthResponse:
+ """
+ Sets the session data from the current session. If the current session
+ is expired, `set_session` will take care of refreshing it to obtain a
+ new session.
+
+ If the refresh token in the current session is invalid and the current
+ session has expired, an error will be thrown.
+
+ If the current session does not contain at `expires_at` field,
+ `set_session` will use the exp claim defined in the access token.
+
+ The current session that minimally contains an access token,
+ refresh token and a user.
+ """
+ time_now = round(time())
+ expires_at = time_now
+ has_expired = True
+ session: Optional[Session] = None
+ if access_token and access_token.split(".")[1]:
+ payload = self._decode_jwt(access_token)
+ exp = payload.get("exp")
+ if exp:
+ expires_at = int(exp)
+ has_expired = expires_at <= time_now
+ if has_expired:
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ response = await self._refresh_access_token(refresh_token)
+ if not response.session:
+ return AuthResponse()
+ session = response.session
+ else:
+ response = await self.get_user(access_token)
+ session = Session(
+ access_token=access_token,
+ refresh_token=refresh_token,
+ user=response.user,
+ token_type="bearer",
+ expires_in=expires_at - time_now,
+ expires_at=expires_at,
+ )
+ await self._save_session(session)
+ self._notify_all_subscribers("TOKEN_REFRESHED", session)
+ return AuthResponse(session=session, user=response.user)
+
+ async def refresh_session(
+ self, refresh_token: Optional[str] = None
+ ) -> AuthResponse:
+ """
+ Returns a new session, regardless of expiry status.
+
+ Takes in an optional current session. If not passed in, then refreshSession()
+ will attempt to retrieve it from getSession(). If the current session's
+ refresh token is invalid, an error will be thrown.
+ """
+ if not refresh_token:
+ session = await self.get_session()
+ if session:
+ refresh_token = session.refresh_token
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ session = await self._call_refresh_token(refresh_token)
+ return AuthResponse(session=session, user=session.user)
+
+ async def sign_out(self, options: SignOutOptions = {"scope": "global"}) -> None:
+ """
+ `sign_out` will remove the logged in user from the
+ current session and log them out - removing all items from storage and then trigger a `"SIGNED_OUT"` event.
+
+ For advanced use cases, you can revoke all refresh tokens for a user by passing a user's JWT through to `admin.sign_out`.
+
+ There is no way to revoke a user's access token jwt until it expires.
+ It is recommended to set a shorter expiry on the jwt for this reason.
+ """
+ with suppress(AuthApiError):
+ session = await self.get_session()
+ access_token = session.access_token if session else None
+ if access_token:
+ await self.admin.sign_out(access_token, options["scope"])
+
+ if options["scope"] != "others":
+ await self._remove_session()
+ self._notify_all_subscribers("SIGNED_OUT", None)
+
+ def on_auth_state_change(
+ self,
+ callback: Callable[[AuthChangeEvent, Optional[Session]], None],
+ ) -> Subscription:
+ """
+ Receive a notification every time an auth event happens.
+ """
+ unique_id = str(uuid4())
+
+ def _unsubscribe() -> None:
+ self._state_change_emitters.pop(unique_id)
+
+ subscription = Subscription(
+ id=unique_id,
+ callback=callback,
+ unsubscribe=_unsubscribe,
+ )
+ self._state_change_emitters[unique_id] = subscription
+ return subscription
+
+ async def reset_password_for_email(self, email: str, options: Options = {}) -> None:
+ """
+ Sends a password reset request to an email address.
+ """
+ await self._request(
+ "POST",
+ "recover",
+ body={
+ "email": email,
+ "gotrue_meta_security": {
+ "captcha_token": options.get("captcha_token"),
+ },
+ },
+ redirect_to=options.get("redirect_to"),
+ )
+
+ async def reset_password_email(
+ self,
+ email: str,
+ options: Options = {},
+ ) -> None:
+ """
+ Sends a password reset request to an email address.
+ """
+ await self.reset_password_for_email(email, options)
+
+ # MFA methods
+
+ async def _enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse:
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ body = {
+ "friendly_name": params["friendly_name"],
+ "factor_type": params["factor_type"],
+ }
+
+ if params["factor_type"] == "phone":
+ body["phone"] = params["phone"]
+ else:
+ body["issuer"] = params["issuer"]
+
+ response = await self._request(
+ "POST",
+ "factors",
+ body=body,
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAEnrollResponse),
+ )
+ if params["factor_type"] == "totp" and response.totp.qr_code:
+ response.totp.qr_code = f"data:image/svg+xml;utf-8,{response.totp.qr_code}"
+ return response
+
+ async def _challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse:
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ return await self._request(
+ "POST",
+ f"factors/{params.get('factor_id')}/challenge",
+ body={"channel": params.get("channel")},
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAChallengeResponse),
+ )
+
+ async def _challenge_and_verify(
+ self,
+ params: MFAChallengeAndVerifyParams,
+ ) -> AuthMFAVerifyResponse:
+ response = await self._challenge(
+ {
+ "factor_id": params.get("factor_id"),
+ }
+ )
+ return await self._verify(
+ {
+ "factor_id": params.get("factor_id"),
+ "challenge_id": response.id,
+ "code": params.get("code"),
+ }
+ )
+
+ async def _verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse:
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ response = await self._request(
+ "POST",
+ f"factors/{params.get('factor_id')}/verify",
+ body=params,
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAVerifyResponse),
+ )
+ session = model_validate(Session, model_dump(response))
+ await self._save_session(session)
+ self._notify_all_subscribers("MFA_CHALLENGE_VERIFIED", session)
+ return response
+
+ async def _unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse:
+ session = await self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ return await self._request(
+ "DELETE",
+ f"factors/{params.get('factor_id')}",
+ jwt=session.access_token,
+ xform=partial(AuthMFAUnenrollResponse, model_validate),
+ )
+
+ async def _list_factors(self) -> AuthMFAListFactorsResponse:
+ response = await self.get_user()
+ all = response.user.factors or []
+ totp = [f for f in all if f.factor_type == "totp" and f.status == "verified"]
+ phone = [f for f in all if f.factor_type == "phone" and f.status == "verified"]
+ return AuthMFAListFactorsResponse(all=all, totp=totp, phone=phone)
+
+ async def _get_authenticator_assurance_level(
+ self,
+ ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse:
+ session = await self.get_session()
+ if not session:
+ return AuthMFAGetAuthenticatorAssuranceLevelResponse(
+ current_level=None,
+ next_level=None,
+ current_authentication_methods=[],
+ )
+ payload = self._decode_jwt(session.access_token)
+ current_level: Optional[AuthenticatorAssuranceLevels] = None
+ if payload.get("aal"):
+ current_level = payload.get("aal")
+ verified_factors = [
+ f for f in session.user.factors or [] if f.status == "verified"
+ ]
+ next_level = "aal2" if verified_factors else current_level
+ current_authentication_methods = payload.get("amr") or []
+ return AuthMFAGetAuthenticatorAssuranceLevelResponse(
+ current_level=current_level,
+ next_level=next_level,
+ current_authentication_methods=current_authentication_methods,
+ )
+
+ # Private methods
+
+ async def _remove_session(self) -> None:
+ if self._persist_session:
+ await self._storage.remove_item(self._storage_key)
+ else:
+ self._in_memory_session = None
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = None
+
+ async def _get_session_from_url(
+ self,
+ url: str,
+ ) -> Tuple[Session, Optional[str]]:
+ if not self._is_implicit_grant_flow(url):
+ raise AuthImplicitGrantRedirectError("Not a valid implicit grant flow url.")
+ result = urlparse(url)
+ params = parse_qs(result.query)
+ error_description = self._get_param(params, "error_description")
+ if error_description:
+ error_code = self._get_param(params, "error_code")
+ error = self._get_param(params, "error")
+ if not error_code:
+ raise AuthImplicitGrantRedirectError("No error_code detected.")
+ if not error:
+ raise AuthImplicitGrantRedirectError("No error detected.")
+ raise AuthImplicitGrantRedirectError(
+ error_description,
+ {"code": error_code, "error": error},
+ )
+ provider_token = self._get_param(params, "provider_token")
+ provider_refresh_token = self._get_param(params, "provider_refresh_token")
+ access_token = self._get_param(params, "access_token")
+ if not access_token:
+ raise AuthImplicitGrantRedirectError("No access_token detected.")
+ expires_in = self._get_param(params, "expires_in")
+ if not expires_in:
+ raise AuthImplicitGrantRedirectError("No expires_in detected.")
+ refresh_token = self._get_param(params, "refresh_token")
+ if not refresh_token:
+ raise AuthImplicitGrantRedirectError("No refresh_token detected.")
+ token_type = self._get_param(params, "token_type")
+ if not token_type:
+ raise AuthImplicitGrantRedirectError("No token_type detected.")
+ time_now = round(time())
+ expires_at = time_now + int(expires_in)
+ user = await self.get_user(access_token)
+ session = Session(
+ provider_token=provider_token,
+ provider_refresh_token=provider_refresh_token,
+ access_token=access_token,
+ expires_in=int(expires_in),
+ expires_at=expires_at,
+ refresh_token=refresh_token,
+ token_type=token_type,
+ user=user.user,
+ )
+ redirect_type = self._get_param(params, "type")
+ return session, redirect_type
+
+ async def _recover_and_refresh(self) -> None:
+ raw_session = await self._storage.get_item(self._storage_key)
+ current_session = self._get_valid_session(raw_session)
+ if not current_session:
+ if raw_session:
+ await self._remove_session()
+ return
+ time_now = round(time())
+ expires_at = current_session.expires_at
+ if expires_at and expires_at < time_now + EXPIRY_MARGIN:
+ refresh_token = current_session.refresh_token
+ if self._auto_refresh_token and refresh_token:
+ self._network_retries += 1
+ try:
+ await self._call_refresh_token(refresh_token)
+ self._network_retries = 0
+ except Exception as e:
+ if (
+ isinstance(e, AuthRetryableError)
+ and self._network_retries < MAX_RETRIES
+ ):
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = Timer(
+ (RETRY_INTERVAL ** (self._network_retries * 100)),
+ self._recover_and_refresh,
+ )
+ self._refresh_token_timer.start()
+ return
+ await self._remove_session()
+ return
+ if self._persist_session:
+ await self._save_session(current_session)
+ self._notify_all_subscribers("SIGNED_IN", current_session)
+
+ async def _call_refresh_token(self, refresh_token: str) -> Session:
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ response = await self._refresh_access_token(refresh_token)
+ if not response.session:
+ raise AuthSessionMissingError()
+ await self._save_session(response.session)
+ self._notify_all_subscribers("TOKEN_REFRESHED", response.session)
+ return response.session
+
+ async def _refresh_access_token(self, refresh_token: str) -> AuthResponse:
+ return await self._request(
+ "POST",
+ "token",
+ query={"grant_type": "refresh_token"},
+ body={"refresh_token": refresh_token},
+ xform=parse_auth_response,
+ )
+
+ async def _save_session(self, session: Session) -> None:
+ if not self._persist_session:
+ self._in_memory_session = session
+ expire_at = session.expires_at
+ if expire_at:
+ time_now = round(time())
+ expire_in = expire_at - time_now
+ refresh_duration_before_expires = (
+ EXPIRY_MARGIN if expire_in > EXPIRY_MARGIN else 0.5
+ )
+ value = (expire_in - refresh_duration_before_expires) * 1000
+ await self._start_auto_refresh_token(value)
+ if self._persist_session and session.expires_at:
+ await self._storage.set_item(self._storage_key, model_dump_json(session))
+
+ async def _start_auto_refresh_token(self, value: float) -> None:
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = None
+ if value <= 0 or not self._auto_refresh_token:
+ return
+
+ async def refresh_token_function():
+ self._network_retries += 1
+ try:
+ session = await self.get_session()
+ if session:
+ await self._call_refresh_token(session.refresh_token)
+ self._network_retries = 0
+ except Exception as e:
+ if (
+ isinstance(e, AuthRetryableError)
+ and self._network_retries < MAX_RETRIES
+ ):
+ await self._start_auto_refresh_token(
+ RETRY_INTERVAL ** (self._network_retries * 100)
+ )
+
+ self._refresh_token_timer = Timer(value, refresh_token_function)
+ self._refresh_token_timer.start()
+
+ def _notify_all_subscribers(
+ self,
+ event: AuthChangeEvent,
+ session: Optional[Session],
+ ) -> None:
+ for subscription in self._state_change_emitters.values():
+ subscription.callback(event, session)
+
+ def _get_valid_session(
+ self,
+ raw_session: Optional[str],
+ ) -> Optional[Session]:
+ if not raw_session:
+ return None
+ data = loads(raw_session)
+ if not data:
+ return None
+ if not data.get("access_token"):
+ return None
+ if not data.get("refresh_token"):
+ return None
+ if not data.get("expires_at"):
+ return None
+ try:
+ expires_at = int(data["expires_at"])
+ data["expires_at"] = expires_at
+ except ValueError:
+ return None
+ try:
+ return model_validate(Session, data)
+ except Exception:
+ return None
+
+ def _get_param(
+ self,
+ query_params: Dict[str, List[str]],
+ name: str,
+ ) -> Optional[str]:
+ return query_params[name][0] if name in query_params else None
+
+ def _is_implicit_grant_flow(self, url: str) -> bool:
+ result = urlparse(url)
+ params = parse_qs(result.query)
+ return "access_token" in params or "error_description" in params
+
+ async def _get_url_for_provider(
+ self,
+ url: str,
+ provider: Provider,
+ params: Dict[str, str],
+ ) -> Tuple[str, Dict[str, str]]:
+ if self._flow_type == "pkce":
+ code_verifier = generate_pkce_verifier()
+ code_challenge = generate_pkce_challenge(code_verifier)
+ await self._storage.set_item(
+ f"{self._storage_key}-code-verifier", code_verifier
+ )
+ code_challenge_method = (
+ "plain" if code_verifier == code_challenge else "s256"
+ )
+ params["code_challenge"] = code_challenge
+ params["code_challenge_method"] = code_challenge_method
+
+ params["provider"] = provider
+ query = urlencode(params)
+ return f"{url}?{query}", params
+
+ def _decode_jwt(self, jwt: str) -> DecodedJWTDict:
+ """
+ Decodes a JWT (without performing any validation).
+ """
+ return decode_jwt_payload(jwt)
+
+ async def exchange_code_for_session(self, params: CodeExchangeParams):
+ code_verifier = params.get("code_verifier") or await self._storage.get_item(
+ f"{self._storage_key}-code-verifier"
+ )
+ response = await self._request(
+ "POST",
+ "token",
+ query={"grant_type": "pkce"},
+ body={
+ "auth_code": params.get("auth_code"),
+ "code_verifier": code_verifier,
+ },
+ redirect_to=params.get("redirect_to"),
+ xform=parse_auth_response,
+ )
+ await self._storage.remove_item(f"{self._storage_key}-code-verifier")
+ if response.session:
+ await self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py
new file mode 100644
index 00000000..a30c4c73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/gotrue_mfa_api.py
@@ -0,0 +1,94 @@
+from ..types import (
+ AuthMFAChallengeResponse,
+ AuthMFAEnrollResponse,
+ AuthMFAGetAuthenticatorAssuranceLevelResponse,
+ AuthMFAListFactorsResponse,
+ AuthMFAUnenrollResponse,
+ AuthMFAVerifyResponse,
+ MFAChallengeAndVerifyParams,
+ MFAChallengeParams,
+ MFAEnrollParams,
+ MFAUnenrollParams,
+ MFAVerifyParams,
+)
+
+
+class AsyncGoTrueMFAAPI:
+ """
+ Contains the full multi-factor authentication API.
+ """
+
+ async def enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse:
+ """
+ Starts the enrollment process for a new Multi-Factor Authentication
+ factor. This method creates a new factor in the 'unverified' state.
+ Present the QR code or secret to the user and ask them to add it to their
+ authenticator app. Ask the user to provide you with an authenticator code
+ from their app and verify it by calling challenge and then verify.
+
+ The first successful verification of an unverified factor activates the
+ factor. All other sessions are logged out and the current one gets an
+ `aal2` authenticator level.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse:
+ """
+ Prepares a challenge used to verify that a user has access to a MFA
+ factor. Provide the challenge ID and verification code by calling `verify`.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def challenge_and_verify(
+ self,
+ params: MFAChallengeAndVerifyParams,
+ ) -> AuthMFAVerifyResponse:
+ """
+ Helper method which creates a challenge and immediately uses the given code
+ to verify against it thereafter. The verification code is provided by the
+ user by entering a code seen in their authenticator app.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse:
+ """
+ Verifies a verification code against a challenge. The verification code is
+ provided by the user by entering a code seen in their authenticator app.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse:
+ """
+ Unenroll removes a MFA factor. Unverified factors can safely be ignored
+ and it's not necessary to unenroll them. Unenrolling a verified MFA factor
+ cannot be done from a session with an `aal1` authenticator level.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def list_factors(self) -> AuthMFAListFactorsResponse:
+ """
+ Returns the list of MFA factors enabled for this user. For most use cases
+ you should consider using `get_authenticator_assurance_level`.
+
+ This uses a cached version of the factors and avoids incurring a network call.
+ If you need to update this list, call `get_user` first.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ async def get_authenticator_assurance_level(
+ self,
+ ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse:
+ """
+ Returns the Authenticator Assurance Level (AAL) for the active session.
+
+ - `aal1` (or `null`) means that the user's identity has been verified only
+ with a conventional login (email+password, OTP, magic link, social login,
+ etc.).
+ - `aal2` means that the user's identity has been verified both with a
+ conventional login and at least one MFA factor.
+
+ Although this method returns a promise, it's fairly quick (microseconds)
+ and rarely uses the network. You can use this to check whether the current
+ user needs to be shown a screen to verify their MFA factors.
+ """
+ raise NotImplementedError() # pragma: no cover
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py b/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py
new file mode 100644
index 00000000..5239dd9d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_async/storage.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional
+
+
+class AsyncSupportedStorage(ABC):
+ @abstractmethod
+ async def get_item(self, key: str) -> Optional[str]: ... # pragma: no cover
+
+ @abstractmethod
+ async def set_item(self, key: str, value: str) -> None: ... # pragma: no cover
+
+ @abstractmethod
+ async def remove_item(self, key: str) -> None: ... # pragma: no cover
+
+
+class AsyncMemoryStorage(AsyncSupportedStorage):
+ def __init__(self):
+ self.storage: Dict[str, str] = {}
+
+ async def get_item(self, key: str) -> Optional[str]:
+ if key in self.storage:
+ return self.storage[key]
+
+ async def set_item(self, key: str, value: str) -> None:
+ self.storage[key] = value
+
+ async def remove_item(self, key: str) -> None:
+ if key in self.storage:
+ del self.storage[key]
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py
new file mode 100644
index 00000000..9d48db4f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/__init__.py
@@ -0,0 +1 @@
+from __future__ import annotations
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py
new file mode 100644
index 00000000..3997c53d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_api.py
@@ -0,0 +1,186 @@
+from __future__ import annotations
+
+from functools import partial
+from typing import Dict, List, Optional
+
+from ..helpers import model_validate, parse_link_response, parse_user_response
+from ..http_clients import SyncClient
+from ..types import (
+ AdminUserAttributes,
+ AuthMFAAdminDeleteFactorParams,
+ AuthMFAAdminDeleteFactorResponse,
+ AuthMFAAdminListFactorsParams,
+ AuthMFAAdminListFactorsResponse,
+ GenerateLinkParams,
+ GenerateLinkResponse,
+ InviteUserByEmailOptions,
+ SignOutScope,
+ User,
+ UserResponse,
+)
+from .gotrue_admin_mfa_api import SyncGoTrueAdminMFAAPI
+from .gotrue_base_api import SyncGoTrueBaseAPI
+
+
+class SyncGoTrueAdminAPI(SyncGoTrueBaseAPI):
+ def __init__(
+ self,
+ *,
+ url: str = "",
+ headers: Dict[str, str] = {},
+ http_client: Optional[SyncClient] = None,
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ) -> None:
+ SyncGoTrueBaseAPI.__init__(
+ self,
+ url=url,
+ headers=headers,
+ http_client=http_client,
+ verify=verify,
+ proxy=proxy,
+ )
+ self.mfa = SyncGoTrueAdminMFAAPI()
+ self.mfa.list_factors = self._list_factors
+ self.mfa.delete_factor = self._delete_factor
+
+ def sign_out(self, jwt: str, scope: SignOutScope = "global") -> None:
+ """
+ Removes a logged-in session.
+ """
+ return self._request(
+ "POST",
+ "logout",
+ query={"scope": scope},
+ jwt=jwt,
+ no_resolve_json=True,
+ )
+
+ def invite_user_by_email(
+ self,
+ email: str,
+ options: InviteUserByEmailOptions = {},
+ ) -> UserResponse:
+ """
+ Sends an invite link to an email address.
+ """
+ return self._request(
+ "POST",
+ "invite",
+ body={"email": email, "data": options.get("data")},
+ redirect_to=options.get("redirect_to"),
+ xform=parse_user_response,
+ )
+
+ def generate_link(self, params: GenerateLinkParams) -> GenerateLinkResponse:
+ """
+ Generates email links and OTPs to be sent via a custom email provider.
+ """
+ return self._request(
+ "POST",
+ "admin/generate_link",
+ body={
+ "type": params.get("type"),
+ "email": params.get("email"),
+ "password": params.get("password"),
+ "new_email": params.get("new_email"),
+ "data": params.get("options", {}).get("data"),
+ },
+ redirect_to=params.get("options", {}).get("redirect_to"),
+ xform=parse_link_response,
+ )
+
+ # User Admin API
+
+ def create_user(self, attributes: AdminUserAttributes) -> UserResponse:
+ """
+ Creates a new user.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return self._request(
+ "POST",
+ "admin/users",
+ body=attributes,
+ xform=parse_user_response,
+ )
+
+ def list_users(self, page: int = None, per_page: int = None) -> List[User]:
+ """
+ Get a list of users.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return self._request(
+ "GET",
+ "admin/users",
+ query={"page": page, "per_page": per_page},
+ xform=lambda data: (
+ [model_validate(User, user) for user in data["users"]]
+ if "users" in data
+ else []
+ ),
+ )
+
+ def get_user_by_id(self, uid: str) -> UserResponse:
+ """
+ Get user by id.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return self._request(
+ "GET",
+ f"admin/users/{uid}",
+ xform=parse_user_response,
+ )
+
+ def update_user_by_id(
+ self,
+ uid: str,
+ attributes: AdminUserAttributes,
+ ) -> UserResponse:
+ """
+ Updates the user data.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ return self._request(
+ "PUT",
+ f"admin/users/{uid}",
+ body=attributes,
+ xform=parse_user_response,
+ )
+
+ def delete_user(self, id: str, should_soft_delete: bool = False) -> None:
+ """
+ Delete a user. Requires a `service_role` key.
+
+ This function should only be called on a server.
+ Never expose your `service_role` key in the browser.
+ """
+ body = {"should_soft_delete": should_soft_delete}
+ return self._request("DELETE", f"admin/users/{id}", body=body)
+
+ def _list_factors(
+ self,
+ params: AuthMFAAdminListFactorsParams,
+ ) -> AuthMFAAdminListFactorsResponse:
+ return self._request(
+ "GET",
+ f"admin/users/{params.get('user_id')}/factors",
+ xform=partial(model_validate, AuthMFAAdminListFactorsResponse),
+ )
+
+ def _delete_factor(
+ self,
+ params: AuthMFAAdminDeleteFactorParams,
+ ) -> AuthMFAAdminDeleteFactorResponse:
+ return self._request(
+ "DELETE",
+ f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}",
+ xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse),
+ )
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py
new file mode 100644
index 00000000..c3fcfc8e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_admin_mfa_api.py
@@ -0,0 +1,32 @@
+from ..types import (
+ AuthMFAAdminDeleteFactorParams,
+ AuthMFAAdminDeleteFactorResponse,
+ AuthMFAAdminListFactorsParams,
+ AuthMFAAdminListFactorsResponse,
+)
+
+
+class SyncGoTrueAdminMFAAPI:
+ """
+ Contains the full multi-factor authentication administration API.
+ """
+
+ def list_factors(
+ self,
+ params: AuthMFAAdminListFactorsParams,
+ ) -> AuthMFAAdminListFactorsResponse:
+ """
+ Lists all factors attached to a user.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def delete_factor(
+ self,
+ params: AuthMFAAdminDeleteFactorParams,
+ ) -> AuthMFAAdminDeleteFactorResponse:
+ """
+ Deletes a factor on a user. This will log the user out of all active
+ sessions (if the deleted factor was verified). There's no need to delete
+ unverified factors.
+ """
+ raise NotImplementedError() # pragma: no cover
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py
new file mode 100644
index 00000000..c6c2b7b0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_base_api.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+from typing import Any, Callable, Dict, Optional, TypeVar, overload
+
+from httpx import Response
+from pydantic import BaseModel
+from typing_extensions import Literal, Self
+
+from ..constants import API_VERSION_HEADER_NAME, API_VERSIONS
+from ..helpers import handle_exception, model_dump
+from ..http_clients import SyncClient
+
+T = TypeVar("T")
+
+
+class SyncGoTrueBaseAPI:
+ def __init__(
+ self,
+ *,
+ url: str,
+ headers: Dict[str, str],
+ http_client: Optional[SyncClient],
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ):
+ self._url = url
+ self._headers = headers
+ self._http_client = http_client or SyncClient(
+ verify=bool(verify),
+ proxy=proxy,
+ follow_redirects=True,
+ http2=True,
+ )
+
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, exc_t, exc_v, exc_tb) -> None:
+ self.close()
+
+ def close(self) -> None:
+ self._http_client.aclose()
+
+ @overload
+ def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: Literal[False] = False,
+ xform: Callable[[Any], T],
+ ) -> T: ... # pragma: no cover
+
+ @overload
+ def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: Literal[True],
+ xform: Callable[[Response], T],
+ ) -> T: ... # pragma: no cover
+
+ @overload
+ def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: bool = False,
+ ) -> None: ... # pragma: no cover
+
+ def _request(
+ self,
+ method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
+ path: str,
+ *,
+ jwt: Optional[str] = None,
+ redirect_to: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ query: Optional[Dict[str, str]] = None,
+ body: Optional[Any] = None,
+ no_resolve_json: bool = False,
+ xform: Optional[Callable[[Any], T]] = None,
+ ) -> Optional[T]:
+ url = f"{self._url}/{path}"
+ headers = {**self._headers, **(headers or {})}
+ if API_VERSION_HEADER_NAME not in headers:
+ headers[API_VERSION_HEADER_NAME] = API_VERSIONS["2024-01-01"].get("name")
+ if "Content-Type" not in headers:
+ headers["Content-Type"] = "application/json;charset=UTF-8"
+ if jwt:
+ headers["Authorization"] = f"Bearer {jwt}"
+ query = query or {}
+ if redirect_to:
+ query["redirect_to"] = redirect_to
+ try:
+ response = self._http_client.request(
+ method,
+ url,
+ headers=headers,
+ params=query,
+ json=model_dump(body) if isinstance(body, BaseModel) else body,
+ )
+ response.raise_for_status()
+ result = response if no_resolve_json else response.json()
+ if xform:
+ return xform(result)
+ except Exception as e:
+ raise handle_exception(e)
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py
new file mode 100644
index 00000000..e4e821ef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_client.py
@@ -0,0 +1,1146 @@
+from __future__ import annotations
+
+from contextlib import suppress
+from functools import partial
+from json import loads
+from time import time
+from typing import Callable, Dict, List, Optional, Tuple
+from urllib.parse import parse_qs, urlencode, urlparse
+from uuid import uuid4
+
+from ..constants import (
+ DEFAULT_HEADERS,
+ EXPIRY_MARGIN,
+ GOTRUE_URL,
+ MAX_RETRIES,
+ RETRY_INTERVAL,
+ STORAGE_KEY,
+)
+from ..errors import (
+ AuthApiError,
+ AuthImplicitGrantRedirectError,
+ AuthInvalidCredentialsError,
+ AuthRetryableError,
+ AuthSessionMissingError,
+)
+from ..helpers import (
+ decode_jwt_payload,
+ generate_pkce_challenge,
+ generate_pkce_verifier,
+ model_dump,
+ model_dump_json,
+ model_validate,
+ parse_auth_otp_response,
+ parse_auth_response,
+ parse_link_identity_response,
+ parse_sso_response,
+ parse_user_response,
+)
+from ..http_clients import SyncClient
+from ..timer import Timer
+from ..types import (
+ AuthChangeEvent,
+ AuthenticatorAssuranceLevels,
+ AuthFlowType,
+ AuthMFAChallengeResponse,
+ AuthMFAEnrollResponse,
+ AuthMFAGetAuthenticatorAssuranceLevelResponse,
+ AuthMFAListFactorsResponse,
+ AuthMFAUnenrollResponse,
+ AuthMFAVerifyResponse,
+ AuthOtpResponse,
+ AuthResponse,
+ CodeExchangeParams,
+ DecodedJWTDict,
+ IdentitiesResponse,
+ MFAChallengeAndVerifyParams,
+ MFAChallengeParams,
+ MFAEnrollParams,
+ MFAUnenrollParams,
+ MFAVerifyParams,
+ OAuthResponse,
+ Options,
+ Provider,
+ ResendCredentials,
+ Session,
+ SignInAnonymouslyCredentials,
+ SignInWithIdTokenCredentials,
+ SignInWithOAuthCredentials,
+ SignInWithPasswordCredentials,
+ SignInWithPasswordlessCredentials,
+ SignInWithSSOCredentials,
+ SignOutOptions,
+ SignUpWithPasswordCredentials,
+ Subscription,
+ UserAttributes,
+ UserIdentity,
+ UserResponse,
+ VerifyOtpParams,
+)
+from .gotrue_admin_api import SyncGoTrueAdminAPI
+from .gotrue_base_api import SyncGoTrueBaseAPI
+from .gotrue_mfa_api import SyncGoTrueMFAAPI
+from .storage import SyncMemoryStorage, SyncSupportedStorage
+
+
+class SyncGoTrueClient(SyncGoTrueBaseAPI):
+ def __init__(
+ self,
+ *,
+ url: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = None,
+ storage_key: Optional[str] = None,
+ auto_refresh_token: bool = True,
+ persist_session: bool = True,
+ storage: Optional[SyncSupportedStorage] = None,
+ http_client: Optional[SyncClient] = None,
+ flow_type: AuthFlowType = "implicit",
+ verify: bool = True,
+ proxy: Optional[str] = None,
+ ) -> None:
+ SyncGoTrueBaseAPI.__init__(
+ self,
+ url=url or GOTRUE_URL,
+ headers=headers or DEFAULT_HEADERS,
+ http_client=http_client,
+ verify=verify,
+ proxy=proxy,
+ )
+ self._storage_key = storage_key or STORAGE_KEY
+ self._auto_refresh_token = auto_refresh_token
+ self._persist_session = persist_session
+ self._storage = storage or SyncMemoryStorage()
+ self._in_memory_session: Optional[Session] = None
+ self._refresh_token_timer: Optional[Timer] = None
+ self._network_retries = 0
+ self._state_change_emitters: Dict[str, Subscription] = {}
+ self._flow_type = flow_type
+
+ self.admin = SyncGoTrueAdminAPI(
+ url=self._url,
+ headers=self._headers,
+ http_client=self._http_client,
+ )
+ self.mfa = SyncGoTrueMFAAPI()
+ self.mfa.challenge = self._challenge
+ self.mfa.challenge_and_verify = self._challenge_and_verify
+ self.mfa.enroll = self._enroll
+ self.mfa.get_authenticator_assurance_level = (
+ self._get_authenticator_assurance_level
+ )
+ self.mfa.list_factors = self._list_factors
+ self.mfa.unenroll = self._unenroll
+ self.mfa.verify = self._verify
+
+ # Initializations
+
+ def initialize(self, *, url: Optional[str] = None) -> None:
+ if url and self._is_implicit_grant_flow(url):
+ self.initialize_from_url(url)
+ else:
+ self.initialize_from_storage()
+
+ def initialize_from_storage(self) -> None:
+ return self._recover_and_refresh()
+
+ def initialize_from_url(self, url: str) -> None:
+ try:
+ if self._is_implicit_grant_flow(url):
+ session, redirect_type = self._get_session_from_url(url)
+ self._save_session(session)
+ self._notify_all_subscribers("SIGNED_IN", session)
+ if redirect_type == "recovery":
+ self._notify_all_subscribers("PASSWORD_RECOVERY", session)
+ except Exception as e:
+ self._remove_session()
+ raise e
+
+ # Public methods
+
+ def sign_in_anonymously(
+ self, credentials: Optional[SignInAnonymouslyCredentials] = None
+ ) -> AuthResponse:
+ """
+ Creates a new anonymous user.
+ """
+ self._remove_session()
+ if credentials is None:
+ credentials = {"options": {}}
+ options = credentials.get("options", {})
+ data = options.get("data") or {}
+ captcha_token = options.get("captcha_token")
+ response = self._request(
+ "POST",
+ "signup",
+ body={
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_response,
+ )
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ def sign_up(
+ self,
+ credentials: SignUpWithPasswordCredentials,
+ ) -> AuthResponse:
+ """
+ Creates a new user.
+ """
+ self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ password = credentials.get("password")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to") or options.get("email_redirect_to")
+ data = options.get("data") or {}
+ channel = options.get("channel", "sms")
+ captcha_token = options.get("captcha_token")
+ if email:
+ response = self._request(
+ "POST",
+ "signup",
+ body={
+ "email": email,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ redirect_to=redirect_to,
+ xform=parse_auth_response,
+ )
+ elif phone:
+ response = self._request(
+ "POST",
+ "signup",
+ body={
+ "phone": phone,
+ "password": password,
+ "data": data,
+ "channel": channel,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_response,
+ )
+ else:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number and a password"
+ )
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ def sign_in_with_password(
+ self,
+ credentials: SignInWithPasswordCredentials,
+ ) -> AuthResponse:
+ """
+ Log in an existing user with an email or phone and password.
+ """
+ self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ password = credentials.get("password")
+ options = credentials.get("options", {})
+ data = options.get("data") or {}
+ captcha_token = options.get("captcha_token")
+ if email:
+ response = self._request(
+ "POST",
+ "token",
+ body={
+ "email": email,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "password",
+ },
+ xform=parse_auth_response,
+ )
+ elif phone:
+ response = self._request(
+ "POST",
+ "token",
+ body={
+ "phone": phone,
+ "password": password,
+ "data": data,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "password",
+ },
+ xform=parse_auth_response,
+ )
+ else:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number and a password"
+ )
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ def sign_in_with_id_token(
+ self,
+ credentials: SignInWithIdTokenCredentials,
+ ) -> AuthResponse:
+ """
+ Allows signing in with an OIDC ID token. The authentication provider used should be enabled and configured.
+ """
+ self._remove_session()
+ provider = credentials.get("provider")
+ token = credentials.get("token")
+ access_token = credentials.get("access_token")
+ nonce = credentials.get("nonce")
+ options = credentials.get("options", {})
+ captcha_token = options.get("captcha_token")
+
+ response = self._request(
+ "POST",
+ "token",
+ body={
+ "provider": provider,
+ "id_token": token,
+ "access_token": access_token,
+ "nonce": nonce,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ query={
+ "grant_type": "id_token",
+ },
+ xform=parse_auth_response,
+ )
+
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ def sign_in_with_sso(self, credentials: SignInWithSSOCredentials):
+ """
+ Attempts a single-sign on using an enterprise Identity Provider. A
+ successful SSO attempt will redirect the current page to the identity
+ provider authorization page. The redirect URL is implementation and SSO
+ protocol specific.
+
+ You can use it by providing a SSO domain. Typically you can extract this
+ domain by asking users for their email address. If this domain is
+ registered on the Auth instance the redirect will use that organization's
+ currently active SSO Identity Provider for the login.
+ If you have built an organization-specific login page, you can use the
+ organization's SSO Identity Provider UUID directly instead.
+ """
+ self._remove_session()
+ provider_id = credentials.get("provider_id")
+ domain = credentials.get("domain")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ captcha_token = options.get("captcha_token")
+ # HTTPX currently does not follow redirects: https://www.python-httpx.org/compatibility/
+ # Additionally, unlike the JS client, Python is a server side language and it's not possible
+ # to automatically redirect in browser for hte user
+ skip_http_redirect = options.get("skip_http_redirect", True)
+
+ if domain:
+ return self._request(
+ "POST",
+ "sso",
+ body={
+ "domain": domain,
+ "skip_http_redirect": skip_http_redirect,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ "redirect_to": redirect_to,
+ },
+ xform=parse_sso_response,
+ )
+ if provider_id:
+ return self._request(
+ "POST",
+ "sso",
+ body={
+ "provider_id": provider_id,
+ "skip_http_redirect": skip_http_redirect,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ "redirect_to": redirect_to,
+ },
+ xform=parse_sso_response,
+ )
+ raise AuthInvalidCredentialsError(
+ "You must provide either a domain or provider_id"
+ )
+
+ def sign_in_with_oauth(
+ self,
+ credentials: SignInWithOAuthCredentials,
+ ) -> OAuthResponse:
+ """
+ Log in an existing user via a third-party provider.
+ """
+ self._remove_session()
+
+ provider = credentials.get("provider")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ scopes = options.get("scopes")
+ params = options.get("query_params", {})
+ if redirect_to:
+ params["redirect_to"] = redirect_to
+ if scopes:
+ params["scopes"] = scopes
+ url_with_qs, _ = self._get_url_for_provider(
+ f"{self._url}/authorize", provider, params
+ )
+ return OAuthResponse(provider=provider, url=url_with_qs)
+
+ def link_identity(self, credentials: SignInWithOAuthCredentials) -> OAuthResponse:
+ provider = credentials.get("provider")
+ options = credentials.get("options", {})
+ redirect_to = options.get("redirect_to")
+ scopes = options.get("scopes")
+ params = options.get("query_params", {})
+ if redirect_to:
+ params["redirect_to"] = redirect_to
+ if scopes:
+ params["scopes"] = scopes
+ params["skip_http_redirect"] = "true"
+ url = "user/identities/authorize"
+ _, query = self._get_url_for_provider(url, provider, params)
+
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ response = self._request(
+ method="GET",
+ path=url,
+ query=query,
+ jwt=session.access_token,
+ xform=parse_link_identity_response,
+ )
+ return OAuthResponse(provider=provider, url=response.url)
+
+ def get_user_identities(self):
+ response = self.get_user()
+ return (
+ IdentitiesResponse(identities=response.user.identities)
+ if response.user
+ else AuthSessionMissingError()
+ )
+
+ def unlink_identity(self, identity: UserIdentity):
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ return self._request(
+ "DELETE",
+ f"user/identities/{identity.identity_id}",
+ jwt=session.access_token,
+ )
+
+ def sign_in_with_otp(
+ self,
+ credentials: SignInWithPasswordlessCredentials,
+ ) -> AuthOtpResponse:
+ """
+ Log in a user using magiclink or a one-time password (OTP).
+
+ If the `{{ .ConfirmationURL }}` variable is specified in
+ the email template, a magiclink will be sent.
+
+ If the `{{ .Token }}` variable is specified in the email
+ template, an OTP will be sent.
+
+ If you're using phone sign-ins, only an OTP will be sent.
+ You won't be able to send a magiclink for phone sign-ins.
+ """
+ self._remove_session()
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ options = credentials.get("options", {})
+ email_redirect_to = options.get("email_redirect_to")
+ should_create_user = options.get("should_create_user", True)
+ data = options.get("data")
+ channel = options.get("channel", "sms")
+ captcha_token = options.get("captcha_token")
+ if email:
+ return self._request(
+ "POST",
+ "otp",
+ body={
+ "email": email,
+ "data": data,
+ "create_user": should_create_user,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ redirect_to=email_redirect_to,
+ xform=parse_auth_otp_response,
+ )
+ if phone:
+ return self._request(
+ "POST",
+ "otp",
+ body={
+ "phone": phone,
+ "data": data,
+ "create_user": should_create_user,
+ "channel": channel,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ },
+ xform=parse_auth_otp_response,
+ )
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number"
+ )
+
+ def resend(
+ self,
+ credentials: ResendCredentials,
+ ) -> AuthOtpResponse:
+ """
+ Resends an existing signup confirmation email, email change email, SMS OTP or phone change OTP.
+ """
+ email = credentials.get("email")
+ phone = credentials.get("phone")
+ type = credentials.get("type")
+ options = credentials.get("options", {})
+ email_redirect_to = options.get("email_redirect_to")
+ captcha_token = options.get("captcha_token")
+ body = {
+ "type": type,
+ "gotrue_meta_security": {
+ "captcha_token": captcha_token,
+ },
+ }
+
+ if email is None and phone is None:
+ raise AuthInvalidCredentialsError(
+ "You must provide either an email or phone number"
+ )
+
+ body.update({"email": email} if email else {"phone": phone})
+
+ return self._request(
+ "POST",
+ "resend",
+ body=body,
+ redirect_to=email_redirect_to if email else None,
+ xform=parse_auth_otp_response,
+ )
+
+ def verify_otp(self, params: VerifyOtpParams) -> AuthResponse:
+ """
+ Log in a user given a User supplied OTP received via mobile.
+ """
+ self._remove_session()
+ response = self._request(
+ "POST",
+ "verify",
+ body={
+ "gotrue_meta_security": {
+ "captcha_token": params.get("options", {}).get("captcha_token"),
+ },
+ **params,
+ },
+ redirect_to=params.get("options", {}).get("redirect_to"),
+ xform=parse_auth_response,
+ )
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
+
+ def reauthenticate(self) -> AuthResponse:
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ return self._request(
+ "GET",
+ "reauthenticate",
+ jwt=session.access_token,
+ xform=parse_auth_response,
+ )
+
+ def get_session(self) -> Optional[Session]:
+ """
+ Returns the session, refreshing it if necessary.
+
+ The session returned can be null if the session is not detected which
+ can happen in the event a user is not signed-in or has logged out.
+ """
+ current_session: Optional[Session] = None
+ if self._persist_session:
+ maybe_session = self._storage.get_item(self._storage_key)
+ current_session = self._get_valid_session(maybe_session)
+ if not current_session:
+ self._remove_session()
+ else:
+ current_session = self._in_memory_session
+ if not current_session:
+ return None
+ time_now = round(time())
+ has_expired = (
+ current_session.expires_at <= time_now + EXPIRY_MARGIN
+ if current_session.expires_at
+ else False
+ )
+ return (
+ self._call_refresh_token(current_session.refresh_token)
+ if has_expired
+ else current_session
+ )
+
+ def get_user(self, jwt: Optional[str] = None) -> Optional[UserResponse]:
+ """
+ Gets the current user details if there is an existing session.
+
+ Takes in an optional access token `jwt`. If no `jwt` is provided,
+ `get_user()` will attempt to get the `jwt` from the current session.
+ """
+ if not jwt:
+ session = self.get_session()
+ if session:
+ jwt = session.access_token
+ else:
+ return None
+ return self._request("GET", "user", jwt=jwt, xform=parse_user_response)
+
+ def update_user(self, attributes: UserAttributes) -> UserResponse:
+ """
+ Updates user data, if there is a logged in user.
+ """
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ response = self._request(
+ "PUT",
+ "user",
+ body=attributes,
+ jwt=session.access_token,
+ xform=parse_user_response,
+ )
+ session.user = response.user
+ self._save_session(session)
+ self._notify_all_subscribers("USER_UPDATED", session)
+ return response
+
+ def set_session(self, access_token: str, refresh_token: str) -> AuthResponse:
+ """
+ Sets the session data from the current session. If the current session
+ is expired, `set_session` will take care of refreshing it to obtain a
+ new session.
+
+ If the refresh token in the current session is invalid and the current
+ session has expired, an error will be thrown.
+
+ If the current session does not contain at `expires_at` field,
+ `set_session` will use the exp claim defined in the access token.
+
+ The current session that minimally contains an access token,
+ refresh token and a user.
+ """
+ time_now = round(time())
+ expires_at = time_now
+ has_expired = True
+ session: Optional[Session] = None
+ if access_token and access_token.split(".")[1]:
+ payload = self._decode_jwt(access_token)
+ exp = payload.get("exp")
+ if exp:
+ expires_at = int(exp)
+ has_expired = expires_at <= time_now
+ if has_expired:
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ response = self._refresh_access_token(refresh_token)
+ if not response.session:
+ return AuthResponse()
+ session = response.session
+ else:
+ response = self.get_user(access_token)
+ session = Session(
+ access_token=access_token,
+ refresh_token=refresh_token,
+ user=response.user,
+ token_type="bearer",
+ expires_in=expires_at - time_now,
+ expires_at=expires_at,
+ )
+ self._save_session(session)
+ self._notify_all_subscribers("TOKEN_REFRESHED", session)
+ return AuthResponse(session=session, user=response.user)
+
+ def refresh_session(self, refresh_token: Optional[str] = None) -> AuthResponse:
+ """
+ Returns a new session, regardless of expiry status.
+
+ Takes in an optional current session. If not passed in, then refreshSession()
+ will attempt to retrieve it from getSession(). If the current session's
+ refresh token is invalid, an error will be thrown.
+ """
+ if not refresh_token:
+ session = self.get_session()
+ if session:
+ refresh_token = session.refresh_token
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ session = self._call_refresh_token(refresh_token)
+ return AuthResponse(session=session, user=session.user)
+
+ def sign_out(self, options: SignOutOptions = {"scope": "global"}) -> None:
+ """
+ `sign_out` will remove the logged in user from the
+ current session and log them out - removing all items from storage and then trigger a `"SIGNED_OUT"` event.
+
+ For advanced use cases, you can revoke all refresh tokens for a user by passing a user's JWT through to `admin.sign_out`.
+
+ There is no way to revoke a user's access token jwt until it expires.
+ It is recommended to set a shorter expiry on the jwt for this reason.
+ """
+ with suppress(AuthApiError):
+ session = self.get_session()
+ access_token = session.access_token if session else None
+ if access_token:
+ self.admin.sign_out(access_token, options["scope"])
+
+ if options["scope"] != "others":
+ self._remove_session()
+ self._notify_all_subscribers("SIGNED_OUT", None)
+
+ def on_auth_state_change(
+ self,
+ callback: Callable[[AuthChangeEvent, Optional[Session]], None],
+ ) -> Subscription:
+ """
+ Receive a notification every time an auth event happens.
+ """
+ unique_id = str(uuid4())
+
+ def _unsubscribe() -> None:
+ self._state_change_emitters.pop(unique_id)
+
+ subscription = Subscription(
+ id=unique_id,
+ callback=callback,
+ unsubscribe=_unsubscribe,
+ )
+ self._state_change_emitters[unique_id] = subscription
+ return subscription
+
+ def reset_password_for_email(self, email: str, options: Options = {}) -> None:
+ """
+ Sends a password reset request to an email address.
+ """
+ self._request(
+ "POST",
+ "recover",
+ body={
+ "email": email,
+ "gotrue_meta_security": {
+ "captcha_token": options.get("captcha_token"),
+ },
+ },
+ redirect_to=options.get("redirect_to"),
+ )
+
+ def reset_password_email(
+ self,
+ email: str,
+ options: Options = {},
+ ) -> None:
+ """
+ Sends a password reset request to an email address.
+ """
+ self.reset_password_for_email(email, options)
+
+ # MFA methods
+
+ def _enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse:
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+
+ body = {
+ "friendly_name": params["friendly_name"],
+ "factor_type": params["factor_type"],
+ }
+
+ if params["factor_type"] == "phone":
+ body["phone"] = params["phone"]
+ else:
+ body["issuer"] = params["issuer"]
+
+ response = self._request(
+ "POST",
+ "factors",
+ body=body,
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAEnrollResponse),
+ )
+ if params["factor_type"] == "totp" and response.totp.qr_code:
+ response.totp.qr_code = f"data:image/svg+xml;utf-8,{response.totp.qr_code}"
+ return response
+
+ def _challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse:
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ return self._request(
+ "POST",
+ f"factors/{params.get('factor_id')}/challenge",
+ body={"channel": params.get("channel")},
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAChallengeResponse),
+ )
+
+ def _challenge_and_verify(
+ self,
+ params: MFAChallengeAndVerifyParams,
+ ) -> AuthMFAVerifyResponse:
+ response = self._challenge(
+ {
+ "factor_id": params.get("factor_id"),
+ }
+ )
+ return self._verify(
+ {
+ "factor_id": params.get("factor_id"),
+ "challenge_id": response.id,
+ "code": params.get("code"),
+ }
+ )
+
+ def _verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse:
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ response = self._request(
+ "POST",
+ f"factors/{params.get('factor_id')}/verify",
+ body=params,
+ jwt=session.access_token,
+ xform=partial(model_validate, AuthMFAVerifyResponse),
+ )
+ session = model_validate(Session, model_dump(response))
+ self._save_session(session)
+ self._notify_all_subscribers("MFA_CHALLENGE_VERIFIED", session)
+ return response
+
+ def _unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse:
+ session = self.get_session()
+ if not session:
+ raise AuthSessionMissingError()
+ return self._request(
+ "DELETE",
+ f"factors/{params.get('factor_id')}",
+ jwt=session.access_token,
+ xform=partial(AuthMFAUnenrollResponse, model_validate),
+ )
+
+ def _list_factors(self) -> AuthMFAListFactorsResponse:
+ response = self.get_user()
+ all = response.user.factors or []
+ totp = [f for f in all if f.factor_type == "totp" and f.status == "verified"]
+ phone = [f for f in all if f.factor_type == "phone" and f.status == "verified"]
+ return AuthMFAListFactorsResponse(all=all, totp=totp, phone=phone)
+
+ def _get_authenticator_assurance_level(
+ self,
+ ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse:
+ session = self.get_session()
+ if not session:
+ return AuthMFAGetAuthenticatorAssuranceLevelResponse(
+ current_level=None,
+ next_level=None,
+ current_authentication_methods=[],
+ )
+ payload = self._decode_jwt(session.access_token)
+ current_level: Optional[AuthenticatorAssuranceLevels] = None
+ if payload.get("aal"):
+ current_level = payload.get("aal")
+ verified_factors = [
+ f for f in session.user.factors or [] if f.status == "verified"
+ ]
+ next_level = "aal2" if verified_factors else current_level
+ current_authentication_methods = payload.get("amr") or []
+ return AuthMFAGetAuthenticatorAssuranceLevelResponse(
+ current_level=current_level,
+ next_level=next_level,
+ current_authentication_methods=current_authentication_methods,
+ )
+
+ # Private methods
+
+ def _remove_session(self) -> None:
+ if self._persist_session:
+ self._storage.remove_item(self._storage_key)
+ else:
+ self._in_memory_session = None
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = None
+
+ def _get_session_from_url(
+ self,
+ url: str,
+ ) -> Tuple[Session, Optional[str]]:
+ if not self._is_implicit_grant_flow(url):
+ raise AuthImplicitGrantRedirectError("Not a valid implicit grant flow url.")
+ result = urlparse(url)
+ params = parse_qs(result.query)
+ error_description = self._get_param(params, "error_description")
+ if error_description:
+ error_code = self._get_param(params, "error_code")
+ error = self._get_param(params, "error")
+ if not error_code:
+ raise AuthImplicitGrantRedirectError("No error_code detected.")
+ if not error:
+ raise AuthImplicitGrantRedirectError("No error detected.")
+ raise AuthImplicitGrantRedirectError(
+ error_description,
+ {"code": error_code, "error": error},
+ )
+ provider_token = self._get_param(params, "provider_token")
+ provider_refresh_token = self._get_param(params, "provider_refresh_token")
+ access_token = self._get_param(params, "access_token")
+ if not access_token:
+ raise AuthImplicitGrantRedirectError("No access_token detected.")
+ expires_in = self._get_param(params, "expires_in")
+ if not expires_in:
+ raise AuthImplicitGrantRedirectError("No expires_in detected.")
+ refresh_token = self._get_param(params, "refresh_token")
+ if not refresh_token:
+ raise AuthImplicitGrantRedirectError("No refresh_token detected.")
+ token_type = self._get_param(params, "token_type")
+ if not token_type:
+ raise AuthImplicitGrantRedirectError("No token_type detected.")
+ time_now = round(time())
+ expires_at = time_now + int(expires_in)
+ user = self.get_user(access_token)
+ session = Session(
+ provider_token=provider_token,
+ provider_refresh_token=provider_refresh_token,
+ access_token=access_token,
+ expires_in=int(expires_in),
+ expires_at=expires_at,
+ refresh_token=refresh_token,
+ token_type=token_type,
+ user=user.user,
+ )
+ redirect_type = self._get_param(params, "type")
+ return session, redirect_type
+
+ def _recover_and_refresh(self) -> None:
+ raw_session = self._storage.get_item(self._storage_key)
+ current_session = self._get_valid_session(raw_session)
+ if not current_session:
+ if raw_session:
+ self._remove_session()
+ return
+ time_now = round(time())
+ expires_at = current_session.expires_at
+ if expires_at and expires_at < time_now + EXPIRY_MARGIN:
+ refresh_token = current_session.refresh_token
+ if self._auto_refresh_token and refresh_token:
+ self._network_retries += 1
+ try:
+ self._call_refresh_token(refresh_token)
+ self._network_retries = 0
+ except Exception as e:
+ if (
+ isinstance(e, AuthRetryableError)
+ and self._network_retries < MAX_RETRIES
+ ):
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = Timer(
+ (RETRY_INTERVAL ** (self._network_retries * 100)),
+ self._recover_and_refresh,
+ )
+ self._refresh_token_timer.start()
+ return
+ self._remove_session()
+ return
+ if self._persist_session:
+ self._save_session(current_session)
+ self._notify_all_subscribers("SIGNED_IN", current_session)
+
+ def _call_refresh_token(self, refresh_token: str) -> Session:
+ if not refresh_token:
+ raise AuthSessionMissingError()
+ response = self._refresh_access_token(refresh_token)
+ if not response.session:
+ raise AuthSessionMissingError()
+ self._save_session(response.session)
+ self._notify_all_subscribers("TOKEN_REFRESHED", response.session)
+ return response.session
+
+ def _refresh_access_token(self, refresh_token: str) -> AuthResponse:
+ return self._request(
+ "POST",
+ "token",
+ query={"grant_type": "refresh_token"},
+ body={"refresh_token": refresh_token},
+ xform=parse_auth_response,
+ )
+
+ def _save_session(self, session: Session) -> None:
+ if not self._persist_session:
+ self._in_memory_session = session
+ expire_at = session.expires_at
+ if expire_at:
+ time_now = round(time())
+ expire_in = expire_at - time_now
+ refresh_duration_before_expires = (
+ EXPIRY_MARGIN if expire_in > EXPIRY_MARGIN else 0.5
+ )
+ value = (expire_in - refresh_duration_before_expires) * 1000
+ self._start_auto_refresh_token(value)
+ if self._persist_session and session.expires_at:
+ self._storage.set_item(self._storage_key, model_dump_json(session))
+
+ def _start_auto_refresh_token(self, value: float) -> None:
+ if self._refresh_token_timer:
+ self._refresh_token_timer.cancel()
+ self._refresh_token_timer = None
+ if value <= 0 or not self._auto_refresh_token:
+ return
+
+ def refresh_token_function():
+ self._network_retries += 1
+ try:
+ session = self.get_session()
+ if session:
+ self._call_refresh_token(session.refresh_token)
+ self._network_retries = 0
+ except Exception as e:
+ if (
+ isinstance(e, AuthRetryableError)
+ and self._network_retries < MAX_RETRIES
+ ):
+ self._start_auto_refresh_token(
+ RETRY_INTERVAL ** (self._network_retries * 100)
+ )
+
+ self._refresh_token_timer = Timer(value, refresh_token_function)
+ self._refresh_token_timer.start()
+
+ def _notify_all_subscribers(
+ self,
+ event: AuthChangeEvent,
+ session: Optional[Session],
+ ) -> None:
+ for subscription in self._state_change_emitters.values():
+ subscription.callback(event, session)
+
+ def _get_valid_session(
+ self,
+ raw_session: Optional[str],
+ ) -> Optional[Session]:
+ if not raw_session:
+ return None
+ data = loads(raw_session)
+ if not data:
+ return None
+ if not data.get("access_token"):
+ return None
+ if not data.get("refresh_token"):
+ return None
+ if not data.get("expires_at"):
+ return None
+ try:
+ expires_at = int(data["expires_at"])
+ data["expires_at"] = expires_at
+ except ValueError:
+ return None
+ try:
+ return model_validate(Session, data)
+ except Exception:
+ return None
+
+ def _get_param(
+ self,
+ query_params: Dict[str, List[str]],
+ name: str,
+ ) -> Optional[str]:
+ return query_params[name][0] if name in query_params else None
+
+ def _is_implicit_grant_flow(self, url: str) -> bool:
+ result = urlparse(url)
+ params = parse_qs(result.query)
+ return "access_token" in params or "error_description" in params
+
+ def _get_url_for_provider(
+ self,
+ url: str,
+ provider: Provider,
+ params: Dict[str, str],
+ ) -> Tuple[str, Dict[str, str]]:
+ if self._flow_type == "pkce":
+ code_verifier = generate_pkce_verifier()
+ code_challenge = generate_pkce_challenge(code_verifier)
+ self._storage.set_item(f"{self._storage_key}-code-verifier", code_verifier)
+ code_challenge_method = (
+ "plain" if code_verifier == code_challenge else "s256"
+ )
+ params["code_challenge"] = code_challenge
+ params["code_challenge_method"] = code_challenge_method
+
+ params["provider"] = provider
+ query = urlencode(params)
+ return f"{url}?{query}", params
+
+ def _decode_jwt(self, jwt: str) -> DecodedJWTDict:
+ """
+ Decodes a JWT (without performing any validation).
+ """
+ return decode_jwt_payload(jwt)
+
+ def exchange_code_for_session(self, params: CodeExchangeParams):
+ code_verifier = params.get("code_verifier") or self._storage.get_item(
+ f"{self._storage_key}-code-verifier"
+ )
+ response = self._request(
+ "POST",
+ "token",
+ query={"grant_type": "pkce"},
+ body={
+ "auth_code": params.get("auth_code"),
+ "code_verifier": code_verifier,
+ },
+ redirect_to=params.get("redirect_to"),
+ xform=parse_auth_response,
+ )
+ self._storage.remove_item(f"{self._storage_key}-code-verifier")
+ if response.session:
+ self._save_session(response.session)
+ self._notify_all_subscribers("SIGNED_IN", response.session)
+ return response
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py
new file mode 100644
index 00000000..16bec8d5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/gotrue_mfa_api.py
@@ -0,0 +1,94 @@
+from ..types import (
+ AuthMFAChallengeResponse,
+ AuthMFAEnrollResponse,
+ AuthMFAGetAuthenticatorAssuranceLevelResponse,
+ AuthMFAListFactorsResponse,
+ AuthMFAUnenrollResponse,
+ AuthMFAVerifyResponse,
+ MFAChallengeAndVerifyParams,
+ MFAChallengeParams,
+ MFAEnrollParams,
+ MFAUnenrollParams,
+ MFAVerifyParams,
+)
+
+
+class SyncGoTrueMFAAPI:
+ """
+ Contains the full multi-factor authentication API.
+ """
+
+ def enroll(self, params: MFAEnrollParams) -> AuthMFAEnrollResponse:
+ """
+ Starts the enrollment process for a new Multi-Factor Authentication
+ factor. This method creates a new factor in the 'unverified' state.
+ Present the QR code or secret to the user and ask them to add it to their
+ authenticator app. Ask the user to provide you with an authenticator code
+ from their app and verify it by calling challenge and then verify.
+
+ The first successful verification of an unverified factor activates the
+ factor. All other sessions are logged out and the current one gets an
+ `aal2` authenticator level.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def challenge(self, params: MFAChallengeParams) -> AuthMFAChallengeResponse:
+ """
+ Prepares a challenge used to verify that a user has access to a MFA
+ factor. Provide the challenge ID and verification code by calling `verify`.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def challenge_and_verify(
+ self,
+ params: MFAChallengeAndVerifyParams,
+ ) -> AuthMFAVerifyResponse:
+ """
+ Helper method which creates a challenge and immediately uses the given code
+ to verify against it thereafter. The verification code is provided by the
+ user by entering a code seen in their authenticator app.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def verify(self, params: MFAVerifyParams) -> AuthMFAVerifyResponse:
+ """
+ Verifies a verification code against a challenge. The verification code is
+ provided by the user by entering a code seen in their authenticator app.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def unenroll(self, params: MFAUnenrollParams) -> AuthMFAUnenrollResponse:
+ """
+ Unenroll removes a MFA factor. Unverified factors can safely be ignored
+ and it's not necessary to unenroll them. Unenrolling a verified MFA factor
+ cannot be done from a session with an `aal1` authenticator level.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def list_factors(self) -> AuthMFAListFactorsResponse:
+ """
+ Returns the list of MFA factors enabled for this user. For most use cases
+ you should consider using `get_authenticator_assurance_level`.
+
+ This uses a cached version of the factors and avoids incurring a network call.
+ If you need to update this list, call `get_user` first.
+ """
+ raise NotImplementedError() # pragma: no cover
+
+ def get_authenticator_assurance_level(
+ self,
+ ) -> AuthMFAGetAuthenticatorAssuranceLevelResponse:
+ """
+ Returns the Authenticator Assurance Level (AAL) for the active session.
+
+ - `aal1` (or `null`) means that the user's identity has been verified only
+ with a conventional login (email+password, OTP, magic link, social login,
+ etc.).
+ - `aal2` means that the user's identity has been verified both with a
+ conventional login and at least one MFA factor.
+
+ Although this method returns a promise, it's fairly quick (microseconds)
+ and rarely uses the network. You can use this to check whether the current
+ user needs to be shown a screen to verify their MFA factors.
+ """
+ raise NotImplementedError() # pragma: no cover
diff --git a/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py b/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py
new file mode 100644
index 00000000..03ede0c1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/_sync/storage.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional
+
+
+class SyncSupportedStorage(ABC):
+ @abstractmethod
+ def get_item(self, key: str) -> Optional[str]: ... # pragma: no cover
+
+ @abstractmethod
+ def set_item(self, key: str, value: str) -> None: ... # pragma: no cover
+
+ @abstractmethod
+ def remove_item(self, key: str) -> None: ... # pragma: no cover
+
+
+class SyncMemoryStorage(SyncSupportedStorage):
+ def __init__(self):
+ self.storage: Dict[str, str] = {}
+
+ def get_item(self, key: str) -> Optional[str]:
+ if key in self.storage:
+ return self.storage[key]
+
+ def set_item(self, key: str, value: str) -> None:
+ self.storage[key] = value
+
+ def remove_item(self, key: str) -> None:
+ if key in self.storage:
+ del self.storage[key]
diff --git a/.venv/lib/python3.12/site-packages/gotrue/constants.py b/.venv/lib/python3.12/site-packages/gotrue/constants.py
new file mode 100644
index 00000000..d16c0f31
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/constants.py
@@ -0,0 +1,23 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Dict
+
+from .version import __version__
+
+GOTRUE_URL = "http://localhost:9999"
+DEFAULT_HEADERS: Dict[str, str] = {
+ "X-Client-Info": f"gotrue-py/{__version__}",
+}
+EXPIRY_MARGIN = 10 # seconds
+MAX_RETRIES = 10
+RETRY_INTERVAL = 2 # deciseconds
+STORAGE_KEY = "supabase.auth.token"
+
+API_VERSION_HEADER_NAME = "X-Supabase-Api-Version"
+API_VERSIONS = {
+ "2024-01-01": {
+ "timestamp": datetime.timestamp(datetime.strptime("2024-01-01", "%Y-%m-%d")),
+ "name": "2024-01-01",
+ },
+}
diff --git a/.venv/lib/python3.12/site-packages/gotrue/errors.py b/.venv/lib/python3.12/site-packages/gotrue/errors.py
new file mode 100644
index 00000000..fa693894
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/errors.py
@@ -0,0 +1,227 @@
+from __future__ import annotations
+
+from typing import List, Literal, Optional
+
+from typing_extensions import TypedDict
+
+ErrorCode = Literal[
+ "unexpected_failure",
+ "validation_failed",
+ "bad_json",
+ "email_exists",
+ "phone_exists",
+ "bad_jwt",
+ "not_admin",
+ "no_authorization",
+ "user_not_found",
+ "session_not_found",
+ "flow_state_not_found",
+ "flow_state_expired",
+ "signup_disabled",
+ "user_banned",
+ "provider_email_needs_verification",
+ "invite_not_found",
+ "bad_oauth_state",
+ "bad_oauth_callback",
+ "oauth_provider_not_supported",
+ "unexpected_audience",
+ "single_identity_not_deletable",
+ "email_conflict_identity_not_deletable",
+ "identity_already_exists",
+ "email_provider_disabled",
+ "phone_provider_disabled",
+ "too_many_enrolled_mfa_factors",
+ "mfa_factor_name_conflict",
+ "mfa_factor_not_found",
+ "mfa_ip_address_mismatch",
+ "mfa_challenge_expired",
+ "mfa_verification_failed",
+ "mfa_verification_rejected",
+ "insufficient_aal",
+ "captcha_failed",
+ "saml_provider_disabled",
+ "manual_linking_disabled",
+ "sms_send_failed",
+ "email_not_confirmed",
+ "phone_not_confirmed",
+ "reauth_nonce_missing",
+ "saml_relay_state_not_found",
+ "saml_relay_state_expired",
+ "saml_idp_not_found",
+ "saml_assertion_no_user_id",
+ "saml_assertion_no_email",
+ "user_already_exists",
+ "sso_provider_not_found",
+ "saml_metadata_fetch_failed",
+ "saml_idp_already_exists",
+ "sso_domain_already_exists",
+ "saml_entity_id_mismatch",
+ "conflict",
+ "provider_disabled",
+ "user_sso_managed",
+ "reauthentication_needed",
+ "same_password",
+ "reauthentication_not_valid",
+ "otp_expired",
+ "otp_disabled",
+ "identity_not_found",
+ "weak_password",
+ "over_request_rate_limit",
+ "over_email_send_rate_limit",
+ "over_sms_send_rate_limit",
+ "bad_code_verifier",
+ "anonymous_provider_disabled",
+ "hook_timeout",
+ "hook_timeout_after_retry",
+ "hook_payload_over_size_limit",
+ "hook_payload_invalid_content_type",
+ "request_timeout",
+ "mfa_phone_enroll_not_enabled",
+ "mfa_phone_verify_not_enabled",
+ "mfa_totp_enroll_not_enabled",
+ "mfa_totp_verify_not_enabled",
+ "mfa_webauthn_enroll_not_enabled",
+ "mfa_webauthn_verify_not_enabled",
+ "mfa_verified_factor_exists",
+ "invalid_credentials",
+ "email_address_not_authorized",
+ "email_address_invalid",
+]
+
+
+class AuthError(Exception):
+ def __init__(self, message: str, code: ErrorCode) -> None:
+ Exception.__init__(self, message)
+ self.message = message
+ self.name = "AuthError"
+ self.code = code
+
+
+class AuthApiErrorDict(TypedDict):
+ name: str
+ message: str
+ status: int
+ code: ErrorCode
+
+
+class AuthApiError(AuthError):
+ def __init__(self, message: str, status: int, code: ErrorCode) -> None:
+ AuthError.__init__(self, message, code)
+ self.name = "AuthApiError"
+ self.status = status
+ self.code = code
+
+ def to_dict(self) -> AuthApiErrorDict:
+ return {
+ "name": self.name,
+ "message": self.message,
+ "status": self.status,
+ "code": self.code,
+ }
+
+
+class AuthUnknownError(AuthError):
+ def __init__(self, message: str, original_error: Exception) -> None:
+ AuthError.__init__(self, message, None)
+ self.name = "AuthUnknownError"
+ self.original_error = original_error
+
+
+class CustomAuthError(AuthError):
+ def __init__(self, message: str, name: str, status: int, code: ErrorCode) -> None:
+ AuthError.__init__(self, message, code)
+ self.name = name
+ self.status = status
+
+ def to_dict(self) -> AuthApiErrorDict:
+ return {
+ "name": self.name,
+ "message": self.message,
+ "status": self.status,
+ }
+
+
+class AuthSessionMissingError(CustomAuthError):
+ def __init__(self) -> None:
+ CustomAuthError.__init__(
+ self,
+ "Auth session missing!",
+ "AuthSessionMissingError",
+ 400,
+ None,
+ )
+
+
+class AuthInvalidCredentialsError(CustomAuthError):
+ def __init__(self, message: str) -> None:
+ CustomAuthError.__init__(
+ self,
+ message,
+ "AuthInvalidCredentialsError",
+ 400,
+ None,
+ )
+
+
+class AuthImplicitGrantRedirectErrorDetails(TypedDict):
+ error: str
+ code: str
+
+
+class AuthImplicitGrantRedirectErrorDict(AuthApiErrorDict):
+ details: Optional[AuthImplicitGrantRedirectErrorDetails]
+
+
+class AuthImplicitGrantRedirectError(CustomAuthError):
+ def __init__(
+ self,
+ message: str,
+ details: Optional[AuthImplicitGrantRedirectErrorDetails] = None,
+ ) -> None:
+ CustomAuthError.__init__(
+ self,
+ message,
+ "AuthImplicitGrantRedirectError",
+ 500,
+ None,
+ )
+ self.details = details
+
+ def to_dict(self) -> AuthImplicitGrantRedirectErrorDict:
+ return {
+ "name": self.name,
+ "message": self.message,
+ "status": self.status,
+ "details": self.details,
+ }
+
+
+class AuthRetryableError(CustomAuthError):
+ def __init__(self, message: str, status: int) -> None:
+ CustomAuthError.__init__(
+ self,
+ message,
+ "AuthRetryableError",
+ status,
+ None,
+ )
+
+
+class AuthWeakPasswordError(CustomAuthError):
+ def __init__(self, message: str, status: int, reasons: List[str]) -> None:
+ CustomAuthError.__init__(
+ self,
+ message,
+ "AuthWeakPasswordError",
+ status,
+ "weak_password",
+ )
+ self.reasons = reasons
+
+ def to_dict(self) -> AuthApiErrorDict:
+ return {
+ "name": self.name,
+ "message": self.message,
+ "status": self.status,
+ "reasons": self.reasons,
+ }
diff --git a/.venv/lib/python3.12/site-packages/gotrue/helpers.py b/.venv/lib/python3.12/site-packages/gotrue/helpers.py
new file mode 100644
index 00000000..ae9dc7c5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/helpers.py
@@ -0,0 +1,269 @@
+from __future__ import annotations
+
+import base64
+import hashlib
+import re
+import secrets
+import string
+from base64 import urlsafe_b64decode
+from datetime import datetime
+from json import loads
+from typing import Any, Dict, Optional, Type, TypeVar, cast
+from urllib.parse import urlparse
+
+from httpx import HTTPStatusError, Response
+from pydantic import BaseModel
+
+from .constants import API_VERSION_HEADER_NAME, API_VERSIONS
+from .errors import (
+ AuthApiError,
+ AuthError,
+ AuthRetryableError,
+ AuthUnknownError,
+ AuthWeakPasswordError,
+)
+from .types import (
+ AuthOtpResponse,
+ AuthResponse,
+ GenerateLinkProperties,
+ GenerateLinkResponse,
+ LinkIdentityResponse,
+ Session,
+ SSOResponse,
+ User,
+ UserResponse,
+)
+
+TBaseModel = TypeVar("TBaseModel", bound=BaseModel)
+BASE64URL_REGEX = r"^([a-z0-9_-]{4})*($|[a-z0-9_-]{3}$|[a-z0-9_-]{2}$)$"
+
+
+def model_validate(model: Type[TBaseModel], contents) -> TBaseModel:
+ """Compatibility layer between pydantic 1 and 2 for parsing an instance
+ of a BaseModel from varied"""
+ try:
+ # pydantic > 2
+ return model.model_validate(contents)
+ except AttributeError:
+ # pydantic < 2
+ return model.parse_obj(contents)
+
+
+def model_dump(model: BaseModel) -> Dict[str, Any]:
+ """Compatibility layer between pydantic 1 and 2 for dumping a model's contents as a dict"""
+ try:
+ # pydantic > 2
+ return model.model_dump()
+ except AttributeError:
+ # pydantic < 2
+ return model.dict()
+
+
+def model_dump_json(model: BaseModel) -> str:
+ """Compatibility layer between pydantic 1 and 2 for dumping a model's contents as json"""
+ try:
+ # pydantic > 2
+ return model.model_dump_json()
+ except AttributeError:
+ # pydantic < 2
+ return model.json()
+
+
+def parse_auth_response(data: Any) -> AuthResponse:
+ session: Optional[Session] = None
+ if (
+ "access_token" in data
+ and "refresh_token" in data
+ and "expires_in" in data
+ and data["access_token"]
+ and data["refresh_token"]
+ and data["expires_in"]
+ ):
+ session = model_validate(Session, data)
+ user_data = data.get("user", data)
+ user = model_validate(User, user_data) if user_data else None
+ return AuthResponse(session=session, user=user)
+
+
+def parse_auth_otp_response(data: Any) -> AuthOtpResponse:
+ return model_validate(AuthOtpResponse, data)
+
+
+def parse_link_identity_response(data: Any) -> LinkIdentityResponse:
+ return model_validate(LinkIdentityResponse, data)
+
+
+def parse_link_response(data: Any) -> GenerateLinkResponse:
+ properties = GenerateLinkProperties(
+ action_link=data.get("action_link"),
+ email_otp=data.get("email_otp"),
+ hashed_token=data.get("hashed_token"),
+ redirect_to=data.get("redirect_to"),
+ verification_type=data.get("verification_type"),
+ )
+ user = model_validate(
+ User, {k: v for k, v in data.items() if k not in model_dump(properties)}
+ )
+ return GenerateLinkResponse(properties=properties, user=user)
+
+
+def parse_user_response(data: Any) -> UserResponse:
+ if "user" not in data:
+ data = {"user": data}
+ return model_validate(UserResponse, data)
+
+
+def parse_sso_response(data: Any) -> SSOResponse:
+ return model_validate(SSOResponse, data)
+
+
+def get_error_message(error: Any) -> str:
+ props = ["msg", "message", "error_description", "error"]
+ filter = lambda prop: (
+ prop in error if isinstance(error, dict) else hasattr(error, prop)
+ )
+ return next((error[prop] for prop in props if filter(prop)), str(error))
+
+
+def get_error_code(error: Any) -> str:
+ return error.get("error_code", None) if isinstance(error, dict) else None
+
+
+def looks_like_http_status_error(exception: Exception) -> bool:
+ return isinstance(exception, HTTPStatusError)
+
+
+def handle_exception(exception: Exception) -> AuthError:
+ if not looks_like_http_status_error(exception):
+ return AuthRetryableError(get_error_message(exception), 0)
+ error = cast(HTTPStatusError, exception)
+ try:
+ network_error_codes = [502, 503, 504]
+ if error.response.status_code in network_error_codes:
+ return AuthRetryableError(
+ get_error_message(error), error.response.status_code
+ )
+ data = error.response.json()
+
+ error_code = None
+ response_api_version = parse_response_api_version(error.response)
+
+ if (
+ response_api_version
+ and datetime.timestamp(response_api_version)
+ >= API_VERSIONS.get("2024-01-01").get("timestamp")
+ and isinstance(data, dict)
+ and data
+ and isinstance(data.get("code"), str)
+ ):
+ error_code = data.get("code")
+ elif (
+ isinstance(data, dict) and data and isinstance(data.get("error_code"), str)
+ ):
+ error_code = data.get("error_code")
+
+ if error_code is None:
+ if (
+ isinstance(data, dict)
+ and data
+ and isinstance(data.get("weak_password"), dict)
+ and data.get("weak_password")
+ and isinstance(data.get("weak_password"), list)
+ and len(data.get("weak_password"))
+ ):
+ return AuthWeakPasswordError(
+ get_error_message(data),
+ error.response.status_code,
+ data.get("weak_password").get("reasons"),
+ )
+ elif error_code == "weak_password":
+ return AuthWeakPasswordError(
+ get_error_message(data),
+ error.response.status_code,
+ data.get("weak_password", {}).get("reasons", {}),
+ )
+
+ return AuthApiError(
+ get_error_message(data),
+ error.response.status_code or 500,
+ error_code,
+ )
+ except Exception as e:
+ return AuthUnknownError(get_error_message(error), e)
+
+
+def decode_jwt_payload(token: str) -> Any:
+ parts = token.split(".")
+ if len(parts) != 3:
+ raise ValueError("JWT is not valid: not a JWT structure")
+ base64url = parts[1]
+ # Addding padding otherwise the following error happens:
+ # binascii.Error: Incorrect padding
+ base64url_with_padding = base64url + "=" * (-len(base64url) % 4)
+ return loads(urlsafe_b64decode(base64url_with_padding).decode("utf-8"))
+
+
+def generate_pkce_verifier(length=64):
+ """Generate a random PKCE verifier of the specified length."""
+ if length < 43 or length > 128:
+ raise ValueError("PKCE verifier length must be between 43 and 128 characters")
+
+ # Define characters that can be used in the PKCE verifier
+ charset = string.ascii_letters + string.digits + "-._~"
+
+ return "".join(secrets.choice(charset) for _ in range(length))
+
+
+def generate_pkce_challenge(code_verifier):
+ """Generate a code challenge from a PKCE verifier."""
+ # Hash the verifier using SHA-256
+ verifier_bytes = code_verifier.encode("utf-8")
+ sha256_hash = hashlib.sha256(verifier_bytes).digest()
+
+ return base64.urlsafe_b64encode(sha256_hash).rstrip(b"=").decode("utf-8")
+
+
+API_VERSION_REGEX = r"^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])$"
+
+
+def parse_response_api_version(response: Response):
+ api_version = response.headers.get(API_VERSION_HEADER_NAME)
+
+ if not api_version:
+ return None
+
+ if re.search(API_VERSION_REGEX, api_version) is None:
+ return None
+
+ try:
+ dt = datetime.strptime(api_version, "%Y-%m-%d")
+ return dt
+ except Exception as e:
+ return None
+
+
+def is_http_url(url: str) -> bool:
+ return urlparse(url).scheme in {"https", "http"}
+
+
+def is_valid_jwt(value: str) -> bool:
+ """Checks if value looks like a JWT, does not do any extra parsing."""
+ if not isinstance(value, str):
+ return False
+
+ # Remove trailing whitespaces if any.
+ value = value.strip()
+
+ # Remove "Bearer " prefix if any.
+ if value.startswith("Bearer "):
+ value = value[7:]
+
+ # Valid JWT must have 2 dots (Header.Paylod.Signature)
+ if value.count(".") != 2:
+ return False
+
+ for part in value.split("."):
+ if not re.search(BASE64URL_REGEX, part, re.IGNORECASE):
+ return False
+
+ return True
diff --git a/.venv/lib/python3.12/site-packages/gotrue/http_clients.py b/.venv/lib/python3.12/site-packages/gotrue/http_clients.py
new file mode 100644
index 00000000..6dbd91d9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/http_clients.py
@@ -0,0 +1,9 @@
+from __future__ import annotations
+
+from httpx import AsyncClient # noqa: F401
+from httpx import Client as BaseClient
+
+
+class SyncClient(BaseClient):
+ def aclose(self) -> None:
+ self.close()
diff --git a/.venv/lib/python3.12/site-packages/gotrue/timer.py b/.venv/lib/python3.12/site-packages/gotrue/timer.py
new file mode 100644
index 00000000..11f11536
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/timer.py
@@ -0,0 +1,45 @@
+import asyncio
+from threading import Timer as _Timer
+from typing import Any, Callable, Coroutine, Optional, cast
+
+
+class Timer:
+ def __init__(
+ self,
+ seconds: float,
+ function: Callable[[], Optional[Coroutine[Any, Any, None]]],
+ ) -> None:
+ self._milliseconds = seconds
+ self._function = function
+ self._task: Optional[asyncio.Task] = None
+ self._timer: Optional[_Timer] = None
+
+ def start(self) -> None:
+ if asyncio.iscoroutinefunction(self._function):
+
+ async def schedule():
+ await asyncio.sleep(self._milliseconds / 1000)
+ await cast(Coroutine[Any, Any, None], self._function())
+
+ def cleanup(_):
+ self._task = None
+
+ self._task = asyncio.create_task(schedule())
+ self._task.add_done_callback(cleanup)
+ else:
+ self._timer = _Timer(self._milliseconds / 1000, self._function)
+ self._timer.daemon = True
+ self._timer.start()
+
+ def cancel(self) -> None:
+ if self._task is not None:
+ self._task.cancel()
+ self._task = None
+ if self._timer is not None:
+ self._timer.cancel()
+ self._timer = None
+
+ def is_alive(self) -> bool:
+ return self._task is not None or (
+ self._timer is not None and self._timer.is_alive()
+ )
diff --git a/.venv/lib/python3.12/site-packages/gotrue/types.py b/.venv/lib/python3.12/site-packages/gotrue/types.py
new file mode 100644
index 00000000..86bda3e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/types.py
@@ -0,0 +1,818 @@
+from __future__ import annotations
+
+from datetime import datetime
+from time import time
+from typing import Any, Callable, Dict, List, Optional, Union
+
+from pydantic import BaseModel, ConfigDict
+
+try:
+ # > 2
+ from pydantic import model_validator
+
+ model_validator_v1_v2_compat = model_validator(mode="before")
+except ImportError:
+ # < 2
+ from pydantic import root_validator
+
+ model_validator_v1_v2_compat = root_validator
+
+from typing_extensions import Literal, NotRequired, TypedDict
+
+Provider = Literal[
+ "apple",
+ "azure",
+ "bitbucket",
+ "discord",
+ "facebook",
+ "figma",
+ "fly",
+ "github",
+ "gitlab",
+ "google",
+ "kakao",
+ "keycloak",
+ "linkedin",
+ "linkedin_oidc",
+ "notion",
+ "slack",
+ "slack_oidc",
+ "spotify",
+ "twitch",
+ "twitter",
+ "workos",
+ "zoom",
+]
+
+EmailOtpType = Literal[
+ "signup", "invite", "magiclink", "recovery", "email_change", "email"
+]
+
+AuthChangeEventMFA = Literal["MFA_CHALLENGE_VERIFIED"]
+
+AuthFlowType = Literal["pkce", "implicit"]
+
+AuthChangeEvent = Literal[
+ "PASSWORD_RECOVERY",
+ "SIGNED_IN",
+ "SIGNED_OUT",
+ "TOKEN_REFRESHED",
+ "USER_UPDATED",
+ "USER_DELETED",
+ AuthChangeEventMFA,
+]
+
+
+class AMREntry(BaseModel):
+ """
+ An authentication methord reference (AMR) entry.
+
+ An entry designates what method was used by the user to verify their
+ identity and at what time.
+ """
+
+ method: Union[Literal["password", "otp", "oauth", "mfa/totp"], str]
+ """
+ Authentication method name.
+ """
+ timestamp: int
+ """
+ Timestamp when the method was successfully used. Represents number of
+ seconds since 1st January 1970 (UNIX epoch) in UTC.
+ """
+
+
+class Options(TypedDict):
+ redirect_to: NotRequired[str]
+ captcha_token: NotRequired[str]
+
+
+class InviteUserByEmailOptions(TypedDict):
+ redirect_to: NotRequired[str]
+ data: NotRequired[Any]
+
+
+class AuthResponse(BaseModel):
+ user: Optional[User] = None
+ session: Optional[Session] = None
+
+
+class AuthOtpResponse(BaseModel):
+ user: None = None
+ session: None = None
+ message_id: Optional[str] = None
+
+
+class OAuthResponse(BaseModel):
+ provider: Provider
+ url: str
+
+
+class SSOResponse(BaseModel):
+ url: str
+
+
+class LinkIdentityResponse(BaseModel):
+ url: str
+
+
+class IdentitiesResponse(BaseModel):
+ identities: List[UserIdentity]
+
+
+class UserResponse(BaseModel):
+ user: User
+
+
+class Session(BaseModel):
+ provider_token: Optional[str] = None
+ """
+ The oauth provider token. If present, this can be used to make external API
+ requests to the oauth provider used.
+ """
+ provider_refresh_token: Optional[str] = None
+ """
+ The oauth provider refresh token. If present, this can be used to refresh
+ the provider_token via the oauth provider's API.
+
+ Not all oauth providers return a provider refresh token. If the
+ provider_refresh_token is missing, please refer to the oauth provider's
+ documentation for information on how to obtain the provider refresh token.
+ """
+ access_token: str
+ refresh_token: str
+ expires_in: int
+ """
+ The number of seconds until the token expires (since it was issued).
+ Returned when a login is confirmed.
+ """
+ expires_at: Optional[int] = None
+ """
+ A timestamp of when the token will expire. Returned when a login is confirmed.
+ """
+ token_type: str
+ user: User
+
+ @model_validator_v1_v2_compat
+ def validator(cls, values: dict) -> dict:
+ expires_in = values.get("expires_in")
+ if expires_in and not values.get("expires_at"):
+ values["expires_at"] = round(time()) + expires_in
+ return values
+
+
+class UserIdentity(BaseModel):
+ id: str
+ identity_id: str
+ user_id: str
+ identity_data: Dict[str, Any]
+ provider: str
+ created_at: datetime
+ last_sign_in_at: Optional[datetime] = None
+ updated_at: Optional[datetime] = None
+
+
+class Factor(BaseModel):
+ """
+ A MFA factor.
+ """
+
+ id: str
+ """
+ ID of the factor.
+ """
+ friendly_name: Optional[str] = None
+ """
+ Friendly name of the factor, useful to disambiguate between multiple factors.
+ """
+ factor_type: Union[Literal["totp", "phone"], str]
+ """
+ Type of factor. Only `totp` supported with this version but may change in
+ future versions.
+ """
+ status: Literal["verified", "unverified"]
+ """
+ Factor's status.
+ """
+ created_at: datetime
+ updated_at: datetime
+
+
+class User(BaseModel):
+ id: str
+ app_metadata: Dict[str, Any]
+ user_metadata: Dict[str, Any]
+ aud: str
+ confirmation_sent_at: Optional[datetime] = None
+ recovery_sent_at: Optional[datetime] = None
+ email_change_sent_at: Optional[datetime] = None
+ new_email: Optional[str] = None
+ new_phone: Optional[str] = None
+ invited_at: Optional[datetime] = None
+ action_link: Optional[str] = None
+ email: Optional[str] = None
+ phone: Optional[str] = None
+ created_at: datetime
+ confirmed_at: Optional[datetime] = None
+ email_confirmed_at: Optional[datetime] = None
+ phone_confirmed_at: Optional[datetime] = None
+ last_sign_in_at: Optional[datetime] = None
+ role: Optional[str] = None
+ updated_at: Optional[datetime] = None
+ identities: Optional[List[UserIdentity]] = None
+ is_anonymous: bool = False
+ factors: Optional[List[Factor]] = None
+
+
+class UserAttributes(TypedDict):
+ email: NotRequired[str]
+ phone: NotRequired[str]
+ password: NotRequired[str]
+ data: NotRequired[Any]
+
+
+class AdminUserAttributes(UserAttributes, TypedDict):
+ user_metadata: NotRequired[Any]
+ app_metadata: NotRequired[Any]
+ email_confirm: NotRequired[bool]
+ phone_confirm: NotRequired[bool]
+ ban_duration: NotRequired[Union[str, Literal["none"]]]
+ role: NotRequired[str]
+ """
+ The `role` claim set in the user's access token JWT.
+
+ When a user signs up, this role is set to `authenticated` by default. You should only modify the `role` if you need to provision several levels of admin access that have different permissions on individual columns in your database.
+
+ Setting this role to `service_role` is not recommended as it grants the user admin privileges.
+ """
+ password_hash: NotRequired[str]
+ """
+ The `password_hash` for the user's password.
+
+ Allows you to specify a password hash for the user. This is useful for migrating a user's password hash from another service.
+
+ Supports bcrypt and argon2 password hashes.
+ """
+ id: NotRequired[str]
+ """
+ The `id` for the user.
+
+ Allows you to overwrite the default `id` set for the user.
+ """
+
+
+class Subscription(BaseModel):
+ id: str
+ """
+ The subscriber UUID. This will be set by the client.
+ """
+ callback: Callable[[AuthChangeEvent, Optional[Session]], None]
+ """
+ The function to call every time there is an event.
+ """
+ unsubscribe: Callable[[], None]
+ """
+ Call this to remove the listener.
+ """
+
+
+class UpdatableFactorAttributes(TypedDict):
+ friendly_name: str
+
+
+class SignUpWithEmailAndPasswordCredentialsOptions(
+ TypedDict,
+):
+ email_redirect_to: NotRequired[str]
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+
+
+class SignUpWithEmailAndPasswordCredentials(TypedDict):
+ email: str
+ password: str
+ options: NotRequired[SignUpWithEmailAndPasswordCredentialsOptions]
+
+
+class SignUpWithPhoneAndPasswordCredentialsOptions(TypedDict):
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+ channel: NotRequired[Literal["sms", "whatsapp"]]
+
+
+class SignUpWithPhoneAndPasswordCredentials(TypedDict):
+ phone: str
+ password: str
+ options: NotRequired[SignUpWithPhoneAndPasswordCredentialsOptions]
+
+
+SignUpWithPasswordCredentials = Union[
+ SignUpWithEmailAndPasswordCredentials,
+ SignUpWithPhoneAndPasswordCredentials,
+]
+
+
+class SignInWithPasswordCredentialsOptions(TypedDict):
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+
+
+class SignInWithEmailAndPasswordCredentials(TypedDict):
+ email: str
+ password: str
+ options: NotRequired[SignInWithPasswordCredentialsOptions]
+
+
+class SignInWithPhoneAndPasswordCredentials(TypedDict):
+ phone: str
+ password: str
+ options: NotRequired[SignInWithPasswordCredentialsOptions]
+
+
+SignInWithPasswordCredentials = Union[
+ SignInWithEmailAndPasswordCredentials,
+ SignInWithPhoneAndPasswordCredentials,
+]
+
+
+class SignInWithIdTokenCredentials(TypedDict):
+ """
+ Provider name or OIDC `iss` value identifying which provider should be used to verify the provided token. Supported names: `google`, `apple`, `azure`, `facebook`, `kakao`, `keycloak` (deprecated).
+ """
+
+ provider: Literal["google", "apple", "azure", "facebook", "kakao"]
+ token: str
+ access_token: NotRequired[str]
+ nonce: NotRequired[str]
+ options: NotRequired[SignInWithIdTokenCredentialsOptions]
+
+
+class SignInWithIdTokenCredentialsOptions(TypedDict):
+ captcha_token: NotRequired[str]
+
+
+class SignInWithEmailAndPasswordlessCredentialsOptions(TypedDict):
+ email_redirect_to: NotRequired[str]
+ should_create_user: NotRequired[bool]
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+
+
+class SignInWithEmailAndPasswordlessCredentials(TypedDict):
+ email: str
+ options: NotRequired[SignInWithEmailAndPasswordlessCredentialsOptions]
+
+
+class SignInWithPhoneAndPasswordlessCredentialsOptions(TypedDict):
+ should_create_user: NotRequired[bool]
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+ channel: NotRequired[Literal["sms", "whatsapp"]]
+
+
+class SignInWithPhoneAndPasswordlessCredentials(TypedDict):
+ phone: str
+ options: NotRequired[SignInWithPhoneAndPasswordlessCredentialsOptions]
+
+
+SignInWithPasswordlessCredentials = Union[
+ SignInWithEmailAndPasswordlessCredentials,
+ SignInWithPhoneAndPasswordlessCredentials,
+]
+
+
+class ResendEmailCredentialsOptions(TypedDict):
+ email_redirect_to: NotRequired[str]
+ captcha_token: NotRequired[str]
+
+
+class ResendEmailCredentials(TypedDict):
+ type: Literal["signup", "email_change"]
+ email: str
+ options: NotRequired[ResendEmailCredentialsOptions]
+
+
+class ResendPhoneCredentialsOptions(TypedDict):
+ captcha_token: NotRequired[str]
+
+
+class ResendPhoneCredentials(TypedDict):
+ type: Literal["sms", "phone_change"]
+ phone: str
+ options: NotRequired[ResendPhoneCredentialsOptions]
+
+
+ResendCredentials = Union[ResendEmailCredentials, ResendPhoneCredentials]
+
+
+class SignInWithOAuthCredentialsOptions(TypedDict):
+ redirect_to: NotRequired[str]
+ scopes: NotRequired[str]
+ query_params: NotRequired[Dict[str, str]]
+
+
+class SignInWithOAuthCredentials(TypedDict):
+ provider: Provider
+ options: NotRequired[SignInWithOAuthCredentialsOptions]
+
+
+class SignInWithSSOCredentials(TypedDict):
+ provider_id: NotRequired[str]
+ domain: NotRequired[str]
+ options: NotRequired[SignInWithSSOOptions]
+
+
+class SignInWithSSOOptions(TypedDict):
+ redirect_to: NotRequired[str]
+ skip_http_redirect: NotRequired[bool]
+
+
+class SignInAnonymouslyCredentials(TypedDict):
+ options: NotRequired[SignInAnonymouslyCredentialsOptions]
+
+
+class SignInAnonymouslyCredentialsOptions(TypedDict):
+ data: NotRequired[Any]
+ captcha_token: NotRequired[str]
+
+
+class VerifyOtpParamsOptions(TypedDict):
+ redirect_to: NotRequired[str]
+ captcha_token: NotRequired[str]
+
+
+class VerifyEmailOtpParams(TypedDict):
+ email: str
+ token: str
+ type: EmailOtpType
+ options: NotRequired[VerifyOtpParamsOptions]
+
+
+class VerifyMobileOtpParams(TypedDict):
+ phone: str
+ token: str
+ type: Literal[
+ "sms",
+ "phone_change",
+ ]
+ options: NotRequired[VerifyOtpParamsOptions]
+
+
+class VerifyTokenHashParams(TypedDict):
+ token_hash: str
+ type: EmailOtpType
+ options: NotRequired[VerifyOtpParamsOptions]
+
+
+VerifyOtpParams = Union[
+ VerifyEmailOtpParams, VerifyMobileOtpParams, VerifyTokenHashParams
+]
+
+
+class GenerateLinkParamsOptions(TypedDict):
+ redirect_to: NotRequired[str]
+
+
+class GenerateLinkParamsWithDataOptions(GenerateLinkParamsOptions, TypedDict):
+ data: NotRequired[Any]
+
+
+class GenerateSignupLinkParams(TypedDict):
+ type: Literal["signup"]
+ email: str
+ password: str
+ options: NotRequired[GenerateLinkParamsWithDataOptions]
+
+
+class GenerateInviteOrMagiclinkParams(TypedDict):
+ type: Literal["invite", "magiclink"]
+ email: str
+ options: NotRequired[GenerateLinkParamsWithDataOptions]
+
+
+class GenerateRecoveryLinkParams(TypedDict):
+ type: Literal["recovery"]
+ email: str
+ options: NotRequired[GenerateLinkParamsOptions]
+
+
+class GenerateEmailChangeLinkParams(TypedDict):
+ type: Literal["email_change_current", "email_change_new"]
+ email: str
+ new_email: str
+ options: NotRequired[GenerateLinkParamsOptions]
+
+
+GenerateLinkParams = Union[
+ GenerateSignupLinkParams,
+ GenerateInviteOrMagiclinkParams,
+ GenerateRecoveryLinkParams,
+ GenerateEmailChangeLinkParams,
+]
+
+GenerateLinkType = Literal[
+ "signup",
+ "invite",
+ "magiclink",
+ "recovery",
+ "email_change_current",
+ "email_change_new",
+]
+
+
+class MFAEnrollParams(TypedDict):
+ factor_type: Literal["totp", "phone"]
+ issuer: NotRequired[str]
+ friendly_name: NotRequired[str]
+ phone: str
+
+
+class MFAUnenrollParams(TypedDict):
+ factor_id: str
+ """
+ ID of the factor being unenrolled.
+ """
+
+
+class CodeExchangeParams(TypedDict):
+ code_verifier: str
+ """
+ Randomly generated string
+ """
+ auth_code: str
+ """
+ Code returned after completing one of the authorization flows
+ """
+ redirect_to: str
+ """
+ The URL to route to after a session is successfully obtained
+ """
+
+
+class MFAVerifyParams(TypedDict):
+ factor_id: str
+ """
+ ID of the factor being verified.
+ """
+ challenge_id: str
+ """
+ ID of the challenge being verified.
+ """
+ code: str
+ """
+ Verification code provided by the user.
+ """
+
+
+class MFAChallengeParams(TypedDict):
+ factor_id: str
+ """
+ ID of the factor to be challenged.
+ """
+ channel: NotRequired[Literal["sms", "whatsapp"]]
+
+
+class MFAChallengeAndVerifyParams(TypedDict):
+ factor_id: str
+ """
+ ID of the factor being verified.
+ """
+ code: str
+ """
+ Verification code provided by the user.
+ """
+
+
+class AuthMFAVerifyResponse(BaseModel):
+ access_token: str
+ """
+ New access token (JWT) after successful verification.
+ """
+ token_type: str
+ """
+ Type of token, typically `Bearer`.
+ """
+ expires_in: int
+ """
+ Number of seconds in which the access token will expire.
+ """
+ refresh_token: str
+ """
+ Refresh token you can use to obtain new access tokens when expired.
+ """
+ user: User
+ """
+ Updated user profile.
+ """
+
+
+class AuthMFAEnrollResponseTotp(BaseModel):
+ qr_code: str
+ """
+ Contains a QR code encoding the authenticator URI. You can
+ convert it to a URL by prepending `data:image/svg+xml;utf-8,` to
+ the value. Avoid logging this value to the console.
+ """
+ secret: str
+ """
+ The TOTP secret (also encoded in the QR code). Show this secret
+ in a password-style field to the user, in case they are unable to
+ scan the QR code. Avoid logging this value to the console.
+ """
+ uri: str
+ """
+ The authenticator URI encoded within the QR code, should you need
+ to use it. Avoid loggin this value to the console.
+ """
+
+
+class AuthMFAEnrollResponse(BaseModel):
+ id: str
+ """
+ ID of the factor that was just enrolled (in an unverified state).
+ """
+ type: Literal["totp", "phone"]
+ """
+ Type of MFA factor. Only `totp` supported for now.
+ """
+ totp: AuthMFAEnrollResponseTotp
+ """
+ TOTP enrollment information.
+ """
+ model_config = ConfigDict(arbitrary_types_allowed=True)
+ friendly_name: str
+ """
+ Friendly name of the factor, useful for distinguishing between factors
+ """
+ phone: str
+ """
+ Phone number of the MFA factor in E.164 format. Used to send messages
+ """
+
+
+class AuthMFAUnenrollResponse(BaseModel):
+ id: str
+ """
+ ID of the factor that was successfully unenrolled.
+ """
+
+
+class AuthMFAChallengeResponse(BaseModel):
+ id: str
+ """
+ ID of the newly created challenge.
+ """
+ expires_at: int
+ """
+ Timestamp in UNIX seconds when this challenge will no longer be usable.
+ """
+ factor_type: Literal["totp", "phone"]
+ """
+ Factor Type which generated the challenge
+ """
+
+
+class AuthMFAListFactorsResponse(BaseModel):
+ all: List[Factor]
+ """
+ All available factors (verified and unverified).
+ """
+ totp: List[Factor]
+ """
+ Only verified TOTP factors. (A subset of `all`.)
+ """
+ phone: List[Factor]
+ """
+ Only verified Phone factors. (A subset of `all`.)
+ """
+
+
+AuthenticatorAssuranceLevels = Literal["aal1", "aal2"]
+
+
+class AuthMFAGetAuthenticatorAssuranceLevelResponse(BaseModel):
+ current_level: Optional[AuthenticatorAssuranceLevels] = None
+ """
+ Current AAL level of the session.
+ """
+ next_level: Optional[AuthenticatorAssuranceLevels] = None
+ """
+ Next possible AAL level for the session. If the next level is higher
+ than the current one, the user should go through MFA.
+ """
+ current_authentication_methods: List[AMREntry]
+ """
+ A list of all authentication methods attached to this session. Use
+ the information here to detect the last time a user verified a
+ factor, for example if implementing a step-up scenario.
+ """
+
+
+class AuthMFAAdminDeleteFactorResponse(BaseModel):
+ id: str
+ """
+ ID of the factor that was successfully deleted.
+ """
+
+
+class AuthMFAAdminDeleteFactorParams(TypedDict):
+ id: str
+ """
+ ID of the MFA factor to delete.
+ """
+ user_id: str
+ """
+ ID of the user whose factor is being deleted.
+ """
+
+
+class AuthMFAAdminListFactorsResponse(BaseModel):
+ factors: List[Factor]
+ """
+ All factors attached to the user.
+ """
+
+
+class AuthMFAAdminListFactorsParams(TypedDict):
+ user_id: str
+ """
+ ID of the user for which to list all MFA factors.
+ """
+
+
+class GenerateLinkProperties(BaseModel):
+ """
+ The properties related to the email link generated.
+ """
+
+ action_link: str
+ """
+ The email link to send to the user. The action_link follows the following format:
+
+ auth/v1/verify?type={verification_type}&token={hashed_token}&redirect_to={redirect_to}
+ """
+ email_otp: str
+ """
+ The raw email OTP.
+ You should send this in the email if you want your users to verify using an
+ OTP instead of the action link.
+ """
+ hashed_token: str
+ """
+ The hashed token appended to the action link.
+ """
+ redirect_to: str
+ """
+ The URL appended to the action link.
+ """
+ verification_type: GenerateLinkType
+ """
+ The verification type that the email link is associated to.
+ """
+
+
+class GenerateLinkResponse(BaseModel):
+ properties: GenerateLinkProperties
+ user: User
+
+
+class DecodedJWTDict(TypedDict):
+ exp: NotRequired[int]
+ aal: NotRequired[Optional[AuthenticatorAssuranceLevels]]
+ amr: NotRequired[Optional[List[AMREntry]]]
+
+
+SignOutScope = Literal["global", "local", "others"]
+
+
+class SignOutOptions(TypedDict):
+ scope: NotRequired[SignOutScope]
+
+
+for model in [
+ AMREntry,
+ AuthResponse,
+ OAuthResponse,
+ UserResponse,
+ Session,
+ UserIdentity,
+ Factor,
+ User,
+ Subscription,
+ AuthMFAVerifyResponse,
+ AuthMFAEnrollResponseTotp,
+ AuthMFAEnrollResponse,
+ AuthMFAUnenrollResponse,
+ AuthMFAChallengeResponse,
+ AuthMFAListFactorsResponse,
+ AuthMFAGetAuthenticatorAssuranceLevelResponse,
+ AuthMFAAdminDeleteFactorResponse,
+ AuthMFAAdminListFactorsResponse,
+ GenerateLinkProperties,
+]:
+ try:
+ # pydantic > 2
+ model.model_rebuild()
+ except AttributeError:
+ # pydantic < 2
+ model.update_forward_refs()
diff --git a/.venv/lib/python3.12/site-packages/gotrue/version.py b/.venv/lib/python3.12/site-packages/gotrue/version.py
new file mode 100644
index 00000000..91b91220
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/gotrue/version.py
@@ -0,0 +1 @@
+__version__ = "2.11.4" # {x-release-please-version}