about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2/challenges.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/challenges.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
new file mode 100644
index 00000000..6468498b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
@@ -0,0 +1,281 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Challenges for reauthentication.
+"""
+
+import abc
+import base64
+import getpass
+import sys
+
+from google.auth import _helpers
+from google.auth import exceptions
+from google.oauth2 import webauthn_handler_factory
+from google.oauth2.webauthn_types import (
+    AuthenticationExtensionsClientInputs,
+    GetRequest,
+    PublicKeyCredentialDescriptor,
+)
+
+
+REAUTH_ORIGIN = "https://accounts.google.com"
+SAML_CHALLENGE_MESSAGE = (
+    "Please run `gcloud auth login` to complete reauthentication with SAML."
+)
+WEBAUTHN_TIMEOUT_MS = 120000  # Two minute timeout
+
+
+def get_user_password(text):
+    """Get password from user.
+
+    Override this function with a different logic if you are using this library
+    outside a CLI.
+
+    Args:
+        text (str): message for the password prompt.
+
+    Returns:
+        str: password string.
+    """
+    return getpass.getpass(text)
+
+
+class ReauthChallenge(metaclass=abc.ABCMeta):
+    """Base class for reauth challenges."""
+
+    @property
+    @abc.abstractmethod
+    def name(self):  # pragma: NO COVER
+        """Returns the name of the challenge."""
+        raise NotImplementedError("name property must be implemented")
+
+    @property
+    @abc.abstractmethod
+    def is_locally_eligible(self):  # pragma: NO COVER
+        """Returns true if a challenge is supported locally on this machine."""
+        raise NotImplementedError("is_locally_eligible property must be implemented")
+
+    @abc.abstractmethod
+    def obtain_challenge_input(self, metadata):  # pragma: NO COVER
+        """Performs logic required to obtain credentials and returns it.
+
+        Args:
+            metadata (Mapping): challenge metadata returned in the 'challenges' field in
+                the initial reauth request. Includes the 'challengeType' field
+                and other challenge-specific fields.
+
+        Returns:
+            response that will be send to the reauth service as the content of
+            the 'proposalResponse' field in the request body. Usually a dict
+            with the keys specific to the challenge. For example,
+            ``{'credential': password}`` for password challenge.
+        """
+        raise NotImplementedError("obtain_challenge_input method must be implemented")
+
+
+class PasswordChallenge(ReauthChallenge):
+    """Challenge that asks for user's password."""
+
+    @property
+    def name(self):
+        return "PASSWORD"
+
+    @property
+    def is_locally_eligible(self):
+        return True
+
+    @_helpers.copy_docstring(ReauthChallenge)
+    def obtain_challenge_input(self, unused_metadata):
+        passwd = get_user_password("Please enter your password:")
+        if not passwd:
+            passwd = " "  # avoid the server crashing in case of no password :D
+        return {"credential": passwd}
+
+
+class SecurityKeyChallenge(ReauthChallenge):
+    """Challenge that asks for user's security key touch."""
+
+    @property
+    def name(self):
+        return "SECURITY_KEY"
+
+    @property
+    def is_locally_eligible(self):
+        return True
+
+    @_helpers.copy_docstring(ReauthChallenge)
+    def obtain_challenge_input(self, metadata):
+        # Check if there is an available Webauthn Handler, if not use pyu2f
+        try:
+            factory = webauthn_handler_factory.WebauthnHandlerFactory()
+            webauthn_handler = factory.get_handler()
+            if webauthn_handler is not None:
+                sys.stderr.write("Please insert and touch your security key\n")
+                return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
+        except Exception:
+            # Attempt pyu2f if exception in webauthn flow
+            pass
+
+        try:
+            import pyu2f.convenience.authenticator  # type: ignore
+            import pyu2f.errors  # type: ignore
+            import pyu2f.model  # type: ignore
+        except ImportError:
+            raise exceptions.ReauthFailError(
+                "pyu2f dependency is required to use Security key reauth feature. "
+                "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`."
+            )
+        sk = metadata["securityKey"]
+        challenges = sk["challenges"]
+        # Read both 'applicationId' and 'relyingPartyId', if they are the same, use
+        # applicationId, if they are different, use relyingPartyId first and retry
+        # with applicationId
+        application_id = sk["applicationId"]
+        relying_party_id = sk["relyingPartyId"]
+
+        if application_id != relying_party_id:
+            application_parameters = [relying_party_id, application_id]
+        else:
+            application_parameters = [application_id]
+
+        challenge_data = []
+        for c in challenges:
+            kh = c["keyHandle"].encode("ascii")
+            key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh)))
+            challenge = c["challenge"].encode("ascii")
+            challenge = base64.urlsafe_b64decode(challenge)
+            challenge_data.append({"key": key, "challenge": challenge})
+
+        # Track number of tries to suppress error message until all application_parameters
+        # are tried.
+        tries = 0
+        for app_id in application_parameters:
+            try:
+                tries += 1
+                api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator(
+                    REAUTH_ORIGIN
+                )
+                response = api.Authenticate(
+                    app_id, challenge_data, print_callback=sys.stderr.write
+                )
+                return {"securityKey": response}
+            except pyu2f.errors.U2FError as e:
+                if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE:
+                    # Only show error if all app_ids have been tried
+                    if tries == len(application_parameters):
+                        sys.stderr.write("Ineligible security key.\n")
+                        return None
+                    continue
+                if e.code == pyu2f.errors.U2FError.TIMEOUT:
+                    sys.stderr.write(
+                        "Timed out while waiting for security key touch.\n"
+                    )
+                else:
+                    raise e
+            except pyu2f.errors.PluginError as e:
+                sys.stderr.write("Plugin error: {}.\n".format(e))
+                continue
+            except pyu2f.errors.NoDeviceFoundError:
+                sys.stderr.write("No security key found.\n")
+            return None
+
+    def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
+        sk = metadata.get("securityKey")
+        if sk is None:
+            raise exceptions.InvalidValue("securityKey is None")
+        challenges = sk.get("challenges")
+        application_id = sk.get("applicationId")
+        relying_party_id = sk.get("relyingPartyId")
+        if challenges is None or len(challenges) < 1:
+            raise exceptions.InvalidValue("challenges is None or empty")
+        if application_id is None:
+            raise exceptions.InvalidValue("application_id is None")
+        if relying_party_id is None:
+            raise exceptions.InvalidValue("relying_party_id is None")
+
+        allow_credentials = []
+        for challenge in challenges:
+            kh = challenge.get("keyHandle")
+            if kh is None:
+                raise exceptions.InvalidValue("keyHandle is None")
+            key_handle = self._unpadded_urlsafe_b64recode(kh)
+            allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))
+
+        extension = AuthenticationExtensionsClientInputs(appid=application_id)
+
+        challenge = challenges[0].get("challenge")
+        if challenge is None:
+            raise exceptions.InvalidValue("challenge is None")
+
+        get_request = GetRequest(
+            origin=REAUTH_ORIGIN,
+            rpid=relying_party_id,
+            challenge=self._unpadded_urlsafe_b64recode(challenge),
+            timeout_ms=WEBAUTHN_TIMEOUT_MS,
+            allow_credentials=allow_credentials,
+            user_verification="required",
+            extensions=extension,
+        )
+
+        try:
+            get_response = webauthn_handler.get(get_request)
+        except Exception as e:
+            sys.stderr.write("Webauthn Error: {}.\n".format(e))
+            raise e
+
+        response = {
+            "clientData": get_response.response.client_data_json,
+            "authenticatorData": get_response.response.authenticator_data,
+            "signatureData": get_response.response.signature,
+            "applicationId": application_id,
+            "keyHandle": get_response.id,
+            "securityKeyReplyType": 2,
+        }
+        return {"securityKey": response}
+
+    def _unpadded_urlsafe_b64recode(self, s):
+        """Converts standard b64 encoded string to url safe b64 encoded string
+        with no padding."""
+        b = base64.urlsafe_b64decode(s)
+        return base64.urlsafe_b64encode(b).decode().rstrip("=")
+
+
+class SamlChallenge(ReauthChallenge):
+    """Challenge that asks the users to browse to their ID Providers.
+
+    Currently SAML challenge is not supported. When obtaining the challenge
+    input, exception will be raised to instruct the users to run
+    `gcloud auth login` for reauthentication.
+    """
+
+    @property
+    def name(self):
+        return "SAML"
+
+    @property
+    def is_locally_eligible(self):
+        return True
+
+    def obtain_challenge_input(self, metadata):
+        # Magic Arch has not fully supported returning a proper dedirect URL
+        # for programmatic SAML users today. So we error our here and request
+        # users to use gcloud to complete a login.
+        raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE)
+
+
+AVAILABLE_CHALLENGES = {
+    challenge.name: challenge
+    for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()]
+}