aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/oauth2
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2')
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/__init__.py36
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_client.py508
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py286
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py118
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py285
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py328
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py132
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/challenges.py281
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/credentials.py614
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py251
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/id_token.py358
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/py.typed2
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/reauth.py369
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/service_account.py847
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/sts.py176
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/utils.py168
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py82
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py16
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py156
19 files changed, 5013 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py b/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py
new file mode 100644
index 00000000..accae965
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py
@@ -0,0 +1,36 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google OAuth 2.0 Library for Python."""
+
+import sys
+import warnings
+
+
+class Python37DeprecationWarning(DeprecationWarning): # pragma: NO COVER
+ """
+ Deprecation warning raised when Python 3.7 runtime is detected.
+ Python 3.7 support will be dropped after January 1, 2024.
+ """
+
+ pass
+
+
+# Checks if the current runtime is Python 3.7.
+if sys.version_info.major == 3 and sys.version_info.minor == 7: # pragma: NO COVER
+ message = (
+ "After January 1, 2024, new releases of this library will drop support "
+ "for Python 3.7."
+ )
+ warnings.warn(message, Python37DeprecationWarning)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_client.py b/.venv/lib/python3.12/site-packages/google/oauth2/_client.py
new file mode 100644
index 00000000..5a9fc350
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_client.py
@@ -0,0 +1,508 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 client.
+
+This is a client for interacting with an OAuth 2.0 authorization server's
+token endpoint.
+
+For more information about the token endpoint, see
+`Section 3.1 of rfc6749`_
+
+.. _Section 3.1 of rfc6749: https://tools.ietf.org/html/rfc6749#section-3.2
+"""
+
+import datetime
+import http.client as http_client
+import json
+import urllib
+
+from google.auth import _exponential_backoff
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import jwt
+from google.auth import metrics
+from google.auth import transport
+
+_URLENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded"
+_JSON_CONTENT_TYPE = "application/json"
+_JWT_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"
+_REFRESH_GRANT_TYPE = "refresh_token"
+
+
+def _handle_error_response(response_data, retryable_error):
+ """Translates an error response into an exception.
+
+ Args:
+ response_data (Mapping | str): The decoded response data.
+ retryable_error Optional[bool]: A boolean indicating if an error is retryable.
+ Defaults to False.
+
+ Raises:
+ google.auth.exceptions.RefreshError: The errors contained in response_data.
+ """
+
+ retryable_error = retryable_error if retryable_error else False
+
+ if isinstance(response_data, str):
+ raise exceptions.RefreshError(response_data, retryable=retryable_error)
+ try:
+ error_details = "{}: {}".format(
+ response_data["error"], response_data.get("error_description")
+ )
+ # If no details could be extracted, use the response data.
+ except (KeyError, ValueError):
+ error_details = json.dumps(response_data)
+
+ raise exceptions.RefreshError(
+ error_details, response_data, retryable=retryable_error
+ )
+
+
+def _can_retry(status_code, response_data):
+ """Checks if a request can be retried by inspecting the status code
+ and response body of the request.
+
+ Args:
+ status_code (int): The response status code.
+ response_data (Mapping | str): The decoded response data.
+
+ Returns:
+ bool: True if the response is retryable. False otherwise.
+ """
+ if status_code in transport.DEFAULT_RETRYABLE_STATUS_CODES:
+ return True
+
+ try:
+ # For a failed response, response_body could be a string
+ error_desc = response_data.get("error_description") or ""
+ error_code = response_data.get("error") or ""
+
+ if not isinstance(error_code, str) or not isinstance(error_desc, str):
+ return False
+
+ # Per Oauth 2.0 RFC https://www.rfc-editor.org/rfc/rfc6749.html#section-4.1.2.1
+ # This is needed because a redirect will not return a 500 status code.
+ retryable_error_descriptions = {
+ "internal_failure",
+ "server_error",
+ "temporarily_unavailable",
+ }
+
+ if any(e in retryable_error_descriptions for e in (error_code, error_desc)):
+ return True
+
+ except AttributeError:
+ pass
+
+ return False
+
+
+def _parse_expiry(response_data):
+ """Parses the expiry field from a response into a datetime.
+
+ Args:
+ response_data (Mapping): The JSON-parsed response data.
+
+ Returns:
+ Optional[datetime]: The expiration or ``None`` if no expiration was
+ specified.
+ """
+ expires_in = response_data.get("expires_in", None)
+
+ if expires_in is not None:
+ # Some services do not respect the OAUTH2.0 RFC and send expires_in as a
+ # JSON String.
+ if isinstance(expires_in, str):
+ expires_in = int(expires_in)
+
+ return _helpers.utcnow() + datetime.timedelta(seconds=expires_in)
+ else:
+ return None
+
+
+def _token_endpoint_request_no_throw(
+ request,
+ token_uri,
+ body,
+ access_token=None,
+ use_json=False,
+ can_retry=True,
+ headers=None,
+ **kwargs
+):
+ """Makes a request to the OAuth 2.0 authorization server's token endpoint.
+ This function doesn't throw on response errors.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ body (Mapping[str, str]): The parameters to send in the request body.
+ access_token (Optional(str)): The access token needed to make the request.
+ use_json (Optional(bool)): Use urlencoded format or json format for the
+ content type. The default value is False.
+ can_retry (bool): Enable or disable request retry behavior.
+ headers (Optional[Mapping[str, str]]): The headers for the request.
+ kwargs: Additional arguments passed on to the request method. The
+ kwargs will be passed to `requests.request` method, see:
+ https://docs.python-requests.org/en/latest/api/#requests.request.
+ For example, you can use `cert=("cert_pem_path", "key_pem_path")`
+ to set up client side SSL certificate, and use
+ `verify="ca_bundle_path"` to set up the CA certificates for sever
+ side SSL certificate verification.
+
+ Returns:
+ Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating
+ if the request is successful, a mapping for the JSON-decoded response
+ data and in the case of an error a boolean indicating if the error
+ is retryable.
+ """
+ if use_json:
+ headers_to_use = {"Content-Type": _JSON_CONTENT_TYPE}
+ body = json.dumps(body).encode("utf-8")
+ else:
+ headers_to_use = {"Content-Type": _URLENCODED_CONTENT_TYPE}
+ body = urllib.parse.urlencode(body).encode("utf-8")
+
+ if access_token:
+ headers_to_use["Authorization"] = "Bearer {}".format(access_token)
+
+ if headers:
+ headers_to_use.update(headers)
+
+ response_data = {}
+ retryable_error = False
+
+ retries = _exponential_backoff.ExponentialBackoff()
+ for _ in retries:
+ response = request(
+ method="POST", url=token_uri, headers=headers_to_use, body=body, **kwargs
+ )
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ try:
+ # response_body should be a JSON
+ response_data = json.loads(response_body)
+ except ValueError:
+ response_data = response_body
+
+ if response.status == http_client.OK:
+ return True, response_data, None
+
+ retryable_error = _can_retry(
+ status_code=response.status, response_data=response_data
+ )
+
+ if not can_retry or not retryable_error:
+ return False, response_data, retryable_error
+
+ return False, response_data, retryable_error
+
+
+def _token_endpoint_request(
+ request,
+ token_uri,
+ body,
+ access_token=None,
+ use_json=False,
+ can_retry=True,
+ headers=None,
+ **kwargs
+):
+ """Makes a request to the OAuth 2.0 authorization server's token endpoint.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ body (Mapping[str, str]): The parameters to send in the request body.
+ access_token (Optional(str)): The access token needed to make the request.
+ use_json (Optional(bool)): Use urlencoded format or json format for the
+ content type. The default value is False.
+ can_retry (bool): Enable or disable request retry behavior.
+ headers (Optional[Mapping[str, str]]): The headers for the request.
+ kwargs: Additional arguments passed on to the request method. The
+ kwargs will be passed to `requests.request` method, see:
+ https://docs.python-requests.org/en/latest/api/#requests.request.
+ For example, you can use `cert=("cert_pem_path", "key_pem_path")`
+ to set up client side SSL certificate, and use
+ `verify="ca_bundle_path"` to set up the CA certificates for sever
+ side SSL certificate verification.
+
+ Returns:
+ Mapping[str, str]: The JSON-decoded response data.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+
+ response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw(
+ request,
+ token_uri,
+ body,
+ access_token=access_token,
+ use_json=use_json,
+ can_retry=can_retry,
+ headers=headers,
+ **kwargs
+ )
+ if not response_status_ok:
+ _handle_error_response(response_data, retryable_error)
+ return response_data
+
+
+def jwt_grant(request, token_uri, assertion, can_retry=True):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants.
+
+ For more details, see `rfc7523 section 4`_.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ assertion (str): The OAuth 2.0 assertion.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]: The access token,
+ expiration, and additional data returned by the token endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+
+ .. _rfc7523 section 4: https://tools.ietf.org/html/rfc7523#section-4
+ """
+ body = {"assertion": assertion, "grant_type": _JWT_GRANT_TYPE}
+
+ response_data = _token_endpoint_request(
+ request,
+ token_uri,
+ body,
+ can_retry=can_retry,
+ headers={
+ metrics.API_CLIENT_HEADER: metrics.token_request_access_token_sa_assertion()
+ },
+ )
+
+ try:
+ access_token = response_data["access_token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No access token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ expiry = _parse_expiry(response_data)
+
+ return access_token, expiry, response_data
+
+
+def call_iam_generate_id_token_endpoint(
+ request,
+ iam_id_token_endpoint,
+ signer_email,
+ audience,
+ access_token,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+):
+ """Call iam.generateIdToken endpoint to get ID token.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ iam_id_token_endpoint (str): The IAM ID token endpoint to use.
+ signer_email (str): The signer email used to form the IAM
+ generateIdToken endpoint.
+ audience (str): The audience for the ID token.
+ access_token (str): The access token used to call the IAM endpoint.
+
+ Returns:
+ Tuple[str, datetime]: The ID token and expiration.
+ """
+ body = {"audience": audience, "includeEmail": "true", "useEmailAzp": "true"}
+
+ response_data = _token_endpoint_request(
+ request,
+ iam_id_token_endpoint.replace(
+ credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain
+ ).format(signer_email),
+ body,
+ access_token=access_token,
+ use_json=True,
+ )
+
+ try:
+ id_token = response_data["token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No ID token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ payload = jwt.decode(id_token, verify=False)
+ expiry = datetime.datetime.utcfromtimestamp(payload["exp"])
+
+ return id_token, expiry
+
+
+def id_token_jwt_grant(request, token_uri, assertion, can_retry=True):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but
+ requests an OpenID Connect ID Token instead of an access token.
+
+ This is a variant on the standard JWT Profile that is currently unique
+ to Google. This was added for the benefit of authenticating to services
+ that require ID Tokens instead of access tokens or JWT bearer tokens.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorization server's token endpoint
+ URI.
+ assertion (str): JWT token signed by a service account. The token's
+ payload must include a ``target_audience`` claim.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]:
+ The (encoded) Open ID Connect ID Token, expiration, and additional
+ data returned by the endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {"assertion": assertion, "grant_type": _JWT_GRANT_TYPE}
+
+ response_data = _token_endpoint_request(
+ request,
+ token_uri,
+ body,
+ can_retry=can_retry,
+ headers={
+ metrics.API_CLIENT_HEADER: metrics.token_request_id_token_sa_assertion()
+ },
+ )
+
+ try:
+ id_token = response_data["id_token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No ID token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ payload = jwt.decode(id_token, verify=False)
+ expiry = datetime.datetime.utcfromtimestamp(payload["exp"])
+
+ return id_token, expiry, response_data
+
+
+def _handle_refresh_grant_response(response_data, refresh_token):
+ """Extract tokens from refresh grant response.
+
+ Args:
+ response_data (Mapping[str, str]): Refresh grant response data.
+ refresh_token (str): Current refresh token.
+
+ Returns:
+ Tuple[str, str, Optional[datetime], Mapping[str, str]]: The access token,
+ refresh token, expiration, and additional data returned by the token
+ endpoint. If response_data doesn't have refresh token, then the current
+ refresh token will be returned.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ try:
+ access_token = response_data["access_token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No access token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ refresh_token = response_data.get("refresh_token", refresh_token)
+ expiry = _parse_expiry(response_data)
+
+ return access_token, refresh_token, expiry, response_data
+
+
+def refresh_grant(
+ request,
+ token_uri,
+ refresh_token,
+ client_id,
+ client_secret,
+ scopes=None,
+ rapt_token=None,
+ can_retry=True,
+):
+ """Implements the OAuth 2.0 refresh token grant.
+
+ For more details, see `rfc678 section 6`_.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ refresh_token (str): The refresh token to use to get a new access
+ token.
+ client_id (str): The OAuth 2.0 application's client ID.
+ client_secret (str): The Oauth 2.0 appliaction's client secret.
+ scopes (Optional(Sequence[str])): Scopes to request. If present, all
+ scopes must be authorized for the refresh token. Useful if refresh
+ token has a wild card scope (e.g.
+ 'https://www.googleapis.com/auth/any-api').
+ rapt_token (Optional(str)): The reauth Proof Token.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, str, Optional[datetime], Mapping[str, str]]: The access
+ token, new or current refresh token, expiration, and additional data
+ returned by the token endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+
+ .. _rfc6748 section 6: https://tools.ietf.org/html/rfc6749#section-6
+ """
+ body = {
+ "grant_type": _REFRESH_GRANT_TYPE,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ "refresh_token": refresh_token,
+ }
+ if scopes:
+ body["scope"] = " ".join(scopes)
+ if rapt_token:
+ body["rapt"] = rapt_token
+
+ response_data = _token_endpoint_request(
+ request, token_uri, body, can_retry=can_retry
+ )
+ return _handle_refresh_grant_response(response_data, refresh_token)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py
new file mode 100644
index 00000000..8867f0a5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py
@@ -0,0 +1,286 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 async client.
+
+This is a client for interacting with an OAuth 2.0 authorization server's
+token endpoint.
+
+For more information about the token endpoint, see
+`Section 3.1 of rfc6749`_
+
+.. _Section 3.1 of rfc6749: https://tools.ietf.org/html/rfc6749#section-3.2
+"""
+
+import datetime
+import http.client as http_client
+import json
+import urllib
+
+from google.auth import _exponential_backoff
+from google.auth import exceptions
+from google.auth import jwt
+from google.oauth2 import _client as client
+
+
+async def _token_endpoint_request_no_throw(
+ request, token_uri, body, access_token=None, use_json=False, can_retry=True
+):
+ """Makes a request to the OAuth 2.0 authorization server's token endpoint.
+ This function doesn't throw on response errors.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ body (Mapping[str, str]): The parameters to send in the request body.
+ access_token (Optional(str)): The access token needed to make the request.
+ use_json (Optional(bool)): Use urlencoded format or json format for the
+ content type. The default value is False.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating
+ if the request is successful, a mapping for the JSON-decoded response
+ data and in the case of an error a boolean indicating if the error
+ is retryable.
+ """
+ if use_json:
+ headers = {"Content-Type": client._JSON_CONTENT_TYPE}
+ body = json.dumps(body).encode("utf-8")
+ else:
+ headers = {"Content-Type": client._URLENCODED_CONTENT_TYPE}
+ body = urllib.parse.urlencode(body).encode("utf-8")
+
+ if access_token:
+ headers["Authorization"] = "Bearer {}".format(access_token)
+
+ response_data = {}
+ retryable_error = False
+
+ retries = _exponential_backoff.ExponentialBackoff()
+ for _ in retries:
+ response = await request(
+ method="POST", url=token_uri, headers=headers, body=body
+ )
+
+ # Using data.read() resulted in zlib decompression errors. This may require future investigation.
+ response_body1 = await response.content()
+
+ response_body = (
+ response_body1.decode("utf-8")
+ if hasattr(response_body1, "decode")
+ else response_body1
+ )
+
+ try:
+ response_data = json.loads(response_body)
+ except ValueError:
+ response_data = response_body
+
+ if response.status == http_client.OK:
+ return True, response_data, None
+
+ retryable_error = client._can_retry(
+ status_code=response.status, response_data=response_data
+ )
+
+ if not can_retry or not retryable_error:
+ return False, response_data, retryable_error
+
+ return False, response_data, retryable_error
+
+
+async def _token_endpoint_request(
+ request, token_uri, body, access_token=None, use_json=False, can_retry=True
+):
+ """Makes a request to the OAuth 2.0 authorization server's token endpoint.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ body (Mapping[str, str]): The parameters to send in the request body.
+ access_token (Optional(str)): The access token needed to make the request.
+ use_json (Optional(bool)): Use urlencoded format or json format for the
+ content type. The default value is False.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Mapping[str, str]: The JSON-decoded response data.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+
+ response_status_ok, response_data, retryable_error = await _token_endpoint_request_no_throw(
+ request,
+ token_uri,
+ body,
+ access_token=access_token,
+ use_json=use_json,
+ can_retry=can_retry,
+ )
+ if not response_status_ok:
+ client._handle_error_response(response_data, retryable_error)
+ return response_data
+
+
+async def jwt_grant(request, token_uri, assertion, can_retry=True):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants.
+
+ For more details, see `rfc7523 section 4`_.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ assertion (str): The OAuth 2.0 assertion.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]: The access token,
+ expiration, and additional data returned by the token endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+
+ .. _rfc7523 section 4: https://tools.ietf.org/html/rfc7523#section-4
+ """
+ body = {"assertion": assertion, "grant_type": client._JWT_GRANT_TYPE}
+
+ response_data = await _token_endpoint_request(
+ request, token_uri, body, can_retry=can_retry
+ )
+
+ try:
+ access_token = response_data["access_token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No access token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ expiry = client._parse_expiry(response_data)
+
+ return access_token, expiry, response_data
+
+
+async def id_token_jwt_grant(request, token_uri, assertion, can_retry=True):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but
+ requests an OpenID Connect ID Token instead of an access token.
+
+ This is a variant on the standard JWT Profile that is currently unique
+ to Google. This was added for the benefit of authenticating to services
+ that require ID Tokens instead of access tokens or JWT bearer tokens.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorization server's token endpoint
+ URI.
+ assertion (str): JWT token signed by a service account. The token's
+ payload must include a ``target_audience`` claim.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]:
+ The (encoded) Open ID Connect ID Token, expiration, and additional
+ data returned by the endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {"assertion": assertion, "grant_type": client._JWT_GRANT_TYPE}
+
+ response_data = await _token_endpoint_request(
+ request, token_uri, body, can_retry=can_retry
+ )
+
+ try:
+ id_token = response_data["id_token"]
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "No ID token in response.", response_data, retryable=False
+ )
+ raise new_exc from caught_exc
+
+ payload = jwt.decode(id_token, verify=False)
+ expiry = datetime.datetime.utcfromtimestamp(payload["exp"])
+
+ return id_token, expiry, response_data
+
+
+async def refresh_grant(
+ request,
+ token_uri,
+ refresh_token,
+ client_id,
+ client_secret,
+ scopes=None,
+ rapt_token=None,
+ can_retry=True,
+):
+ """Implements the OAuth 2.0 refresh token grant.
+
+ For more details, see `rfc678 section 6`_.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ refresh_token (str): The refresh token to use to get a new access
+ token.
+ client_id (str): The OAuth 2.0 application's client ID.
+ client_secret (str): The Oauth 2.0 appliaction's client secret.
+ scopes (Optional(Sequence[str])): Scopes to request. If present, all
+ scopes must be authorized for the refresh token. Useful if refresh
+ token has a wild card scope (e.g.
+ 'https://www.googleapis.com/auth/any-api').
+ rapt_token (Optional(str)): The reauth Proof Token.
+ can_retry (bool): Enable or disable request retry behavior.
+
+ Returns:
+ Tuple[str, Optional[str], Optional[datetime], Mapping[str, str]]: The
+ access token, new or current refresh token, expiration, and additional data
+ returned by the token endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+
+ .. _rfc6748 section 6: https://tools.ietf.org/html/rfc6749#section-6
+ """
+ body = {
+ "grant_type": client._REFRESH_GRANT_TYPE,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ "refresh_token": refresh_token,
+ }
+ if scopes:
+ body["scope"] = " ".join(scopes)
+ if rapt_token:
+ body["rapt"] = rapt_token
+
+ response_data = await _token_endpoint_request(
+ request, token_uri, body, can_retry=can_retry
+ )
+ return client._handle_refresh_grant_response(response_data, refresh_token)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py
new file mode 100644
index 00000000..b5561aae
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py
@@ -0,0 +1,118 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 Async Credentials.
+
+This module provides credentials based on OAuth 2.0 access and refresh tokens.
+These credentials usually access resources on behalf of a user (resource
+owner).
+
+Specifically, this is intended to use access tokens acquired using the
+`Authorization Code grant`_ and can refresh those tokens using a
+optional `refresh token`_.
+
+Obtaining the initial access and refresh token is outside of the scope of this
+module. Consult `rfc6749 section 4.1`_ for complete details on the
+Authorization Code grant flow.
+
+.. _Authorization Code grant: https://tools.ietf.org/html/rfc6749#section-1.3.1
+.. _refresh token: https://tools.ietf.org/html/rfc6749#section-6
+.. _rfc6749 section 4.1: https://tools.ietf.org/html/rfc6749#section-4.1
+"""
+
+from google.auth import _credentials_async as credentials
+from google.auth import _helpers
+from google.auth import exceptions
+from google.oauth2 import _reauth_async as reauth
+from google.oauth2 import credentials as oauth2_credentials
+
+
+class Credentials(oauth2_credentials.Credentials):
+ """Credentials using OAuth 2.0 access and refresh tokens.
+
+ The credentials are considered immutable. If you want to modify the
+ quota project, use :meth:`with_quota_project` or ::
+
+ credentials = credentials.with_quota_project('myproject-123)
+ """
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ async def refresh(self, request):
+ if (
+ self._refresh_token is None
+ or self._token_uri is None
+ or self._client_id is None
+ or self._client_secret is None
+ ):
+ raise exceptions.RefreshError(
+ "The credentials do not contain the necessary fields need to "
+ "refresh the access token. You must specify refresh_token, "
+ "token_uri, client_id, and client_secret."
+ )
+
+ (
+ access_token,
+ refresh_token,
+ expiry,
+ grant_response,
+ rapt_token,
+ ) = await reauth.refresh_grant(
+ request,
+ self._token_uri,
+ self._refresh_token,
+ self._client_id,
+ self._client_secret,
+ scopes=self._scopes,
+ rapt_token=self._rapt_token,
+ enable_reauth_refresh=self._enable_reauth_refresh,
+ )
+
+ self.token = access_token
+ self.expiry = expiry
+ self._refresh_token = refresh_token
+ self._id_token = grant_response.get("id_token")
+ self._rapt_token = rapt_token
+
+ if self._scopes and "scope" in grant_response:
+ requested_scopes = frozenset(self._scopes)
+ granted_scopes = frozenset(grant_response["scope"].split())
+ scopes_requested_but_not_granted = requested_scopes - granted_scopes
+ if scopes_requested_but_not_granted:
+ raise exceptions.RefreshError(
+ "Not all requested scopes were granted by the "
+ "authorization server, missing scopes {}.".format(
+ ", ".join(scopes_requested_but_not_granted)
+ )
+ )
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ async def before_request(self, request, method, url, headers):
+ if not self.valid:
+ await self.refresh(request)
+ self.apply(headers)
+
+
+class UserAccessTokenCredentials(oauth2_credentials.UserAccessTokenCredentials):
+ """Access token credentials for user account.
+
+ Obtain the access token for a given user account or the current active
+ user account with the ``gcloud auth print-access-token`` command.
+
+ Args:
+ account (Optional[str]): Account to get the access token for. If not
+ specified, the current active account will be used.
+ quota_project_id (Optional[str]): The project ID used for quota
+ and billing.
+
+ """
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py
new file mode 100644
index 00000000..6594e416
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py
@@ -0,0 +1,285 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google ID Token helpers.
+
+Provides support for verifying `OpenID Connect ID Tokens`_, especially ones
+generated by Google infrastructure.
+
+To parse and verify an ID Token issued by Google's OAuth 2.0 authorization
+server use :func:`verify_oauth2_token`. To verify an ID Token issued by
+Firebase, use :func:`verify_firebase_token`.
+
+A general purpose ID Token verifier is available as :func:`verify_token`.
+
+Example::
+
+ from google.oauth2 import _id_token_async
+ from google.auth.transport import aiohttp_requests
+
+ request = aiohttp_requests.Request()
+
+ id_info = await _id_token_async.verify_oauth2_token(
+ token, request, 'my-client-id.example.com')
+
+ if id_info['iss'] != 'https://accounts.google.com':
+ raise ValueError('Wrong issuer.')
+
+ userid = id_info['sub']
+
+By default, this will re-fetch certificates for each verification. Because
+Google's public keys are only changed infrequently (on the order of once per
+day), you may wish to take advantage of caching to reduce latency and the
+potential for network errors. This can be accomplished using an external
+library like `CacheControl`_ to create a cache-aware
+:class:`google.auth.transport.Request`::
+
+ import cachecontrol
+ import google.auth.transport.requests
+ import requests
+
+ session = requests.session()
+ cached_session = cachecontrol.CacheControl(session)
+ request = google.auth.transport.requests.Request(session=cached_session)
+
+.. _OpenID Connect ID Token:
+ http://openid.net/specs/openid-connect-core-1_0.html#IDToken
+.. _CacheControl: https://cachecontrol.readthedocs.io
+"""
+
+import http.client as http_client
+import json
+import os
+
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import jwt
+from google.auth.transport import requests
+from google.oauth2 import id_token as sync_id_token
+
+
+async def _fetch_certs(request, certs_url):
+ """Fetches certificates.
+
+ Google-style cerificate endpoints return JSON in the format of
+ ``{'key id': 'x509 certificate'}``.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests. This must be an aiohttp request.
+ certs_url (str): The certificate endpoint URL.
+
+ Returns:
+ Mapping[str, str]: A mapping of public key ID to x.509 certificate
+ data.
+ """
+ response = await request(certs_url, method="GET")
+
+ if response.status != http_client.OK:
+ raise exceptions.TransportError(
+ "Could not fetch certificates at {}".format(certs_url)
+ )
+
+ data = await response.content()
+
+ return json.loads(data)
+
+
+async def verify_token(
+ id_token,
+ request,
+ audience=None,
+ certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
+ clock_skew_in_seconds=0,
+):
+ """Verifies an ID token and returns the decoded token.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests. This must be an aiohttp request.
+ audience (str): The audience that this token is intended for. If None
+ then the audience is not verified.
+ certs_url (str): The URL that specifies the certificates to use to
+ verify the token. This URL should return JSON in the format of
+ ``{'key id': 'x509 certificate'}``.
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+ """
+ certs = await _fetch_certs(request, certs_url)
+
+ return jwt.decode(
+ id_token,
+ certs=certs,
+ audience=audience,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+
+async def verify_oauth2_token(
+ id_token, request, audience=None, clock_skew_in_seconds=0
+):
+ """Verifies an ID Token issued by Google's OAuth 2.0 authorization server.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests. This must be an aiohttp request.
+ audience (str): The audience that this token is intended for. This is
+ typically your application's OAuth 2.0 client ID. If None then the
+ audience is not verified.
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+
+ Raises:
+ exceptions.GoogleAuthError: If the issuer is invalid.
+ """
+ idinfo = await verify_token(
+ id_token,
+ request,
+ audience=audience,
+ certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+ if idinfo["iss"] not in sync_id_token._GOOGLE_ISSUERS:
+ raise exceptions.GoogleAuthError(
+ "Wrong issuer. 'iss' should be one of the following: {}".format(
+ sync_id_token._GOOGLE_ISSUERS
+ )
+ )
+
+ return idinfo
+
+
+async def verify_firebase_token(
+ id_token, request, audience=None, clock_skew_in_seconds=0
+):
+ """Verifies an ID Token issued by Firebase Authentication.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests. This must be an aiohttp request.
+ audience (str): The audience that this token is intended for. This is
+ typically your Firebase application ID. If None then the audience
+ is not verified.
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+ """
+ return await verify_token(
+ id_token,
+ request,
+ audience=audience,
+ certs_url=sync_id_token._GOOGLE_APIS_CERTS_URL,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+
+async def fetch_id_token(request, audience):
+ """Fetch the ID Token from the current environment.
+
+ This function acquires ID token from the environment in the following order.
+ See https://google.aip.dev/auth/4110.
+
+ 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
+ to the path of a valid service account JSON file, then ID token is
+ acquired using this service account credentials.
+ 2. If the application is running in Compute Engine, App Engine or Cloud Run,
+ then the ID token are obtained from the metadata server.
+ 3. If metadata server doesn't exist and no valid service account credentials
+ are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
+ be raised.
+
+ Example::
+
+ import google.oauth2._id_token_async
+ import google.auth.transport.aiohttp_requests
+
+ request = google.auth.transport.aiohttp_requests.Request()
+ target_audience = "https://pubsub.googleapis.com"
+
+ id_token = await google.oauth2._id_token_async.fetch_id_token(request, target_audience)
+
+ Args:
+ request (google.auth.transport.aiohttp_requests.Request): A callable used to make
+ HTTP requests.
+ audience (str): The audience that this ID token is intended for.
+
+ Returns:
+ str: The ID token.
+
+ Raises:
+ ~google.auth.exceptions.DefaultCredentialsError:
+ If metadata server doesn't exist and no valid service account
+ credentials are found.
+ """
+ # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment
+ # variable.
+ credentials_filename = os.environ.get(environment_vars.CREDENTIALS)
+ if credentials_filename:
+ if not (
+ os.path.exists(credentials_filename)
+ and os.path.isfile(credentials_filename)
+ ):
+ raise exceptions.DefaultCredentialsError(
+ "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
+ )
+
+ try:
+ with open(credentials_filename, "r") as f:
+ from google.oauth2 import _service_account_async as service_account
+
+ info = json.load(f)
+ if info.get("type") == "service_account":
+ credentials = service_account.IDTokenCredentials.from_service_account_info(
+ info, target_audience=audience
+ )
+ await credentials.refresh(request)
+ return credentials.token
+ except ValueError as caught_exc:
+ new_exc = exceptions.DefaultCredentialsError(
+ "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
+ caught_exc,
+ )
+ raise new_exc from caught_exc
+
+ # 2. Try to fetch ID token from metada server if it exists. The code works
+ # for GAE and Cloud Run metadata server as well.
+ try:
+ from google.auth import compute_engine
+ from google.auth.compute_engine import _metadata
+
+ request_new = requests.Request()
+ if _metadata.ping(request_new):
+ credentials = compute_engine.IDTokenCredentials(
+ request_new, audience, use_metadata_identity_endpoint=True
+ )
+ credentials.refresh(request_new)
+ return credentials.token
+ except (ImportError, exceptions.TransportError):
+ pass
+
+ raise exceptions.DefaultCredentialsError(
+ "Neither metadata server or valid service account credentials are found."
+ )
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py
new file mode 100644
index 00000000..de3675c5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py
@@ -0,0 +1,328 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A module that provides functions for handling rapt authentication.
+
+Reauth is a process of obtaining additional authentication (such as password,
+security token, etc.) while refreshing OAuth 2.0 credentials for a user.
+
+Credentials that use the Reauth flow must have the reauth scope,
+``https://www.googleapis.com/auth/accounts.reauth``.
+
+This module provides a high-level function for executing the Reauth process,
+:func:`refresh_grant`, and lower-level helpers for doing the individual
+steps of the reauth process.
+
+Those steps are:
+
+1. Obtaining a list of challenges from the reauth server.
+2. Running through each challenge and sending the result back to the reauth
+ server.
+3. Refreshing the access token using the returned rapt token.
+"""
+
+import sys
+
+from google.auth import exceptions
+from google.oauth2 import _client
+from google.oauth2 import _client_async
+from google.oauth2 import challenges
+from google.oauth2 import reauth
+
+
+async def _get_challenges(
+ request, supported_challenge_types, access_token, requested_scopes=None
+):
+ """Does initial request to reauth API to get the challenges.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ supported_challenge_types (Sequence[str]): list of challenge names
+ supported by the manager.
+ access_token (str): Access token with reauth scopes.
+ requested_scopes (Optional(Sequence[str])): Authorized scopes for the credentials.
+
+ Returns:
+ dict: The response from the reauth API.
+ """
+ body = {"supportedChallengeTypes": supported_challenge_types}
+ if requested_scopes:
+ body["oauthScopesForDomainPolicyLookup"] = requested_scopes
+
+ return await _client_async._token_endpoint_request(
+ request,
+ reauth._REAUTH_API + ":start",
+ body,
+ access_token=access_token,
+ use_json=True,
+ )
+
+
+async def _send_challenge_result(
+ request, session_id, challenge_id, client_input, access_token
+):
+ """Attempt to refresh access token by sending next challenge result.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ session_id (str): session id returned by the initial reauth call.
+ challenge_id (str): challenge id returned by the initial reauth call.
+ client_input: dict with a challenge-specific client input. For example:
+ ``{'credential': password}`` for password challenge.
+ access_token (str): Access token with reauth scopes.
+
+ Returns:
+ dict: The response from the reauth API.
+ """
+ body = {
+ "sessionId": session_id,
+ "challengeId": challenge_id,
+ "action": "RESPOND",
+ "proposalResponse": client_input,
+ }
+
+ return await _client_async._token_endpoint_request(
+ request,
+ reauth._REAUTH_API + "/{}:continue".format(session_id),
+ body,
+ access_token=access_token,
+ use_json=True,
+ )
+
+
+async def _run_next_challenge(msg, request, access_token):
+ """Get the next challenge from msg and run it.
+
+ Args:
+ msg (dict): Reauth API response body (either from the initial request to
+ https://reauth.googleapis.com/v2/sessions:start or from sending the
+ previous challenge response to
+ https://reauth.googleapis.com/v2/sessions/id:continue)
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ access_token (str): reauth access token
+
+ Returns:
+ dict: The response from the reauth API.
+
+ Raises:
+ google.auth.exceptions.ReauthError: if reauth failed.
+ """
+ for challenge in msg["challenges"]:
+ if challenge["status"] != "READY":
+ # Skip non-activated challenges.
+ continue
+ c = challenges.AVAILABLE_CHALLENGES.get(challenge["challengeType"], None)
+ if not c:
+ raise exceptions.ReauthFailError(
+ "Unsupported challenge type {0}. Supported types: {1}".format(
+ challenge["challengeType"],
+ ",".join(list(challenges.AVAILABLE_CHALLENGES.keys())),
+ )
+ )
+ if not c.is_locally_eligible:
+ raise exceptions.ReauthFailError(
+ "Challenge {0} is not locally eligible".format(
+ challenge["challengeType"]
+ )
+ )
+ client_input = c.obtain_challenge_input(challenge)
+ if not client_input:
+ return None
+ return await _send_challenge_result(
+ request,
+ msg["sessionId"],
+ challenge["challengeId"],
+ client_input,
+ access_token,
+ )
+ return None
+
+
+async def _obtain_rapt(request, access_token, requested_scopes):
+ """Given an http request method and reauth access token, get rapt token.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ access_token (str): reauth access token
+ requested_scopes (Sequence[str]): scopes required by the client application
+
+ Returns:
+ str: The rapt token.
+
+ Raises:
+ google.auth.exceptions.ReauthError: if reauth failed
+ """
+ msg = await _get_challenges(
+ request,
+ list(challenges.AVAILABLE_CHALLENGES.keys()),
+ access_token,
+ requested_scopes,
+ )
+
+ if msg["status"] == reauth._AUTHENTICATED:
+ return msg["encodedProofOfReauthToken"]
+
+ for _ in range(0, reauth.RUN_CHALLENGE_RETRY_LIMIT):
+ if not (
+ msg["status"] == reauth._CHALLENGE_REQUIRED
+ or msg["status"] == reauth._CHALLENGE_PENDING
+ ):
+ raise exceptions.ReauthFailError(
+ "Reauthentication challenge failed due to API error: {}".format(
+ msg["status"]
+ )
+ )
+
+ if not reauth.is_interactive():
+ raise exceptions.ReauthFailError(
+ "Reauthentication challenge could not be answered because you are not"
+ " in an interactive session."
+ )
+
+ msg = await _run_next_challenge(msg, request, access_token)
+
+ if msg["status"] == reauth._AUTHENTICATED:
+ return msg["encodedProofOfReauthToken"]
+
+ # If we got here it means we didn't get authenticated.
+ raise exceptions.ReauthFailError("Failed to obtain rapt token.")
+
+
+async def get_rapt_token(
+ request, client_id, client_secret, refresh_token, token_uri, scopes=None
+):
+ """Given an http request method and refresh_token, get rapt token.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ client_id (str): client id to get access token for reauth scope.
+ client_secret (str): client secret for the client_id
+ refresh_token (str): refresh token to refresh access token
+ token_uri (str): uri to refresh access token
+ scopes (Optional(Sequence[str])): scopes required by the client application
+
+ Returns:
+ str: The rapt token.
+ Raises:
+ google.auth.exceptions.RefreshError: If reauth failed.
+ """
+ sys.stderr.write("Reauthentication required.\n")
+
+ # Get access token for reauth.
+ access_token, _, _, _ = await _client_async.refresh_grant(
+ request=request,
+ client_id=client_id,
+ client_secret=client_secret,
+ refresh_token=refresh_token,
+ token_uri=token_uri,
+ scopes=[reauth._REAUTH_SCOPE],
+ )
+
+ # Get rapt token from reauth API.
+ rapt_token = await _obtain_rapt(request, access_token, requested_scopes=scopes)
+
+ return rapt_token
+
+
+async def refresh_grant(
+ request,
+ token_uri,
+ refresh_token,
+ client_id,
+ client_secret,
+ scopes=None,
+ rapt_token=None,
+ enable_reauth_refresh=False,
+):
+ """Implements the reauthentication flow.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests. This must be an aiohttp request.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ refresh_token (str): The refresh token to use to get a new access
+ token.
+ client_id (str): The OAuth 2.0 application's client ID.
+ client_secret (str): The Oauth 2.0 appliaction's client secret.
+ scopes (Optional(Sequence[str])): Scopes to request. If present, all
+ scopes must be authorized for the refresh token. Useful if refresh
+ token has a wild card scope (e.g.
+ 'https://www.googleapis.com/auth/any-api').
+ rapt_token (Optional(str)): The rapt token for reauth.
+ enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow
+ should be used. The default value is False. This option is for
+ gcloud only, other users should use the default value.
+
+ Returns:
+ Tuple[str, Optional[str], Optional[datetime], Mapping[str, str], str]: The
+ access token, new refresh token, expiration, the additional data
+ returned by the token endpoint, and the rapt token.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ "refresh_token": refresh_token,
+ }
+ if scopes:
+ body["scope"] = " ".join(scopes)
+ if rapt_token:
+ body["rapt"] = rapt_token
+
+ response_status_ok, response_data, retryable_error = await _client_async._token_endpoint_request_no_throw(
+ request, token_uri, body
+ )
+ if (
+ not response_status_ok
+ and response_data.get("error") == reauth._REAUTH_NEEDED_ERROR
+ and (
+ response_data.get("error_subtype")
+ == reauth._REAUTH_NEEDED_ERROR_INVALID_RAPT
+ or response_data.get("error_subtype")
+ == reauth._REAUTH_NEEDED_ERROR_RAPT_REQUIRED
+ )
+ ):
+ if not enable_reauth_refresh:
+ raise exceptions.RefreshError(
+ "Reauthentication is needed. Please run `gcloud auth application-default login` to reauthenticate."
+ )
+
+ rapt_token = await get_rapt_token(
+ request, client_id, client_secret, refresh_token, token_uri, scopes=scopes
+ )
+ body["rapt"] = rapt_token
+ (
+ response_status_ok,
+ response_data,
+ retryable_error,
+ ) = await _client_async._token_endpoint_request_no_throw(
+ request, token_uri, body
+ )
+
+ if not response_status_ok:
+ _client._handle_error_response(response_data, retryable_error)
+ refresh_response = _client._handle_refresh_grant_response(
+ response_data, refresh_token
+ )
+ return refresh_response + (rapt_token,)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py
new file mode 100644
index 00000000..cfd315a7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py
@@ -0,0 +1,132 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
+
+NOTE: This file adds asynchronous refresh methods to both credentials
+classes, and therefore async/await syntax is required when calling this
+method when using service account credentials with asynchronous functionality.
+Otherwise, all other methods are inherited from the regular service account
+credentials file google.oauth2.service_account
+
+"""
+
+from google.auth import _credentials_async as credentials_async
+from google.auth import _helpers
+from google.oauth2 import _client_async
+from google.oauth2 import service_account
+
+
+class Credentials(
+ service_account.Credentials, credentials_async.Scoped, credentials_async.Credentials
+):
+ """Service account credentials
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = _service_account_async.Credentials.from_service_account_file(
+ 'service-account.json')
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = _service_account_async.Credentials.from_service_account_info(
+ service_account_info)
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = _service_account_async.Credentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com')
+
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ To add a quota project, use :meth:`with_quota_project`::
+
+ credentials = credentials.with_quota_project('myproject-123')
+ """
+
+ @_helpers.copy_docstring(credentials_async.Credentials)
+ async def refresh(self, request):
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = await _client_async.jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
+
+
+class IDTokenCredentials(
+ service_account.IDTokenCredentials,
+ credentials_async.Signing,
+ credentials_async.Credentials,
+):
+ """Open ID Connect ID Token-based service account credentials.
+
+ These credentials are largely similar to :class:`.Credentials`, but instead
+ of using an OAuth 2.0 Access Token as the bearer token, they use an Open
+ ID Connect ID Token as the bearer token. These credentials are useful when
+ communicating to services that require ID Tokens and can not accept access
+ tokens.
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = (
+ _service_account_async.IDTokenCredentials.from_service_account_file(
+ 'service-account.json'))
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = (
+ _service_account_async.IDTokenCredentials.from_service_account_info(
+ service_account_info))
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = (
+ _service_account_async.IDTokenCredentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com'))
+
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ """
+
+ @_helpers.copy_docstring(credentials_async.Credentials)
+ async def refresh(self, request):
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = await _client_async.id_token_jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
new file mode 100644
index 00000000..6468498b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py
@@ -0,0 +1,281 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Challenges for reauthentication.
+"""
+
+import abc
+import base64
+import getpass
+import sys
+
+from google.auth import _helpers
+from google.auth import exceptions
+from google.oauth2 import webauthn_handler_factory
+from google.oauth2.webauthn_types import (
+ AuthenticationExtensionsClientInputs,
+ GetRequest,
+ PublicKeyCredentialDescriptor,
+)
+
+
+REAUTH_ORIGIN = "https://accounts.google.com"
+SAML_CHALLENGE_MESSAGE = (
+ "Please run `gcloud auth login` to complete reauthentication with SAML."
+)
+WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout
+
+
+def get_user_password(text):
+ """Get password from user.
+
+ Override this function with a different logic if you are using this library
+ outside a CLI.
+
+ Args:
+ text (str): message for the password prompt.
+
+ Returns:
+ str: password string.
+ """
+ return getpass.getpass(text)
+
+
+class ReauthChallenge(metaclass=abc.ABCMeta):
+ """Base class for reauth challenges."""
+
+ @property
+ @abc.abstractmethod
+ def name(self): # pragma: NO COVER
+ """Returns the name of the challenge."""
+ raise NotImplementedError("name property must be implemented")
+
+ @property
+ @abc.abstractmethod
+ def is_locally_eligible(self): # pragma: NO COVER
+ """Returns true if a challenge is supported locally on this machine."""
+ raise NotImplementedError("is_locally_eligible property must be implemented")
+
+ @abc.abstractmethod
+ def obtain_challenge_input(self, metadata): # pragma: NO COVER
+ """Performs logic required to obtain credentials and returns it.
+
+ Args:
+ metadata (Mapping): challenge metadata returned in the 'challenges' field in
+ the initial reauth request. Includes the 'challengeType' field
+ and other challenge-specific fields.
+
+ Returns:
+ response that will be send to the reauth service as the content of
+ the 'proposalResponse' field in the request body. Usually a dict
+ with the keys specific to the challenge. For example,
+ ``{'credential': password}`` for password challenge.
+ """
+ raise NotImplementedError("obtain_challenge_input method must be implemented")
+
+
+class PasswordChallenge(ReauthChallenge):
+ """Challenge that asks for user's password."""
+
+ @property
+ def name(self):
+ return "PASSWORD"
+
+ @property
+ def is_locally_eligible(self):
+ return True
+
+ @_helpers.copy_docstring(ReauthChallenge)
+ def obtain_challenge_input(self, unused_metadata):
+ passwd = get_user_password("Please enter your password:")
+ if not passwd:
+ passwd = " " # avoid the server crashing in case of no password :D
+ return {"credential": passwd}
+
+
+class SecurityKeyChallenge(ReauthChallenge):
+ """Challenge that asks for user's security key touch."""
+
+ @property
+ def name(self):
+ return "SECURITY_KEY"
+
+ @property
+ def is_locally_eligible(self):
+ return True
+
+ @_helpers.copy_docstring(ReauthChallenge)
+ def obtain_challenge_input(self, metadata):
+ # Check if there is an available Webauthn Handler, if not use pyu2f
+ try:
+ factory = webauthn_handler_factory.WebauthnHandlerFactory()
+ webauthn_handler = factory.get_handler()
+ if webauthn_handler is not None:
+ sys.stderr.write("Please insert and touch your security key\n")
+ return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
+ except Exception:
+ # Attempt pyu2f if exception in webauthn flow
+ pass
+
+ try:
+ import pyu2f.convenience.authenticator # type: ignore
+ import pyu2f.errors # type: ignore
+ import pyu2f.model # type: ignore
+ except ImportError:
+ raise exceptions.ReauthFailError(
+ "pyu2f dependency is required to use Security key reauth feature. "
+ "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`."
+ )
+ sk = metadata["securityKey"]
+ challenges = sk["challenges"]
+ # Read both 'applicationId' and 'relyingPartyId', if they are the same, use
+ # applicationId, if they are different, use relyingPartyId first and retry
+ # with applicationId
+ application_id = sk["applicationId"]
+ relying_party_id = sk["relyingPartyId"]
+
+ if application_id != relying_party_id:
+ application_parameters = [relying_party_id, application_id]
+ else:
+ application_parameters = [application_id]
+
+ challenge_data = []
+ for c in challenges:
+ kh = c["keyHandle"].encode("ascii")
+ key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh)))
+ challenge = c["challenge"].encode("ascii")
+ challenge = base64.urlsafe_b64decode(challenge)
+ challenge_data.append({"key": key, "challenge": challenge})
+
+ # Track number of tries to suppress error message until all application_parameters
+ # are tried.
+ tries = 0
+ for app_id in application_parameters:
+ try:
+ tries += 1
+ api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator(
+ REAUTH_ORIGIN
+ )
+ response = api.Authenticate(
+ app_id, challenge_data, print_callback=sys.stderr.write
+ )
+ return {"securityKey": response}
+ except pyu2f.errors.U2FError as e:
+ if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE:
+ # Only show error if all app_ids have been tried
+ if tries == len(application_parameters):
+ sys.stderr.write("Ineligible security key.\n")
+ return None
+ continue
+ if e.code == pyu2f.errors.U2FError.TIMEOUT:
+ sys.stderr.write(
+ "Timed out while waiting for security key touch.\n"
+ )
+ else:
+ raise e
+ except pyu2f.errors.PluginError as e:
+ sys.stderr.write("Plugin error: {}.\n".format(e))
+ continue
+ except pyu2f.errors.NoDeviceFoundError:
+ sys.stderr.write("No security key found.\n")
+ return None
+
+ def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
+ sk = metadata.get("securityKey")
+ if sk is None:
+ raise exceptions.InvalidValue("securityKey is None")
+ challenges = sk.get("challenges")
+ application_id = sk.get("applicationId")
+ relying_party_id = sk.get("relyingPartyId")
+ if challenges is None or len(challenges) < 1:
+ raise exceptions.InvalidValue("challenges is None or empty")
+ if application_id is None:
+ raise exceptions.InvalidValue("application_id is None")
+ if relying_party_id is None:
+ raise exceptions.InvalidValue("relying_party_id is None")
+
+ allow_credentials = []
+ for challenge in challenges:
+ kh = challenge.get("keyHandle")
+ if kh is None:
+ raise exceptions.InvalidValue("keyHandle is None")
+ key_handle = self._unpadded_urlsafe_b64recode(kh)
+ allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))
+
+ extension = AuthenticationExtensionsClientInputs(appid=application_id)
+
+ challenge = challenges[0].get("challenge")
+ if challenge is None:
+ raise exceptions.InvalidValue("challenge is None")
+
+ get_request = GetRequest(
+ origin=REAUTH_ORIGIN,
+ rpid=relying_party_id,
+ challenge=self._unpadded_urlsafe_b64recode(challenge),
+ timeout_ms=WEBAUTHN_TIMEOUT_MS,
+ allow_credentials=allow_credentials,
+ user_verification="required",
+ extensions=extension,
+ )
+
+ try:
+ get_response = webauthn_handler.get(get_request)
+ except Exception as e:
+ sys.stderr.write("Webauthn Error: {}.\n".format(e))
+ raise e
+
+ response = {
+ "clientData": get_response.response.client_data_json,
+ "authenticatorData": get_response.response.authenticator_data,
+ "signatureData": get_response.response.signature,
+ "applicationId": application_id,
+ "keyHandle": get_response.id,
+ "securityKeyReplyType": 2,
+ }
+ return {"securityKey": response}
+
+ def _unpadded_urlsafe_b64recode(self, s):
+ """Converts standard b64 encoded string to url safe b64 encoded string
+ with no padding."""
+ b = base64.urlsafe_b64decode(s)
+ return base64.urlsafe_b64encode(b).decode().rstrip("=")
+
+
+class SamlChallenge(ReauthChallenge):
+ """Challenge that asks the users to browse to their ID Providers.
+
+ Currently SAML challenge is not supported. When obtaining the challenge
+ input, exception will be raised to instruct the users to run
+ `gcloud auth login` for reauthentication.
+ """
+
+ @property
+ def name(self):
+ return "SAML"
+
+ @property
+ def is_locally_eligible(self):
+ return True
+
+ def obtain_challenge_input(self, metadata):
+ # Magic Arch has not fully supported returning a proper dedirect URL
+ # for programmatic SAML users today. So we error our here and request
+ # users to use gcloud to complete a login.
+ raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE)
+
+
+AVAILABLE_CHALLENGES = {
+ challenge.name: challenge
+ for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()]
+}
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py b/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py
new file mode 100644
index 00000000..6e158089
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py
@@ -0,0 +1,614 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 Credentials.
+
+This module provides credentials based on OAuth 2.0 access and refresh tokens.
+These credentials usually access resources on behalf of a user (resource
+owner).
+
+Specifically, this is intended to use access tokens acquired using the
+`Authorization Code grant`_ and can refresh those tokens using a
+optional `refresh token`_.
+
+Obtaining the initial access and refresh token is outside of the scope of this
+module. Consult `rfc6749 section 4.1`_ for complete details on the
+Authorization Code grant flow.
+
+.. _Authorization Code grant: https://tools.ietf.org/html/rfc6749#section-1.3.1
+.. _refresh token: https://tools.ietf.org/html/rfc6749#section-6
+.. _rfc6749 section 4.1: https://tools.ietf.org/html/rfc6749#section-4.1
+"""
+
+from datetime import datetime
+import io
+import json
+import logging
+import warnings
+
+from google.auth import _cloud_sdk
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import metrics
+from google.oauth2 import reauth
+
+_LOGGER = logging.getLogger(__name__)
+
+
+# The Google OAuth 2.0 token endpoint. Used for authorized user credentials.
+_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
+
+# The Google OAuth 2.0 token info endpoint. Used for getting token info JSON from access tokens.
+_GOOGLE_OAUTH2_TOKEN_INFO_ENDPOINT = "https://oauth2.googleapis.com/tokeninfo"
+
+
+class Credentials(credentials.ReadOnlyScoped, credentials.CredentialsWithQuotaProject):
+ """Credentials using OAuth 2.0 access and refresh tokens.
+
+ The credentials are considered immutable except the tokens and the token
+ expiry, which are updated after refresh. If you want to modify the quota
+ project, use :meth:`with_quota_project` or ::
+
+ credentials = credentials.with_quota_project('myproject-123')
+
+ Reauth is disabled by default. To enable reauth, set the
+ `enable_reauth_refresh` parameter to True in the constructor. Note that
+ reauth feature is intended for gcloud to use only.
+ If reauth is enabled, `pyu2f` dependency has to be installed in order to use security
+ key reauth feature. Dependency can be installed via `pip install pyu2f` or `pip install
+ google-auth[reauth]`.
+ """
+
+ def __init__(
+ self,
+ token,
+ refresh_token=None,
+ id_token=None,
+ token_uri=None,
+ client_id=None,
+ client_secret=None,
+ scopes=None,
+ default_scopes=None,
+ quota_project_id=None,
+ expiry=None,
+ rapt_token=None,
+ refresh_handler=None,
+ enable_reauth_refresh=False,
+ granted_scopes=None,
+ trust_boundary=None,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ account=None,
+ ):
+ """
+ Args:
+ token (Optional(str)): The OAuth 2.0 access token. Can be None
+ if refresh information is provided.
+ refresh_token (str): The OAuth 2.0 refresh token. If specified,
+ credentials can be refreshed.
+ id_token (str): The Open ID Connect ID Token.
+ token_uri (str): The OAuth 2.0 authorization server's token
+ endpoint URI. Must be specified for refresh, can be left as
+ None if the token can not be refreshed.
+ client_id (str): The OAuth 2.0 client ID. Must be specified for
+ refresh, can be left as None if the token can not be refreshed.
+ client_secret(str): The OAuth 2.0 client secret. Must be specified
+ for refresh, can be left as None if the token can not be
+ refreshed.
+ scopes (Sequence[str]): The scopes used to obtain authorization.
+ This parameter is used by :meth:`has_scopes`. OAuth 2.0
+ credentials can not request additional scopes after
+ authorization. The scopes must be derivable from the refresh
+ token if refresh information is provided (e.g. The refresh
+ token scopes are a superset of this or contain a wild card
+ scope like 'https://www.googleapis.com/auth/any-api').
+ default_scopes (Sequence[str]): Default scopes passed by a
+ Google client library. Use 'scopes' for user-defined scopes.
+ quota_project_id (Optional[str]): The project ID used for quota and billing.
+ This project may be different from the project used to
+ create the credentials.
+ rapt_token (Optional[str]): The reauth Proof Token.
+ refresh_handler (Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]):
+ A callable which takes in the HTTP request callable and the list of
+ OAuth scopes and when called returns an access token string for the
+ requested scopes and its expiry datetime. This is useful when no
+ refresh tokens are provided and tokens are obtained by calling
+ some external process on demand. It is particularly useful for
+ retrieving downscoped tokens from a token broker.
+ enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow
+ should be used. This flag is for gcloud to use only.
+ granted_scopes (Optional[Sequence[str]]): The scopes that were consented/granted by the user.
+ This could be different from the requested scopes and it could be empty if granted
+ and requested scopes were same.
+ trust_boundary (str): String representation of trust boundary meta.
+ universe_domain (Optional[str]): The universe domain. The default
+ universe domain is googleapis.com.
+ account (Optional[str]): The account associated with the credential.
+ """
+ super(Credentials, self).__init__()
+ self.token = token
+ self.expiry = expiry
+ self._refresh_token = refresh_token
+ self._id_token = id_token
+ self._scopes = scopes
+ self._default_scopes = default_scopes
+ self._granted_scopes = granted_scopes
+ self._token_uri = token_uri
+ self._client_id = client_id
+ self._client_secret = client_secret
+ self._quota_project_id = quota_project_id
+ self._rapt_token = rapt_token
+ self.refresh_handler = refresh_handler
+ self._enable_reauth_refresh = enable_reauth_refresh
+ self._trust_boundary = trust_boundary
+ self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
+ self._account = account or ""
+ self._cred_file_path = None
+
+ def __getstate__(self):
+ """A __getstate__ method must exist for the __setstate__ to be called
+ This is identical to the default implementation.
+ See https://docs.python.org/3.7/library/pickle.html#object.__setstate__
+ """
+ state_dict = self.__dict__.copy()
+ # Remove _refresh_handler function as there are limitations pickling and
+ # unpickling certain callables (lambda, functools.partial instances)
+ # because they need to be importable.
+ # Instead, the refresh_handler setter should be used to repopulate this.
+ if "_refresh_handler" in state_dict:
+ del state_dict["_refresh_handler"]
+
+ if "_refresh_worker" in state_dict:
+ del state_dict["_refresh_worker"]
+ return state_dict
+
+ def __setstate__(self, d):
+ """Credentials pickled with older versions of the class do not have
+ all the attributes."""
+ self.token = d.get("token")
+ self.expiry = d.get("expiry")
+ self._refresh_token = d.get("_refresh_token")
+ self._id_token = d.get("_id_token")
+ self._scopes = d.get("_scopes")
+ self._default_scopes = d.get("_default_scopes")
+ self._granted_scopes = d.get("_granted_scopes")
+ self._token_uri = d.get("_token_uri")
+ self._client_id = d.get("_client_id")
+ self._client_secret = d.get("_client_secret")
+ self._quota_project_id = d.get("_quota_project_id")
+ self._rapt_token = d.get("_rapt_token")
+ self._enable_reauth_refresh = d.get("_enable_reauth_refresh")
+ self._trust_boundary = d.get("_trust_boundary")
+ self._universe_domain = (
+ d.get("_universe_domain") or credentials.DEFAULT_UNIVERSE_DOMAIN
+ )
+ self._cred_file_path = d.get("_cred_file_path")
+ # The refresh_handler setter should be used to repopulate this.
+ self._refresh_handler = None
+ self._refresh_worker = None
+ self._use_non_blocking_refresh = d.get("_use_non_blocking_refresh", False)
+ self._account = d.get("_account", "")
+
+ @property
+ def refresh_token(self):
+ """Optional[str]: The OAuth 2.0 refresh token."""
+ return self._refresh_token
+
+ @property
+ def scopes(self):
+ """Optional[str]: The OAuth 2.0 permission scopes."""
+ return self._scopes
+
+ @property
+ def granted_scopes(self):
+ """Optional[Sequence[str]]: The OAuth 2.0 permission scopes that were granted by the user."""
+ return self._granted_scopes
+
+ @property
+ def token_uri(self):
+ """Optional[str]: The OAuth 2.0 authorization server's token endpoint
+ URI."""
+ return self._token_uri
+
+ @property
+ def id_token(self):
+ """Optional[str]: The Open ID Connect ID Token.
+
+ Depending on the authorization server and the scopes requested, this
+ may be populated when credentials are obtained and updated when
+ :meth:`refresh` is called. This token is a JWT. It can be verified
+ and decoded using :func:`google.oauth2.id_token.verify_oauth2_token`.
+ """
+ return self._id_token
+
+ @property
+ def client_id(self):
+ """Optional[str]: The OAuth 2.0 client ID."""
+ return self._client_id
+
+ @property
+ def client_secret(self):
+ """Optional[str]: The OAuth 2.0 client secret."""
+ return self._client_secret
+
+ @property
+ def requires_scopes(self):
+ """False: OAuth 2.0 credentials have their scopes set when
+ the initial token is requested and can not be changed."""
+ return False
+
+ @property
+ def rapt_token(self):
+ """Optional[str]: The reauth Proof Token."""
+ return self._rapt_token
+
+ @property
+ def refresh_handler(self):
+ """Returns the refresh handler if available.
+
+ Returns:
+ Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]:
+ The current refresh handler.
+ """
+ return self._refresh_handler
+
+ @refresh_handler.setter
+ def refresh_handler(self, value):
+ """Updates the current refresh handler.
+
+ Args:
+ value (Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]):
+ The updated value of the refresh handler.
+
+ Raises:
+ TypeError: If the value is not a callable or None.
+ """
+ if not callable(value) and value is not None:
+ raise TypeError("The provided refresh_handler is not a callable or None.")
+ self._refresh_handler = value
+
+ @property
+ def account(self):
+ """str: The user account associated with the credential. If the account is unknown an empty string is returned."""
+ return self._account
+
+ def _make_copy(self):
+ cred = self.__class__(
+ self.token,
+ refresh_token=self.refresh_token,
+ id_token=self.id_token,
+ token_uri=self.token_uri,
+ client_id=self.client_id,
+ client_secret=self.client_secret,
+ scopes=self.scopes,
+ default_scopes=self.default_scopes,
+ granted_scopes=self.granted_scopes,
+ quota_project_id=self.quota_project_id,
+ rapt_token=self.rapt_token,
+ enable_reauth_refresh=self._enable_reauth_refresh,
+ trust_boundary=self._trust_boundary,
+ universe_domain=self._universe_domain,
+ account=self._account,
+ )
+ cred._cred_file_path = self._cred_file_path
+ return cred
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ if self._cred_file_path:
+ cred_info = {
+ "credential_source": self._cred_file_path,
+ "credential_type": "user credentials",
+ }
+ if self.account:
+ cred_info["principal"] = self.account
+ return cred_info
+ return None
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+ cred = self._make_copy()
+ cred._token_uri = token_uri
+ return cred
+
+ def with_account(self, account):
+ """Returns a copy of these credentials with a modified account.
+
+ Args:
+ account (str): The account to set
+
+ Returns:
+ google.oauth2.credentials.Credentials: A new credentials instance.
+ """
+ cred = self._make_copy()
+ cred._account = account
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+ def with_universe_domain(self, universe_domain):
+ cred = self._make_copy()
+ cred._universe_domain = universe_domain
+ return cred
+
+ def _metric_header_for_usage(self):
+ return metrics.CRED_TYPE_USER
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
+ raise exceptions.RefreshError(
+ "User credential refresh is only supported in the default "
+ "googleapis.com universe domain, but the current universe "
+ "domain is {}. If you created the credential with an access "
+ "token, it's likely that the provided token is expired now, "
+ "please update your code with a valid token.".format(
+ self._universe_domain
+ )
+ )
+
+ scopes = self._scopes if self._scopes is not None else self._default_scopes
+ # Use refresh handler if available and no refresh token is
+ # available. This is useful in general when tokens are obtained by calling
+ # some external process on demand. It is particularly useful for retrieving
+ # downscoped tokens from a token broker.
+ if self._refresh_token is None and self.refresh_handler:
+ token, expiry = self.refresh_handler(request, scopes=scopes)
+ # Validate returned data.
+ if not isinstance(token, str):
+ raise exceptions.RefreshError(
+ "The refresh_handler returned token is not a string."
+ )
+ if not isinstance(expiry, datetime):
+ raise exceptions.RefreshError(
+ "The refresh_handler returned expiry is not a datetime object."
+ )
+ if _helpers.utcnow() >= expiry - _helpers.REFRESH_THRESHOLD:
+ raise exceptions.RefreshError(
+ "The credentials returned by the refresh_handler are "
+ "already expired."
+ )
+ self.token = token
+ self.expiry = expiry
+ return
+
+ if (
+ self._refresh_token is None
+ or self._token_uri is None
+ or self._client_id is None
+ or self._client_secret is None
+ ):
+ raise exceptions.RefreshError(
+ "The credentials do not contain the necessary fields need to "
+ "refresh the access token. You must specify refresh_token, "
+ "token_uri, client_id, and client_secret."
+ )
+
+ (
+ access_token,
+ refresh_token,
+ expiry,
+ grant_response,
+ rapt_token,
+ ) = reauth.refresh_grant(
+ request,
+ self._token_uri,
+ self._refresh_token,
+ self._client_id,
+ self._client_secret,
+ scopes=scopes,
+ rapt_token=self._rapt_token,
+ enable_reauth_refresh=self._enable_reauth_refresh,
+ )
+
+ self.token = access_token
+ self.expiry = expiry
+ self._refresh_token = refresh_token
+ self._id_token = grant_response.get("id_token")
+ self._rapt_token = rapt_token
+
+ if scopes and "scope" in grant_response:
+ requested_scopes = frozenset(scopes)
+ self._granted_scopes = grant_response["scope"].split()
+ granted_scopes = frozenset(self._granted_scopes)
+ scopes_requested_but_not_granted = requested_scopes - granted_scopes
+ if scopes_requested_but_not_granted:
+ # User might be presented with unbundled scopes at the time of
+ # consent. So it is a valid scenario to not have all the requested
+ # scopes as part of granted scopes but log a warning in case the
+ # developer wants to debug the scenario.
+ _LOGGER.warning(
+ "Not all requested scopes were granted by the "
+ "authorization server, missing scopes {}.".format(
+ ", ".join(scopes_requested_but_not_granted)
+ )
+ )
+
+ @classmethod
+ def from_authorized_user_info(cls, info, scopes=None):
+ """Creates a Credentials instance from parsed authorized user info.
+
+ Args:
+ info (Mapping[str, str]): The authorized user info in Google
+ format.
+ scopes (Sequence[str]): Optional list of scopes to include in the
+ credentials.
+
+ Returns:
+ google.oauth2.credentials.Credentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ keys_needed = set(("refresh_token", "client_id", "client_secret"))
+ missing = keys_needed.difference(info.keys())
+
+ if missing:
+ raise ValueError(
+ "Authorized user info was not in the expected format, missing "
+ "fields {}.".format(", ".join(missing))
+ )
+
+ # access token expiry (datetime obj); auto-expire if not saved
+ expiry = info.get("expiry")
+ if expiry:
+ expiry = datetime.strptime(
+ expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
+ )
+ else:
+ expiry = _helpers.utcnow() - _helpers.REFRESH_THRESHOLD
+
+ # process scopes, which needs to be a seq
+ if scopes is None and "scopes" in info:
+ scopes = info.get("scopes")
+ if isinstance(scopes, str):
+ scopes = scopes.split(" ")
+
+ return cls(
+ token=info.get("token"),
+ refresh_token=info.get("refresh_token"),
+ token_uri=_GOOGLE_OAUTH2_TOKEN_ENDPOINT, # always overrides
+ scopes=scopes,
+ client_id=info.get("client_id"),
+ client_secret=info.get("client_secret"),
+ quota_project_id=info.get("quota_project_id"), # may not exist
+ expiry=expiry,
+ rapt_token=info.get("rapt_token"), # may not exist
+ trust_boundary=info.get("trust_boundary"), # may not exist
+ universe_domain=info.get("universe_domain"), # may not exist
+ account=info.get("account", ""), # may not exist
+ )
+
+ @classmethod
+ def from_authorized_user_file(cls, filename, scopes=None):
+ """Creates a Credentials instance from an authorized user json file.
+
+ Args:
+ filename (str): The path to the authorized user json file.
+ scopes (Sequence[str]): Optional list of scopes to include in the
+ credentials.
+
+ Returns:
+ google.oauth2.credentials.Credentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the file is not in the expected format.
+ """
+ with io.open(filename, "r", encoding="utf-8") as json_file:
+ data = json.load(json_file)
+ return cls.from_authorized_user_info(data, scopes)
+
+ def to_json(self, strip=None):
+ """Utility function that creates a JSON representation of a Credentials
+ object.
+
+ Args:
+ strip (Sequence[str]): Optional list of members to exclude from the
+ generated JSON.
+
+ Returns:
+ str: A JSON representation of this instance. When converted into
+ a dictionary, it can be passed to from_authorized_user_info()
+ to create a new credential instance.
+ """
+ prep = {
+ "token": self.token,
+ "refresh_token": self.refresh_token,
+ "token_uri": self.token_uri,
+ "client_id": self.client_id,
+ "client_secret": self.client_secret,
+ "scopes": self.scopes,
+ "rapt_token": self.rapt_token,
+ "universe_domain": self._universe_domain,
+ "account": self._account,
+ }
+ if self.expiry: # flatten expiry timestamp
+ prep["expiry"] = self.expiry.isoformat() + "Z"
+
+ # Remove empty entries (those which are None)
+ prep = {k: v for k, v in prep.items() if v is not None}
+
+ # Remove entries that explicitely need to be removed
+ if strip is not None:
+ prep = {k: v for k, v in prep.items() if k not in strip}
+
+ return json.dumps(prep)
+
+
+class UserAccessTokenCredentials(credentials.CredentialsWithQuotaProject):
+ """Access token credentials for user account.
+
+ Obtain the access token for a given user account or the current active
+ user account with the ``gcloud auth print-access-token`` command.
+
+ Args:
+ account (Optional[str]): Account to get the access token for. If not
+ specified, the current active account will be used.
+ quota_project_id (Optional[str]): The project ID used for quota
+ and billing.
+ """
+
+ def __init__(self, account=None, quota_project_id=None):
+ warnings.warn(
+ "UserAccessTokenCredentials is deprecated, please use "
+ "google.oauth2.credentials.Credentials instead. To use "
+ "that credential type, simply run "
+ "`gcloud auth application-default login` and let the "
+ "client libraries pick up the application default credentials."
+ )
+ super(UserAccessTokenCredentials, self).__init__()
+ self._account = account
+ self._quota_project_id = quota_project_id
+
+ def with_account(self, account):
+ """Create a new instance with the given account.
+
+ Args:
+ account (str): Account to get the access token for.
+
+ Returns:
+ google.oauth2.credentials.UserAccessTokenCredentials: The created
+ credentials with the given account.
+ """
+ return self.__class__(account=account, quota_project_id=self._quota_project_id)
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ return self.__class__(account=self._account, quota_project_id=quota_project_id)
+
+ def refresh(self, request):
+ """Refreshes the access token.
+
+ Args:
+ request (google.auth.transport.Request): This argument is required
+ by the base class interface but not used in this implementation,
+ so just set it to `None`.
+
+ Raises:
+ google.auth.exceptions.UserAccessTokenError: If the access token
+ refresh failed.
+ """
+ self.token = _cloud_sdk.get_auth_access_token(self._account)
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def before_request(self, request, method, url, headers):
+ self.refresh(request)
+ self.apply(headers)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
new file mode 100644
index 00000000..7410cfc2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
@@ -0,0 +1,251 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Experimental GDCH credentials support.
+"""
+
+import datetime
+
+from google.auth import _helpers
+from google.auth import _service_account_info
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import jwt
+from google.oauth2 import _client
+
+
+TOKEN_EXCHANGE_TYPE = "urn:ietf:params:oauth:token-type:token-exchange"
+ACCESS_TOKEN_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+SERVICE_ACCOUNT_TOKEN_TYPE = "urn:k8s:params:oauth:token-type:serviceaccount"
+JWT_LIFETIME = datetime.timedelta(seconds=3600) # 1 hour
+
+
+class ServiceAccountCredentials(credentials.Credentials):
+ """Credentials for GDCH (`Google Distributed Cloud Hosted`_) for service
+ account users.
+
+ .. _Google Distributed Cloud Hosted:
+ https://cloud.google.com/blog/topics/hybrid-cloud/\
+ announcing-google-distributed-cloud-edge-and-hosted
+
+ To create a GDCH service account credential, first create a JSON file of
+ the following format::
+
+ {
+ "type": "gdch_service_account",
+ "format_version": "1",
+ "project": "<project name>",
+ "private_key_id": "<key id>",
+ "private_key": "-----BEGIN EC PRIVATE KEY-----\n<key bytes>\n-----END EC PRIVATE KEY-----\n",
+ "name": "<service identity name>",
+ "ca_cert_path": "<CA cert path>",
+ "token_uri": "https://service-identity.<Domain>/authenticate"
+ }
+
+ The "format_version" field stands for the format of the JSON file. For now
+ it is always "1". The `private_key_id` and `private_key` is used for signing.
+ The `ca_cert_path` is used for token server TLS certificate verification.
+
+ After the JSON file is created, set `GOOGLE_APPLICATION_CREDENTIALS` environment
+ variable to the JSON file path, then use the following code to create the
+ credential::
+
+ import google.auth
+
+ credential, _ = google.auth.default()
+ credential = credential.with_gdch_audience("<the audience>")
+
+ We can also create the credential directly::
+
+ from google.oauth import gdch_credentials
+
+ credential = gdch_credentials.ServiceAccountCredentials.from_service_account_file("<the json file path>")
+ credential = credential.with_gdch_audience("<the audience>")
+
+ The token is obtained in the following way. This class first creates a
+ self signed JWT. It uses the `name` value as the `iss` and `sub` claim, and
+ the `token_uri` as the `aud` claim, and signs the JWT with the `private_key`.
+ It then sends the JWT to the `token_uri` to exchange a final token for
+ `audience`.
+ """
+
+ def __init__(
+ self, signer, service_identity_name, project, audience, token_uri, ca_cert_path
+ ):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_identity_name (str): The service identity name. It will be
+ used as the `iss` and `sub` claim in the self signed JWT.
+ project (str): The project.
+ audience (str): The audience for the final token.
+ token_uri (str): The token server uri.
+ ca_cert_path (str): The CA cert path for token server side TLS
+ certificate verification. If the token server uses well known
+ CA, then this parameter can be `None`.
+ """
+ super(ServiceAccountCredentials, self).__init__()
+ self._signer = signer
+ self._service_identity_name = service_identity_name
+ self._project = project
+ self._audience = audience
+ self._token_uri = token_uri
+ self._ca_cert_path = ca_cert_path
+
+ def _create_jwt(self):
+ now = _helpers.utcnow()
+ expiry = now + JWT_LIFETIME
+ iss_sub_value = "system:serviceaccount:{}:{}".format(
+ self._project, self._service_identity_name
+ )
+
+ payload = {
+ "iss": iss_sub_value,
+ "sub": iss_sub_value,
+ "aud": self._token_uri,
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ }
+
+ return _helpers.from_bytes(jwt.encode(self._signer, payload))
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ import google.auth.transport.requests
+
+ if not isinstance(request, google.auth.transport.requests.Request):
+ raise exceptions.RefreshError(
+ "For GDCH service account credentials, request must be a google.auth.transport.requests.Request object"
+ )
+
+ # Create a self signed JWT, and do token exchange.
+ jwt_token = self._create_jwt()
+ request_body = {
+ "grant_type": TOKEN_EXCHANGE_TYPE,
+ "audience": self._audience,
+ "requested_token_type": ACCESS_TOKEN_TOKEN_TYPE,
+ "subject_token": jwt_token,
+ "subject_token_type": SERVICE_ACCOUNT_TOKEN_TYPE,
+ }
+ response_data = _client._token_endpoint_request(
+ request,
+ self._token_uri,
+ request_body,
+ access_token=None,
+ use_json=True,
+ verify=self._ca_cert_path,
+ )
+
+ self.token, _, self.expiry, _ = _client._handle_refresh_grant_response(
+ response_data, None
+ )
+
+ def with_gdch_audience(self, audience):
+ """Create a copy of GDCH credentials with the specified audience.
+
+ Args:
+ audience (str): The intended audience for GDCH credentials.
+ """
+ return self.__class__(
+ self._signer,
+ self._service_identity_name,
+ self._project,
+ audience,
+ self._token_uri,
+ self._ca_cert_path,
+ )
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info):
+ """Creates a Credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ if info["format_version"] != "1":
+ raise ValueError("Only format version 1 is supported")
+
+ return cls(
+ signer,
+ info["name"], # service_identity_name
+ info["project"],
+ None, # audience
+ info["token_uri"],
+ info.get("ca_cert_path", None),
+ )
+
+ @classmethod
+ def from_service_account_info(cls, info):
+ """Creates a Credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info,
+ require=[
+ "format_version",
+ "private_key_id",
+ "private_key",
+ "name",
+ "project",
+ "token_uri",
+ ],
+ use_rsa_signer=False,
+ )
+ return cls._from_signer_and_info(signer, info)
+
+ @classmethod
+ def from_service_account_file(cls, filename):
+ """Creates a Credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename,
+ require=[
+ "format_version",
+ "private_key_id",
+ "private_key",
+ "name",
+ "project",
+ "token_uri",
+ ],
+ use_rsa_signer=False,
+ )
+ return cls._from_signer_and_info(signer, info)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py b/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py
new file mode 100644
index 00000000..b68ab6b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py
@@ -0,0 +1,358 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google ID Token helpers.
+
+Provides support for verifying `OpenID Connect ID Tokens`_, especially ones
+generated by Google infrastructure.
+
+To parse and verify an ID Token issued by Google's OAuth 2.0 authorization
+server use :func:`verify_oauth2_token`. To verify an ID Token issued by
+Firebase, use :func:`verify_firebase_token`.
+
+A general purpose ID Token verifier is available as :func:`verify_token`.
+
+Example::
+
+ from google.oauth2 import id_token
+ from google.auth.transport import requests
+
+ request = requests.Request()
+
+ id_info = id_token.verify_oauth2_token(
+ token, request, 'my-client-id.example.com')
+
+ userid = id_info['sub']
+
+By default, this will re-fetch certificates for each verification. Because
+Google's public keys are only changed infrequently (on the order of once per
+day), you may wish to take advantage of caching to reduce latency and the
+potential for network errors. This can be accomplished using an external
+library like `CacheControl`_ to create a cache-aware
+:class:`google.auth.transport.Request`::
+
+ import cachecontrol
+ import google.auth.transport.requests
+ import requests
+
+ session = requests.session()
+ cached_session = cachecontrol.CacheControl(session)
+ request = google.auth.transport.requests.Request(session=cached_session)
+
+.. _OpenID Connect ID Tokens:
+ http://openid.net/specs/openid-connect-core-1_0.html#IDToken
+.. _CacheControl: https://cachecontrol.readthedocs.io
+"""
+
+import http.client as http_client
+import json
+import os
+
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import jwt
+
+
+# The URL that provides public certificates for verifying ID tokens issued
+# by Google's OAuth 2.0 authorization server.
+_GOOGLE_OAUTH2_CERTS_URL = "https://www.googleapis.com/oauth2/v1/certs"
+
+# The URL that provides public certificates for verifying ID tokens issued
+# by Firebase and the Google APIs infrastructure
+_GOOGLE_APIS_CERTS_URL = (
+ "https://www.googleapis.com/robot/v1/metadata/x509"
+ "/securetoken@system.gserviceaccount.com"
+)
+
+_GOOGLE_ISSUERS = ["accounts.google.com", "https://accounts.google.com"]
+
+
+def _fetch_certs(request, certs_url):
+ """Fetches certificates.
+
+ Google-style cerificate endpoints return JSON in the format of
+ ``{'key id': 'x509 certificate'}`` or a certificate array according
+ to the JWK spec (see https://tools.ietf.org/html/rfc7517).
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ certs_url (str): The certificate endpoint URL.
+
+ Returns:
+ Mapping[str, str] | Mapping[str, list]: A mapping of public keys
+ in x.509 or JWK spec.
+ """
+ response = request(certs_url, method="GET")
+
+ if response.status != http_client.OK:
+ raise exceptions.TransportError(
+ "Could not fetch certificates at {}".format(certs_url)
+ )
+
+ return json.loads(response.data.decode("utf-8"))
+
+
+def verify_token(
+ id_token,
+ request,
+ audience=None,
+ certs_url=_GOOGLE_OAUTH2_CERTS_URL,
+ clock_skew_in_seconds=0,
+):
+ """Verifies an ID token and returns the decoded token.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ audience (str or list): The audience or audiences that this token is
+ intended for. If None then the audience is not verified.
+ certs_url (str): The URL that specifies the certificates to use to
+ verify the token. This URL should return JSON in the format of
+ ``{'key id': 'x509 certificate'}`` or a certificate array according to
+ the JWK spec (see https://tools.ietf.org/html/rfc7517).
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+ """
+ certs = _fetch_certs(request, certs_url)
+
+ if "keys" in certs:
+ try:
+ import jwt as jwt_lib # type: ignore
+ except ImportError as caught_exc: # pragma: NO COVER
+ raise ImportError(
+ "The pyjwt library is not installed, please install the pyjwt package to use the jwk certs format."
+ ) from caught_exc
+ jwks_client = jwt_lib.PyJWKClient(certs_url)
+ signing_key = jwks_client.get_signing_key_from_jwt(id_token)
+ return jwt_lib.decode(
+ id_token,
+ signing_key.key,
+ algorithms=[signing_key.algorithm_name],
+ audience=audience,
+ )
+ else:
+ return jwt.decode(
+ id_token,
+ certs=certs,
+ audience=audience,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+
+def verify_oauth2_token(id_token, request, audience=None, clock_skew_in_seconds=0):
+ """Verifies an ID Token issued by Google's OAuth 2.0 authorization server.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ audience (str): The audience that this token is intended for. This is
+ typically your application's OAuth 2.0 client ID. If None then the
+ audience is not verified.
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+
+ Raises:
+ exceptions.GoogleAuthError: If the issuer is invalid.
+ ValueError: If token verification fails
+ """
+ idinfo = verify_token(
+ id_token,
+ request,
+ audience=audience,
+ certs_url=_GOOGLE_OAUTH2_CERTS_URL,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+ if idinfo["iss"] not in _GOOGLE_ISSUERS:
+ raise exceptions.GoogleAuthError(
+ "Wrong issuer. 'iss' should be one of the following: {}".format(
+ _GOOGLE_ISSUERS
+ )
+ )
+
+ return idinfo
+
+
+def verify_firebase_token(id_token, request, audience=None, clock_skew_in_seconds=0):
+ """Verifies an ID Token issued by Firebase Authentication.
+
+ Args:
+ id_token (Union[str, bytes]): The encoded token.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ audience (str): The audience that this token is intended for. This is
+ typically your Firebase application ID. If None then the audience
+ is not verified.
+ clock_skew_in_seconds (int): The clock skew used for `iat` and `exp`
+ validation.
+
+ Returns:
+ Mapping[str, Any]: The decoded token.
+ """
+ return verify_token(
+ id_token,
+ request,
+ audience=audience,
+ certs_url=_GOOGLE_APIS_CERTS_URL,
+ clock_skew_in_seconds=clock_skew_in_seconds,
+ )
+
+
+def fetch_id_token_credentials(audience, request=None):
+ """Create the ID Token credentials from the current environment.
+
+ This function acquires ID token from the environment in the following order.
+ See https://google.aip.dev/auth/4110.
+
+ 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
+ to the path of a valid service account JSON file, then ID token is
+ acquired using this service account credentials.
+ 2. If the application is running in Compute Engine, App Engine or Cloud Run,
+ then the ID token are obtained from the metadata server.
+ 3. If metadata server doesn't exist and no valid service account credentials
+ are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
+ be raised.
+
+ Example::
+
+ import google.oauth2.id_token
+ import google.auth.transport.requests
+
+ request = google.auth.transport.requests.Request()
+ target_audience = "https://pubsub.googleapis.com"
+
+ # Create ID token credentials.
+ credentials = google.oauth2.id_token.fetch_id_token_credentials(target_audience, request=request)
+
+ # Refresh the credential to obtain an ID token.
+ credentials.refresh(request)
+
+ id_token = credentials.token
+ id_token_expiry = credentials.expiry
+
+ Args:
+ audience (str): The audience that this ID token is intended for.
+ request (Optional[google.auth.transport.Request]): A callable used to make
+ HTTP requests. A request object will be created if not provided.
+
+ Returns:
+ google.auth.credentials.Credentials: The ID token credentials.
+
+ Raises:
+ ~google.auth.exceptions.DefaultCredentialsError:
+ If metadata server doesn't exist and no valid service account
+ credentials are found.
+ """
+ # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment
+ # variable.
+ credentials_filename = os.environ.get(environment_vars.CREDENTIALS)
+ if credentials_filename:
+ if not (
+ os.path.exists(credentials_filename)
+ and os.path.isfile(credentials_filename)
+ ):
+ raise exceptions.DefaultCredentialsError(
+ "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
+ )
+
+ try:
+ with open(credentials_filename, "r") as f:
+ from google.oauth2 import service_account
+
+ info = json.load(f)
+ if info.get("type") == "service_account":
+ return service_account.IDTokenCredentials.from_service_account_info(
+ info, target_audience=audience
+ )
+ except ValueError as caught_exc:
+ new_exc = exceptions.DefaultCredentialsError(
+ "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
+ caught_exc,
+ )
+ raise new_exc from caught_exc
+
+ # 2. Try to fetch ID token from metada server if it exists. The code
+ # works for GAE and Cloud Run metadata server as well.
+ try:
+ from google.auth import compute_engine
+ from google.auth.compute_engine import _metadata
+
+ # Create a request object if not provided.
+ if not request:
+ import google.auth.transport.requests
+
+ request = google.auth.transport.requests.Request()
+
+ if _metadata.ping(request):
+ return compute_engine.IDTokenCredentials(
+ request, audience, use_metadata_identity_endpoint=True
+ )
+ except (ImportError, exceptions.TransportError):
+ pass
+
+ raise exceptions.DefaultCredentialsError(
+ "Neither metadata server or valid service account credentials are found."
+ )
+
+
+def fetch_id_token(request, audience):
+ """Fetch the ID Token from the current environment.
+
+ This function acquires ID token from the environment in the following order.
+ See https://google.aip.dev/auth/4110.
+
+ 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
+ to the path of a valid service account JSON file, then ID token is
+ acquired using this service account credentials.
+ 2. If the application is running in Compute Engine, App Engine or Cloud Run,
+ then the ID token are obtained from the metadata server.
+ 3. If metadata server doesn't exist and no valid service account credentials
+ are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
+ be raised.
+
+ Example::
+
+ import google.oauth2.id_token
+ import google.auth.transport.requests
+
+ request = google.auth.transport.requests.Request()
+ target_audience = "https://pubsub.googleapis.com"
+
+ id_token = google.oauth2.id_token.fetch_id_token(request, target_audience)
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ audience (str): The audience that this ID token is intended for.
+
+ Returns:
+ str: The ID token.
+
+ Raises:
+ ~google.auth.exceptions.DefaultCredentialsError:
+ If metadata server doesn't exist and no valid service account
+ credentials are found.
+ """
+ id_token_credentials = fetch_id_token_credentials(audience, request=request)
+ id_token_credentials.refresh(request)
+ return id_token_credentials.token
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/py.typed b/.venv/lib/python3.12/site-packages/google/oauth2/py.typed
new file mode 100644
index 00000000..d82ed62c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/py.typed
@@ -0,0 +1,2 @@
+# Marker file for PEP 561.
+# The google-oauth2 package uses inline types.
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py b/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py
new file mode 100644
index 00000000..1e39e0bc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py
@@ -0,0 +1,369 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A module that provides functions for handling rapt authentication.
+
+Reauth is a process of obtaining additional authentication (such as password,
+security token, etc.) while refreshing OAuth 2.0 credentials for a user.
+
+Credentials that use the Reauth flow must have the reauth scope,
+``https://www.googleapis.com/auth/accounts.reauth``.
+
+This module provides a high-level function for executing the Reauth process,
+:func:`refresh_grant`, and lower-level helpers for doing the individual
+steps of the reauth process.
+
+Those steps are:
+
+1. Obtaining a list of challenges from the reauth server.
+2. Running through each challenge and sending the result back to the reauth
+ server.
+3. Refreshing the access token using the returned rapt token.
+"""
+
+import sys
+
+from google.auth import exceptions
+from google.auth import metrics
+from google.oauth2 import _client
+from google.oauth2 import challenges
+
+
+_REAUTH_SCOPE = "https://www.googleapis.com/auth/accounts.reauth"
+_REAUTH_API = "https://reauth.googleapis.com/v2/sessions"
+
+_REAUTH_NEEDED_ERROR = "invalid_grant"
+_REAUTH_NEEDED_ERROR_INVALID_RAPT = "invalid_rapt"
+_REAUTH_NEEDED_ERROR_RAPT_REQUIRED = "rapt_required"
+
+_AUTHENTICATED = "AUTHENTICATED"
+_CHALLENGE_REQUIRED = "CHALLENGE_REQUIRED"
+_CHALLENGE_PENDING = "CHALLENGE_PENDING"
+
+
+# Override this global variable to set custom max number of rounds of reauth
+# challenges should be run.
+RUN_CHALLENGE_RETRY_LIMIT = 5
+
+
+def is_interactive():
+ """Check if we are in an interractive environment.
+
+ Override this function with a different logic if you are using this library
+ outside a CLI.
+
+ If the rapt token needs refreshing, the user needs to answer the challenges.
+ If the user is not in an interractive environment, the challenges can not
+ be answered and we just wait for timeout for no reason.
+
+ Returns:
+ bool: True if is interactive environment, False otherwise.
+ """
+
+ return sys.stdin.isatty()
+
+
+def _get_challenges(
+ request, supported_challenge_types, access_token, requested_scopes=None
+):
+ """Does initial request to reauth API to get the challenges.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ supported_challenge_types (Sequence[str]): list of challenge names
+ supported by the manager.
+ access_token (str): Access token with reauth scopes.
+ requested_scopes (Optional(Sequence[str])): Authorized scopes for the credentials.
+
+ Returns:
+ dict: The response from the reauth API.
+ """
+ body = {"supportedChallengeTypes": supported_challenge_types}
+ if requested_scopes:
+ body["oauthScopesForDomainPolicyLookup"] = requested_scopes
+ metrics_header = {metrics.API_CLIENT_HEADER: metrics.reauth_start()}
+
+ return _client._token_endpoint_request(
+ request,
+ _REAUTH_API + ":start",
+ body,
+ access_token=access_token,
+ use_json=True,
+ headers=metrics_header,
+ )
+
+
+def _send_challenge_result(
+ request, session_id, challenge_id, client_input, access_token
+):
+ """Attempt to refresh access token by sending next challenge result.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ session_id (str): session id returned by the initial reauth call.
+ challenge_id (str): challenge id returned by the initial reauth call.
+ client_input: dict with a challenge-specific client input. For example:
+ ``{'credential': password}`` for password challenge.
+ access_token (str): Access token with reauth scopes.
+
+ Returns:
+ dict: The response from the reauth API.
+ """
+ body = {
+ "sessionId": session_id,
+ "challengeId": challenge_id,
+ "action": "RESPOND",
+ "proposalResponse": client_input,
+ }
+ metrics_header = {metrics.API_CLIENT_HEADER: metrics.reauth_continue()}
+
+ return _client._token_endpoint_request(
+ request,
+ _REAUTH_API + "/{}:continue".format(session_id),
+ body,
+ access_token=access_token,
+ use_json=True,
+ headers=metrics_header,
+ )
+
+
+def _run_next_challenge(msg, request, access_token):
+ """Get the next challenge from msg and run it.
+
+ Args:
+ msg (dict): Reauth API response body (either from the initial request to
+ https://reauth.googleapis.com/v2/sessions:start or from sending the
+ previous challenge response to
+ https://reauth.googleapis.com/v2/sessions/id:continue)
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ access_token (str): reauth access token
+
+ Returns:
+ dict: The response from the reauth API.
+
+ Raises:
+ google.auth.exceptions.ReauthError: if reauth failed.
+ """
+ for challenge in msg["challenges"]:
+ if challenge["status"] != "READY":
+ # Skip non-activated challenges.
+ continue
+ c = challenges.AVAILABLE_CHALLENGES.get(challenge["challengeType"], None)
+ if not c:
+ raise exceptions.ReauthFailError(
+ "Unsupported challenge type {0}. Supported types: {1}".format(
+ challenge["challengeType"],
+ ",".join(list(challenges.AVAILABLE_CHALLENGES.keys())),
+ )
+ )
+ if not c.is_locally_eligible:
+ raise exceptions.ReauthFailError(
+ "Challenge {0} is not locally eligible".format(
+ challenge["challengeType"]
+ )
+ )
+ client_input = c.obtain_challenge_input(challenge)
+ if not client_input:
+ return None
+ return _send_challenge_result(
+ request,
+ msg["sessionId"],
+ challenge["challengeId"],
+ client_input,
+ access_token,
+ )
+ return None
+
+
+def _obtain_rapt(request, access_token, requested_scopes):
+ """Given an http request method and reauth access token, get rapt token.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ access_token (str): reauth access token
+ requested_scopes (Sequence[str]): scopes required by the client application
+
+ Returns:
+ str: The rapt token.
+
+ Raises:
+ google.auth.exceptions.ReauthError: if reauth failed
+ """
+ msg = _get_challenges(
+ request,
+ list(challenges.AVAILABLE_CHALLENGES.keys()),
+ access_token,
+ requested_scopes,
+ )
+
+ if msg["status"] == _AUTHENTICATED:
+ return msg["encodedProofOfReauthToken"]
+
+ for _ in range(0, RUN_CHALLENGE_RETRY_LIMIT):
+ if not (
+ msg["status"] == _CHALLENGE_REQUIRED or msg["status"] == _CHALLENGE_PENDING
+ ):
+ raise exceptions.ReauthFailError(
+ "Reauthentication challenge failed due to API error: {}".format(
+ msg["status"]
+ )
+ )
+
+ if not is_interactive():
+ raise exceptions.ReauthFailError(
+ "Reauthentication challenge could not be answered because you are not"
+ " in an interactive session."
+ )
+
+ msg = _run_next_challenge(msg, request, access_token)
+
+ if not msg:
+ raise exceptions.ReauthFailError("Failed to obtain rapt token.")
+ if msg["status"] == _AUTHENTICATED:
+ return msg["encodedProofOfReauthToken"]
+
+ # If we got here it means we didn't get authenticated.
+ raise exceptions.ReauthFailError("Failed to obtain rapt token.")
+
+
+def get_rapt_token(
+ request, client_id, client_secret, refresh_token, token_uri, scopes=None
+):
+ """Given an http request method and refresh_token, get rapt token.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ client_id (str): client id to get access token for reauth scope.
+ client_secret (str): client secret for the client_id
+ refresh_token (str): refresh token to refresh access token
+ token_uri (str): uri to refresh access token
+ scopes (Optional(Sequence[str])): scopes required by the client application
+
+ Returns:
+ str: The rapt token.
+ Raises:
+ google.auth.exceptions.RefreshError: If reauth failed.
+ """
+ sys.stderr.write("Reauthentication required.\n")
+
+ # Get access token for reauth.
+ access_token, _, _, _ = _client.refresh_grant(
+ request=request,
+ client_id=client_id,
+ client_secret=client_secret,
+ refresh_token=refresh_token,
+ token_uri=token_uri,
+ scopes=[_REAUTH_SCOPE],
+ )
+
+ # Get rapt token from reauth API.
+ rapt_token = _obtain_rapt(request, access_token, requested_scopes=scopes)
+ sys.stderr.write("Reauthentication successful.\n")
+
+ return rapt_token
+
+
+def refresh_grant(
+ request,
+ token_uri,
+ refresh_token,
+ client_id,
+ client_secret,
+ scopes=None,
+ rapt_token=None,
+ enable_reauth_refresh=False,
+):
+ """Implements the reauthentication flow.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorizations server's token endpoint
+ URI.
+ refresh_token (str): The refresh token to use to get a new access
+ token.
+ client_id (str): The OAuth 2.0 application's client ID.
+ client_secret (str): The Oauth 2.0 appliaction's client secret.
+ scopes (Optional(Sequence[str])): Scopes to request. If present, all
+ scopes must be authorized for the refresh token. Useful if refresh
+ token has a wild card scope (e.g.
+ 'https://www.googleapis.com/auth/any-api').
+ rapt_token (Optional(str)): The rapt token for reauth.
+ enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow
+ should be used. The default value is False. This option is for
+ gcloud only, other users should use the default value.
+
+ Returns:
+ Tuple[str, Optional[str], Optional[datetime], Mapping[str, str], str]: The
+ access token, new refresh token, expiration, the additional data
+ returned by the token endpoint, and the rapt token.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "client_id": client_id,
+ "client_secret": client_secret,
+ "refresh_token": refresh_token,
+ }
+ if scopes:
+ body["scope"] = " ".join(scopes)
+ if rapt_token:
+ body["rapt"] = rapt_token
+ metrics_header = {metrics.API_CLIENT_HEADER: metrics.token_request_user()}
+
+ response_status_ok, response_data, retryable_error = _client._token_endpoint_request_no_throw(
+ request, token_uri, body, headers=metrics_header
+ )
+
+ if not response_status_ok and isinstance(response_data, str):
+ raise exceptions.RefreshError(response_data, retryable=False)
+
+ if (
+ not response_status_ok
+ and response_data.get("error") == _REAUTH_NEEDED_ERROR
+ and (
+ response_data.get("error_subtype") == _REAUTH_NEEDED_ERROR_INVALID_RAPT
+ or response_data.get("error_subtype") == _REAUTH_NEEDED_ERROR_RAPT_REQUIRED
+ )
+ ):
+ if not enable_reauth_refresh:
+ raise exceptions.RefreshError(
+ "Reauthentication is needed. Please run `gcloud auth application-default login` to reauthenticate."
+ )
+
+ rapt_token = get_rapt_token(
+ request, client_id, client_secret, refresh_token, token_uri, scopes=scopes
+ )
+ body["rapt"] = rapt_token
+ (
+ response_status_ok,
+ response_data,
+ retryable_error,
+ ) = _client._token_endpoint_request_no_throw(
+ request, token_uri, body, headers=metrics_header
+ )
+
+ if not response_status_ok:
+ _client._handle_error_response(response_data, retryable_error)
+ return _client._handle_refresh_grant_response(response_data, refresh_token) + (
+ rapt_token,
+ )
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py b/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py
new file mode 100644
index 00000000..3e84194a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py
@@ -0,0 +1,847 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0
+
+This module implements the JWT Profile for OAuth 2.0 Authorization Grants
+as defined by `RFC 7523`_ with particular support for how this RFC is
+implemented in Google's infrastructure. Google refers to these credentials
+as *Service Accounts*.
+
+Service accounts are used for server-to-server communication, such as
+interactions between a web application server and a Google service. The
+service account belongs to your application instead of to an individual end
+user. In contrast to other OAuth 2.0 profiles, no users are involved and your
+application "acts" as the service account.
+
+Typically an application uses a service account when the application uses
+Google APIs to work with its own data rather than a user's data. For example,
+an application that uses Google Cloud Datastore for data persistence would use
+a service account to authenticate its calls to the Google Cloud Datastore API.
+However, an application that needs to access a user's Drive documents would
+use the normal OAuth 2.0 profile.
+
+Additionally, Google Apps domain administrators can grant service accounts
+`domain-wide delegation`_ authority to access user data on behalf of users in
+the domain.
+
+This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used
+in place of the usual authorization token returned during the standard
+OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as
+the acquired access token is used as the bearer token when making requests
+using these credentials.
+
+This profile differs from normal OAuth 2.0 profile because no user consent
+step is required. The use of the private key allows this profile to assert
+identity directly.
+
+This profile also differs from the :mod:`google.auth.jwt` authentication
+because the JWT credentials use the JWT directly as the bearer token. This
+profile instead only uses the JWT to obtain an OAuth 2.0 access token. The
+obtained OAuth 2.0 access token is used as the bearer token.
+
+Domain-wide delegation
+----------------------
+
+Domain-wide delegation allows a service account to access user data on
+behalf of any user in a Google Apps domain without consent from the user.
+For example, an application that uses the Google Calendar API to add events to
+the calendars of all users in a Google Apps domain would use a service account
+to access the Google Calendar API on behalf of users.
+
+The Google Apps administrator must explicitly authorize the service account to
+do this. This authorization step is referred to as "delegating domain-wide
+authority" to a service account.
+
+You can use domain-wise delegation by creating a set of credentials with a
+specific subject using :meth:`~Credentials.with_subject`.
+
+.. _RFC 7523: https://tools.ietf.org/html/rfc7523
+"""
+
+import copy
+import datetime
+
+from google.auth import _helpers
+from google.auth import _service_account_info
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import iam
+from google.auth import jwt
+from google.auth import metrics
+from google.oauth2 import _client
+
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
+
+
+class Credentials(
+ credentials.Signing,
+ credentials.Scoped,
+ credentials.CredentialsWithQuotaProject,
+ credentials.CredentialsWithTokenUri,
+):
+ """Service account credentials
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = service_account.Credentials.from_service_account_file(
+ 'service-account.json')
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = service_account.Credentials.from_service_account_info(
+ service_account_info)
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = service_account.Credentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com')
+
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ To add a quota project, use :meth:`with_quota_project`::
+
+ credentials = credentials.with_quota_project('myproject-123')
+ """
+
+ def __init__(
+ self,
+ signer,
+ service_account_email,
+ token_uri,
+ scopes=None,
+ default_scopes=None,
+ subject=None,
+ project_id=None,
+ quota_project_id=None,
+ additional_claims=None,
+ always_use_jwt_access=False,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ trust_boundary=None,
+ ):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_account_email (str): The service account's email.
+ scopes (Sequence[str]): User-defined scopes to request during the
+ authorization grant.
+ default_scopes (Sequence[str]): Default scopes passed by a
+ Google client library. Use 'scopes' for user-defined scopes.
+ token_uri (str): The OAuth 2.0 Token URI.
+ subject (str): For domain-wide delegation, the email address of the
+ user to for which to request delegated access.
+ project_id (str): Project ID associated with the service account
+ credential.
+ quota_project_id (Optional[str]): The project ID used for quota and
+ billing.
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT assertion used in the authorization grant.
+ always_use_jwt_access (Optional[bool]): Whether self signed JWT should
+ be always used.
+ universe_domain (str): The universe domain. The default
+ universe domain is googleapis.com. For default value self
+ signed jwt is used for token refresh.
+ trust_boundary (str): String representation of trust boundary meta.
+
+ .. note:: Typically one of the helper constructors
+ :meth:`from_service_account_file` or
+ :meth:`from_service_account_info` are used instead of calling the
+ constructor directly.
+ """
+ super(Credentials, self).__init__()
+
+ self._cred_file_path = None
+ self._scopes = scopes
+ self._default_scopes = default_scopes
+ self._signer = signer
+ self._service_account_email = service_account_email
+ self._subject = subject
+ self._project_id = project_id
+ self._quota_project_id = quota_project_id
+ self._token_uri = token_uri
+ self._always_use_jwt_access = always_use_jwt_access
+ self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
+
+ if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
+ self._always_use_jwt_access = True
+
+ self._jwt_credentials = None
+
+ if additional_claims is not None:
+ self._additional_claims = additional_claims
+ else:
+ self._additional_claims = {}
+ self._trust_boundary = {"locations": [], "encoded_locations": "0x0"}
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info, **kwargs):
+ """Creates a Credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.jwt.Credentials: The constructed credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ return cls(
+ signer,
+ service_account_email=info["client_email"],
+ token_uri=info["token_uri"],
+ project_id=info.get("project_id"),
+ universe_domain=info.get(
+ "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
+ ),
+ trust_boundary=info.get("trust_boundary"),
+ **kwargs,
+ )
+
+ @classmethod
+ def from_service_account_info(cls, info, **kwargs):
+ """Creates a Credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.Credentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info, require=["client_email", "token_uri"]
+ )
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ @classmethod
+ def from_service_account_file(cls, filename, **kwargs):
+ """Creates a Credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.Credentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename, require=["client_email", "token_uri"]
+ )
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ @property
+ def service_account_email(self):
+ """The service account email."""
+ return self._service_account_email
+
+ @property
+ def project_id(self):
+ """Project ID associated with this credential."""
+ return self._project_id
+
+ @property
+ def requires_scopes(self):
+ """Checks if the credentials requires scopes.
+
+ Returns:
+ bool: True if there are no scopes set otherwise False.
+ """
+ return True if not self._scopes else False
+
+ def _make_copy(self):
+ cred = self.__class__(
+ self._signer,
+ service_account_email=self._service_account_email,
+ scopes=copy.copy(self._scopes),
+ default_scopes=copy.copy(self._default_scopes),
+ token_uri=self._token_uri,
+ subject=self._subject,
+ project_id=self._project_id,
+ quota_project_id=self._quota_project_id,
+ additional_claims=self._additional_claims.copy(),
+ always_use_jwt_access=self._always_use_jwt_access,
+ universe_domain=self._universe_domain,
+ )
+ cred._cred_file_path = self._cred_file_path
+ return cred
+
+ @_helpers.copy_docstring(credentials.Scoped)
+ def with_scopes(self, scopes, default_scopes=None):
+ cred = self._make_copy()
+ cred._scopes = scopes
+ cred._default_scopes = default_scopes
+ return cred
+
+ def with_always_use_jwt_access(self, always_use_jwt_access):
+ """Create a copy of these credentials with the specified always_use_jwt_access value.
+
+ Args:
+ always_use_jwt_access (bool): Whether always use self signed JWT or not.
+
+ Returns:
+ google.auth.service_account.Credentials: A new credentials
+ instance.
+ Raises:
+ google.auth.exceptions.InvalidValue: If the universe domain is not
+ default and always_use_jwt_access is False.
+ """
+ cred = self._make_copy()
+ if (
+ cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
+ and not always_use_jwt_access
+ ):
+ raise exceptions.InvalidValue(
+ "always_use_jwt_access should be True for non-default universe domain"
+ )
+ cred._always_use_jwt_access = always_use_jwt_access
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+ def with_universe_domain(self, universe_domain):
+ cred = self._make_copy()
+ cred._universe_domain = universe_domain
+ if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
+ cred._always_use_jwt_access = True
+ return cred
+
+ def with_subject(self, subject):
+ """Create a copy of these credentials with the specified subject.
+
+ Args:
+ subject (str): The subject claim.
+
+ Returns:
+ google.auth.service_account.Credentials: A new credentials
+ instance.
+ """
+ cred = self._make_copy()
+ cred._subject = subject
+ return cred
+
+ def with_claims(self, additional_claims):
+ """Returns a copy of these credentials with modified claims.
+
+ Args:
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT payload. This will be merged with the current
+ additional claims.
+
+ Returns:
+ google.auth.service_account.Credentials: A new credentials
+ instance.
+ """
+ new_additional_claims = copy.deepcopy(self._additional_claims)
+ new_additional_claims.update(additional_claims or {})
+ cred = self._make_copy()
+ cred._additional_claims = new_additional_claims
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+ cred = self._make_copy()
+ cred._token_uri = token_uri
+ return cred
+
+ def _make_authorization_grant_assertion(self):
+ """Create the OAuth 2.0 assertion.
+
+ This assertion is used during the OAuth 2.0 grant to acquire an
+ access token.
+
+ Returns:
+ bytes: The authorization grant assertion.
+ """
+ now = _helpers.utcnow()
+ lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
+ expiry = now + lifetime
+
+ payload = {
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ # The issuer must be the service account email.
+ "iss": self._service_account_email,
+ # The audience must be the auth token endpoint's URI
+ "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
+ "scope": _helpers.scopes_to_string(self._scopes or ()),
+ }
+
+ payload.update(self._additional_claims)
+
+ # The subject can be a user email for domain-wide delegation.
+ if self._subject:
+ payload.setdefault("sub", self._subject)
+
+ token = jwt.encode(self._signer, payload)
+
+ return token
+
+ def _use_self_signed_jwt(self):
+ # Since domain wide delegation doesn't work with self signed JWT. If
+ # subject exists, then we should not use self signed JWT.
+ return self._subject is None and self._jwt_credentials is not None
+
+ def _metric_header_for_usage(self):
+ if self._use_self_signed_jwt():
+ return metrics.CRED_TYPE_SA_JWT
+ return metrics.CRED_TYPE_SA_ASSERTION
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ if self._always_use_jwt_access and not self._jwt_credentials:
+ # If self signed jwt should be used but jwt credential is not
+ # created, try to create one with scopes
+ self._create_self_signed_jwt(None)
+
+ if (
+ self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
+ and self._subject
+ ):
+ raise exceptions.RefreshError(
+ "domain wide delegation is not supported for non-default universe domain"
+ )
+
+ if self._use_self_signed_jwt():
+ self._jwt_credentials.refresh(request)
+ self.token = self._jwt_credentials.token.decode()
+ self.expiry = self._jwt_credentials.expiry
+ else:
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
+
+ def _create_self_signed_jwt(self, audience):
+ """Create a self-signed JWT from the credentials if requirements are met.
+
+ Args:
+ audience (str): The service URL. ``https://[API_ENDPOINT]/``
+ """
+ # https://google.aip.dev/auth/4111
+ if self._always_use_jwt_access:
+ if self._scopes:
+ additional_claims = {"scope": " ".join(self._scopes)}
+ if (
+ self._jwt_credentials is None
+ or self._jwt_credentials.additional_claims != additional_claims
+ ):
+ self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self, None, additional_claims=additional_claims
+ )
+ elif audience:
+ if (
+ self._jwt_credentials is None
+ or self._jwt_credentials._audience != audience
+ ):
+
+ self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self, audience
+ )
+ elif self._default_scopes:
+ additional_claims = {"scope": " ".join(self._default_scopes)}
+ if (
+ self._jwt_credentials is None
+ or additional_claims != self._jwt_credentials.additional_claims
+ ):
+ self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self, None, additional_claims=additional_claims
+ )
+ elif not self._scopes and audience:
+ self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self, audience
+ )
+
+ @_helpers.copy_docstring(credentials.Signing)
+ def sign_bytes(self, message):
+ return self._signer.sign(message)
+
+ @property # type: ignore
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer(self):
+ return self._signer
+
+ @property # type: ignore
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer_email(self):
+ return self._service_account_email
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ if self._cred_file_path:
+ return {
+ "credential_source": self._cred_file_path,
+ "credential_type": "service account credentials",
+ "principal": self.service_account_email,
+ }
+ return None
+
+
+class IDTokenCredentials(
+ credentials.Signing,
+ credentials.CredentialsWithQuotaProject,
+ credentials.CredentialsWithTokenUri,
+):
+ """Open ID Connect ID Token-based service account credentials.
+
+ These credentials are largely similar to :class:`.Credentials`, but instead
+ of using an OAuth 2.0 Access Token as the bearer token, they use an Open
+ ID Connect ID Token as the bearer token. These credentials are useful when
+ communicating to services that require ID Tokens and can not accept access
+ tokens.
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json'))
+
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_info(
+ service_account_info))
+
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com'))
+
+
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ """
+
+ def __init__(
+ self,
+ signer,
+ service_account_email,
+ token_uri,
+ target_audience,
+ additional_claims=None,
+ quota_project_id=None,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ ):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_account_email (str): The service account's email.
+ token_uri (str): The OAuth 2.0 Token URI.
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token. The ID Token's ``aud`` claim
+ will be set to this string.
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT assertion used in the authorization grant.
+ quota_project_id (Optional[str]): The project ID used for quota and billing.
+ universe_domain (str): The universe domain. The default
+ universe domain is googleapis.com. For default value IAM ID
+ token endponint is used for token refresh. Note that
+ iam.serviceAccountTokenCreator role is required to use the IAM
+ endpoint.
+ .. note:: Typically one of the helper constructors
+ :meth:`from_service_account_file` or
+ :meth:`from_service_account_info` are used instead of calling the
+ constructor directly.
+ """
+ super(IDTokenCredentials, self).__init__()
+ self._signer = signer
+ self._service_account_email = service_account_email
+ self._token_uri = token_uri
+ self._target_audience = target_audience
+ self._quota_project_id = quota_project_id
+ self._use_iam_endpoint = False
+
+ if not universe_domain:
+ self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN
+ else:
+ self._universe_domain = universe_domain
+ self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
+ "googleapis.com", self._universe_domain
+ )
+
+ if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
+ self._use_iam_endpoint = True
+
+ if additional_claims is not None:
+ self._additional_claims = additional_claims
+ else:
+ self._additional_claims = {}
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info, **kwargs):
+ """Creates a credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.jwt.IDTokenCredentials: The constructed credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ kwargs.setdefault("service_account_email", info["client_email"])
+ kwargs.setdefault("token_uri", info["token_uri"])
+ if "universe_domain" in info:
+ kwargs["universe_domain"] = info["universe_domain"]
+ return cls(signer, **kwargs)
+
+ @classmethod
+ def from_service_account_info(cls, info, **kwargs):
+ """Creates a credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info, require=["client_email", "token_uri"]
+ )
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ @classmethod
+ def from_service_account_file(cls, filename, **kwargs):
+ """Creates a credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename, require=["client_email", "token_uri"]
+ )
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ def _make_copy(self):
+ cred = self.__class__(
+ self._signer,
+ service_account_email=self._service_account_email,
+ token_uri=self._token_uri,
+ target_audience=self._target_audience,
+ additional_claims=self._additional_claims.copy(),
+ quota_project_id=self.quota_project_id,
+ universe_domain=self._universe_domain,
+ )
+ # _use_iam_endpoint is not exposed in the constructor
+ cred._use_iam_endpoint = self._use_iam_endpoint
+ return cred
+
+ def with_target_audience(self, target_audience):
+ """Create a copy of these credentials with the specified target
+ audience.
+
+ Args:
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: A new credentials
+ instance.
+ """
+ cred = self._make_copy()
+ cred._target_audience = target_audience
+ return cred
+
+ def _with_use_iam_endpoint(self, use_iam_endpoint):
+ """Create a copy of these credentials with the use_iam_endpoint value.
+
+ Args:
+ use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will
+ be used instead of the token_uri. Note that
+ iam.serviceAccountTokenCreator role is required to use the IAM
+ endpoint. The default value is False. This feature is currently
+ experimental and subject to change without notice.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: A new credentials
+ instance.
+ Raises:
+ google.auth.exceptions.InvalidValue: If the universe domain is not
+ default and use_iam_endpoint is False.
+ """
+ cred = self._make_copy()
+ if (
+ cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN
+ and not use_iam_endpoint
+ ):
+ raise exceptions.InvalidValue(
+ "use_iam_endpoint should be True for non-default universe domain"
+ )
+ cred._use_iam_endpoint = use_iam_endpoint
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+ cred = self._make_copy()
+ cred._token_uri = token_uri
+ return cred
+
+ def _make_authorization_grant_assertion(self):
+ """Create the OAuth 2.0 assertion.
+
+ This assertion is used during the OAuth 2.0 grant to acquire an
+ ID token.
+
+ Returns:
+ bytes: The authorization grant assertion.
+ """
+ now = _helpers.utcnow()
+ lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
+ expiry = now + lifetime
+
+ payload = {
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ # The issuer must be the service account email.
+ "iss": self.service_account_email,
+ # The audience must be the auth token endpoint's URI
+ "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
+ # The target audience specifies which service the ID token is
+ # intended for.
+ "target_audience": self._target_audience,
+ }
+
+ payload.update(self._additional_claims)
+
+ token = jwt.encode(self._signer, payload)
+
+ return token
+
+ def _refresh_with_iam_endpoint(self, request):
+ """Use IAM generateIdToken endpoint to obtain an ID token.
+
+ It works as follows:
+
+ 1. First we create a self signed jwt with
+ https://www.googleapis.com/auth/iam being the scope.
+
+ 2. Next we use the self signed jwt as the access token, and make a POST
+ request to IAM generateIdToken endpoint. The request body is:
+ {
+ "audience": self._target_audience,
+ "includeEmail": "true",
+ "useEmailAzp": "true",
+ }
+
+ If the request is succesfully, it will return {"token":"the ID token"},
+ and we can extract the ID token and compute its expiry.
+ """
+ jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self,
+ None,
+ additional_claims={"scope": "https://www.googleapis.com/auth/iam"},
+ )
+ jwt_credentials.refresh(request)
+ self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(
+ request,
+ self._iam_id_token_endpoint,
+ self.signer_email,
+ self._target_audience,
+ jwt_credentials.token.decode(),
+ self._universe_domain,
+ )
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ if self._use_iam_endpoint:
+ self._refresh_with_iam_endpoint(request)
+ else:
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.id_token_jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
+
+ @property
+ def service_account_email(self):
+ """The service account email."""
+ return self._service_account_email
+
+ @_helpers.copy_docstring(credentials.Signing)
+ def sign_bytes(self, message):
+ return self._signer.sign(message)
+
+ @property # type: ignore
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer(self):
+ return self._signer
+
+ @property # type: ignore
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer_email(self):
+ return self._service_account_email
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/sts.py b/.venv/lib/python3.12/site-packages/google/oauth2/sts.py
new file mode 100644
index 00000000..ad396273
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/sts.py
@@ -0,0 +1,176 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 Token Exchange Spec.
+
+This module defines a token exchange utility based on the `OAuth 2.0 Token
+Exchange`_ spec. This will be mainly used to exchange external credentials
+for GCP access tokens in workload identity pools to access Google APIs.
+
+The implementation will support various types of client authentication as
+allowed in the spec.
+
+A deviation on the spec will be for additional Google specific options that
+cannot be easily mapped to parameters defined in the RFC.
+
+The returned dictionary response will be based on the `rfc8693 section 2.2.1`_
+spec JSON response.
+
+.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693
+.. _rfc8693 section 2.2.1: https://tools.ietf.org/html/rfc8693#section-2.2.1
+"""
+
+import http.client as http_client
+import json
+import urllib
+
+from google.oauth2 import utils
+
+
+_URLENCODED_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}
+
+
+class Client(utils.OAuthClientAuthHandler):
+ """Implements the OAuth 2.0 token exchange spec based on
+ https://tools.ietf.org/html/rfc8693.
+ """
+
+ def __init__(self, token_exchange_endpoint, client_authentication=None):
+ """Initializes an STS client instance.
+
+ Args:
+ token_exchange_endpoint (str): The token exchange endpoint.
+ client_authentication (Optional(google.oauth2.oauth2_utils.ClientAuthentication)):
+ The optional OAuth client authentication credentials if available.
+ """
+ super(Client, self).__init__(client_authentication)
+ self._token_exchange_endpoint = token_exchange_endpoint
+
+ def _make_request(self, request, headers, request_body):
+ # Initialize request headers.
+ request_headers = _URLENCODED_HEADERS.copy()
+
+ # Inject additional headers.
+ if headers:
+ for k, v in dict(headers).items():
+ request_headers[k] = v
+
+ # Apply OAuth client authentication.
+ self.apply_client_authentication_options(request_headers, request_body)
+
+ # Execute request.
+ response = request(
+ url=self._token_exchange_endpoint,
+ method="POST",
+ headers=request_headers,
+ body=urllib.parse.urlencode(request_body).encode("utf-8"),
+ )
+
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ # If non-200 response received, translate to OAuthError exception.
+ if response.status != http_client.OK:
+ utils.handle_error_response(response_body)
+
+ response_data = json.loads(response_body)
+
+ # Return successful response.
+ return response_data
+
+ def exchange_token(
+ self,
+ request,
+ grant_type,
+ subject_token,
+ subject_token_type,
+ resource=None,
+ audience=None,
+ scopes=None,
+ requested_token_type=None,
+ actor_token=None,
+ actor_token_type=None,
+ additional_options=None,
+ additional_headers=None,
+ ):
+ """Exchanges the provided token for another type of token based on the
+ rfc8693 spec.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ grant_type (str): The OAuth 2.0 token exchange grant type.
+ subject_token (str): The OAuth 2.0 token exchange subject token.
+ subject_token_type (str): The OAuth 2.0 token exchange subject token type.
+ resource (Optional[str]): The optional OAuth 2.0 token exchange resource field.
+ audience (Optional[str]): The optional OAuth 2.0 token exchange audience field.
+ scopes (Optional[Sequence[str]]): The optional list of scopes to use.
+ requested_token_type (Optional[str]): The optional OAuth 2.0 token exchange requested
+ token type.
+ actor_token (Optional[str]): The optional OAuth 2.0 token exchange actor token.
+ actor_token_type (Optional[str]): The optional OAuth 2.0 token exchange actor token type.
+ additional_options (Optional[Mapping[str, str]]): The optional additional
+ non-standard Google specific options.
+ additional_headers (Optional[Mapping[str, str]]): The optional additional
+ headers to pass to the token exchange endpoint.
+
+ Returns:
+ Mapping[str, str]: The token exchange JSON-decoded response data containing
+ the requested token and its expiration time.
+
+ Raises:
+ google.auth.exceptions.OAuthError: If the token endpoint returned
+ an error.
+ """
+ # Initialize request body.
+ request_body = {
+ "grant_type": grant_type,
+ "resource": resource,
+ "audience": audience,
+ "scope": " ".join(scopes or []),
+ "requested_token_type": requested_token_type,
+ "subject_token": subject_token,
+ "subject_token_type": subject_token_type,
+ "actor_token": actor_token,
+ "actor_token_type": actor_token_type,
+ "options": None,
+ }
+ # Add additional non-standard options.
+ if additional_options:
+ request_body["options"] = urllib.parse.quote(json.dumps(additional_options))
+ # Remove empty fields in request body.
+ for k, v in dict(request_body).items():
+ if v is None or v == "":
+ del request_body[k]
+
+ return self._make_request(request, additional_headers, request_body)
+
+ def refresh_token(self, request, refresh_token):
+ """Exchanges a refresh token for an access token based on the
+ RFC6749 spec.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ subject_token (str): The OAuth 2.0 refresh token.
+ """
+
+ return self._make_request(
+ request,
+ None,
+ {"grant_type": "refresh_token", "refresh_token": refresh_token},
+ )
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/utils.py b/.venv/lib/python3.12/site-packages/google/oauth2/utils.py
new file mode 100644
index 00000000..d72ff191
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/utils.py
@@ -0,0 +1,168 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""OAuth 2.0 Utilities.
+
+This module provides implementations for various OAuth 2.0 utilities.
+This includes `OAuth error handling`_ and
+`Client authentication for OAuth flows`_.
+
+OAuth error handling
+--------------------
+This will define interfaces for handling OAuth related error responses as
+stated in `RFC 6749 section 5.2`_.
+This will include a common function to convert these HTTP error responses to a
+:class:`google.auth.exceptions.OAuthError` exception.
+
+
+Client authentication for OAuth flows
+-------------------------------------
+We introduce an interface for defining client authentication credentials based
+on `RFC 6749 section 2.3.1`_. This will expose the following
+capabilities:
+
+ * Ability to support basic authentication via request header.
+ * Ability to support bearer token authentication via request header.
+ * Ability to support client ID / secret authentication via request body.
+
+.. _RFC 6749 section 2.3.1: https://tools.ietf.org/html/rfc6749#section-2.3.1
+.. _RFC 6749 section 5.2: https://tools.ietf.org/html/rfc6749#section-5.2
+"""
+
+import abc
+import base64
+import enum
+import json
+
+from google.auth import exceptions
+
+
+# OAuth client authentication based on
+# https://tools.ietf.org/html/rfc6749#section-2.3.
+class ClientAuthType(enum.Enum):
+ basic = 1
+ request_body = 2
+
+
+class ClientAuthentication(object):
+ """Defines the client authentication credentials for basic and request-body
+ types based on https://tools.ietf.org/html/rfc6749#section-2.3.1.
+ """
+
+ def __init__(self, client_auth_type, client_id, client_secret=None):
+ """Instantiates a client authentication object containing the client ID
+ and secret credentials for basic and response-body auth.
+
+ Args:
+ client_auth_type (google.oauth2.oauth_utils.ClientAuthType): The
+ client authentication type.
+ client_id (str): The client ID.
+ client_secret (Optional[str]): The client secret.
+ """
+ self.client_auth_type = client_auth_type
+ self.client_id = client_id
+ self.client_secret = client_secret
+
+
+class OAuthClientAuthHandler(metaclass=abc.ABCMeta):
+ """Abstract class for handling client authentication in OAuth-based
+ operations.
+ """
+
+ def __init__(self, client_authentication=None):
+ """Instantiates an OAuth client authentication handler.
+
+ Args:
+ client_authentication (Optional[google.oauth2.utils.ClientAuthentication]):
+ The OAuth client authentication credentials if available.
+ """
+ super(OAuthClientAuthHandler, self).__init__()
+ self._client_authentication = client_authentication
+
+ def apply_client_authentication_options(
+ self, headers, request_body=None, bearer_token=None
+ ):
+ """Applies client authentication on the OAuth request's headers or POST
+ body.
+
+ Args:
+ headers (Mapping[str, str]): The HTTP request header.
+ request_body (Optional[Mapping[str, str]]): The HTTP request body
+ dictionary. For requests that do not support request body, this
+ is None and will be ignored.
+ bearer_token (Optional[str]): The optional bearer token.
+ """
+ # Inject authenticated header.
+ self._inject_authenticated_headers(headers, bearer_token)
+ # Inject authenticated request body.
+ if bearer_token is None:
+ self._inject_authenticated_request_body(request_body)
+
+ def _inject_authenticated_headers(self, headers, bearer_token=None):
+ if bearer_token is not None:
+ headers["Authorization"] = "Bearer %s" % bearer_token
+ elif (
+ self._client_authentication is not None
+ and self._client_authentication.client_auth_type is ClientAuthType.basic
+ ):
+ username = self._client_authentication.client_id
+ password = self._client_authentication.client_secret or ""
+
+ credentials = base64.b64encode(
+ ("%s:%s" % (username, password)).encode()
+ ).decode()
+ headers["Authorization"] = "Basic %s" % credentials
+
+ def _inject_authenticated_request_body(self, request_body):
+ if (
+ self._client_authentication is not None
+ and self._client_authentication.client_auth_type
+ is ClientAuthType.request_body
+ ):
+ if request_body is None:
+ raise exceptions.OAuthError(
+ "HTTP request does not support request-body"
+ )
+ else:
+ request_body["client_id"] = self._client_authentication.client_id
+ request_body["client_secret"] = (
+ self._client_authentication.client_secret or ""
+ )
+
+
+def handle_error_response(response_body):
+ """Translates an error response from an OAuth operation into an
+ OAuthError exception.
+
+ Args:
+ response_body (str): The decoded response data.
+
+ Raises:
+ google.auth.exceptions.OAuthError
+ """
+ try:
+ error_components = []
+ error_data = json.loads(response_body)
+
+ error_components.append("Error code {}".format(error_data["error"]))
+ if "error_description" in error_data:
+ error_components.append(": {}".format(error_data["error_description"]))
+ if "error_uri" in error_data:
+ error_components.append(" - {}".format(error_data["error_uri"]))
+ error_details = "".join(error_components)
+ # If no details could be extracted, use the response data.
+ except (KeyError, ValueError):
+ error_details = response_body
+
+ raise exceptions.OAuthError(error_details, response_body)
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
new file mode 100644
index 00000000..e27c7e09
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py
@@ -0,0 +1,82 @@
+import abc
+import os
+import struct
+import subprocess
+
+from google.auth import exceptions
+from google.oauth2.webauthn_types import GetRequest, GetResponse
+
+
+class WebAuthnHandler(abc.ABC):
+ @abc.abstractmethod
+ def is_available(self) -> bool:
+ """Check whether this WebAuthn handler is available"""
+ raise NotImplementedError("is_available method must be implemented")
+
+ @abc.abstractmethod
+ def get(self, get_request: GetRequest) -> GetResponse:
+ """WebAuthn get (assertion)"""
+ raise NotImplementedError("get method must be implemented")
+
+
+class PluginHandler(WebAuthnHandler):
+ """Offloads WebAuthn get reqeust to a pluggable command-line tool.
+
+ Offloads WebAuthn get to a plugin which takes the form of a
+ command-line tool. The command-line tool is configurable via the
+ PluginHandler._ENV_VAR environment variable.
+
+ The WebAuthn plugin should implement the following interface:
+
+ Communication occurs over stdin/stdout, and messages are both sent and
+ received in the form:
+
+ [4 bytes - payload size (little-endian)][variable bytes - json payload]
+ """
+
+ _ENV_VAR = "GOOGLE_AUTH_WEBAUTHN_PLUGIN"
+
+ def is_available(self) -> bool:
+ try:
+ self._find_plugin()
+ except Exception:
+ return False
+ else:
+ return True
+
+ def get(self, get_request: GetRequest) -> GetResponse:
+ request_json = get_request.to_json()
+ cmd = self._find_plugin()
+ response_json = self._call_plugin(cmd, request_json)
+ return GetResponse.from_json(response_json)
+
+ def _call_plugin(self, cmd: str, input_json: str) -> str:
+ # Calculate length of input
+ input_length = len(input_json)
+ length_bytes_le = struct.pack("<I", input_length)
+ request = length_bytes_le + input_json.encode()
+
+ # Call plugin
+ process_result = subprocess.run(
+ [cmd], input=request, capture_output=True, check=True
+ )
+
+ # Check length of response
+ response_len_le = process_result.stdout[:4]
+ response_len = struct.unpack("<I", response_len_le)[0]
+ response = process_result.stdout[4:]
+ if response_len != len(response):
+ raise exceptions.MalformedError(
+ "Plugin response length {} does not match data {}".format(
+ response_len, len(response)
+ )
+ )
+ return response.decode()
+
+ def _find_plugin(self) -> str:
+ plugin_cmd = os.environ.get(PluginHandler._ENV_VAR)
+ if plugin_cmd is None:
+ raise exceptions.InvalidResource(
+ "{} env var is not set".format(PluginHandler._ENV_VAR)
+ )
+ return plugin_cmd
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py
new file mode 100644
index 00000000..184329fe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py
@@ -0,0 +1,16 @@
+from typing import List, Optional
+
+from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler
+
+
+class WebauthnHandlerFactory:
+ handlers: List[WebAuthnHandler]
+
+ def __init__(self):
+ self.handlers = [PluginHandler()]
+
+ def get_handler(self) -> Optional[WebAuthnHandler]:
+ for handler in self.handlers:
+ if handler.is_available():
+ return handler
+ return None
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py
new file mode 100644
index 00000000..7784e83d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py
@@ -0,0 +1,156 @@
+from dataclasses import dataclass
+import json
+from typing import Any, Dict, List, Optional
+
+from google.auth import exceptions
+
+
+@dataclass(frozen=True)
+class PublicKeyCredentialDescriptor:
+ """Descriptor for a security key based credential.
+
+ https://www.w3.org/TR/webauthn-3/#dictionary-credential-descriptor
+
+ Args:
+ id: <url-safe base64-encoded> credential id (key handle).
+ transports: <'usb'|'nfc'|'ble'|'internal'> List of supported transports.
+ """
+
+ id: str
+ transports: Optional[List[str]] = None
+
+ def to_dict(self):
+ cred = {"type": "public-key", "id": self.id}
+ if self.transports:
+ cred["transports"] = self.transports
+ return cred
+
+
+@dataclass
+class AuthenticationExtensionsClientInputs:
+ """Client extensions inputs for WebAuthn extensions.
+
+ Args:
+ appid: app id that can be asserted with in addition to rpid.
+ https://www.w3.org/TR/webauthn-3/#sctn-appid-extension
+ """
+
+ appid: Optional[str] = None
+
+ def to_dict(self):
+ extensions = {}
+ if self.appid:
+ extensions["appid"] = self.appid
+ return extensions
+
+
+@dataclass
+class GetRequest:
+ """WebAuthn get request
+
+ Args:
+ origin: Origin where the WebAuthn get assertion takes place.
+ rpid: Relying Party ID.
+ challenge: <url-safe base64-encoded> raw challenge.
+ timeout_ms: Timeout number in millisecond.
+ allow_credentials: List of allowed credentials.
+ user_verification: <'required'|'preferred'|'discouraged'> User verification requirement.
+ extensions: WebAuthn authentication extensions inputs.
+ """
+
+ origin: str
+ rpid: str
+ challenge: str
+ timeout_ms: Optional[int] = None
+ allow_credentials: Optional[List[PublicKeyCredentialDescriptor]] = None
+ user_verification: Optional[str] = None
+ extensions: Optional[AuthenticationExtensionsClientInputs] = None
+
+ def to_json(self) -> str:
+ req_options: Dict[str, Any] = {"rpid": self.rpid, "challenge": self.challenge}
+ if self.timeout_ms:
+ req_options["timeout"] = self.timeout_ms
+ if self.allow_credentials:
+ req_options["allowCredentials"] = [
+ c.to_dict() for c in self.allow_credentials
+ ]
+ if self.user_verification:
+ req_options["userVerification"] = self.user_verification
+ if self.extensions:
+ req_options["extensions"] = self.extensions.to_dict()
+ return json.dumps(
+ {"type": "get", "origin": self.origin, "requestData": req_options}
+ )
+
+
+@dataclass(frozen=True)
+class AuthenticatorAssertionResponse:
+ """Authenticator response to a WebAuthn get (assertion) request.
+
+ https://www.w3.org/TR/webauthn-3/#authenticatorassertionresponse
+
+ Args:
+ client_data_json: <url-safe base64-encoded> client data JSON.
+ authenticator_data: <url-safe base64-encoded> authenticator data.
+ signature: <url-safe base64-encoded> signature.
+ user_handle: <url-safe base64-encoded> user handle.
+ """
+
+ client_data_json: str
+ authenticator_data: str
+ signature: str
+ user_handle: Optional[str]
+
+
+@dataclass(frozen=True)
+class GetResponse:
+ """WebAuthn get (assertion) response.
+
+ Args:
+ id: <url-safe base64-encoded> credential id (key handle).
+ response: The authenticator assertion response.
+ authenticator_attachment: <'cross-platform'|'platform'> The attachment status of the authenticator.
+ client_extension_results: WebAuthn authentication extensions output results in a dictionary.
+ """
+
+ id: str
+ response: AuthenticatorAssertionResponse
+ authenticator_attachment: Optional[str]
+ client_extension_results: Optional[Dict]
+
+ @staticmethod
+ def from_json(json_str: str):
+ """Verify and construct GetResponse from a JSON string."""
+ try:
+ resp_json = json.loads(json_str)
+ except ValueError:
+ raise exceptions.MalformedError("Invalid Get JSON response")
+ if resp_json.get("type") != "getResponse":
+ raise exceptions.MalformedError(
+ "Invalid Get response type: {}".format(resp_json.get("type"))
+ )
+ pk_cred = resp_json.get("responseData")
+ if pk_cred is None:
+ if resp_json.get("error"):
+ raise exceptions.ReauthFailError(
+ "WebAuthn.get failure: {}".format(resp_json["error"])
+ )
+ else:
+ raise exceptions.MalformedError("Get response is empty")
+ if pk_cred.get("type") != "public-key":
+ raise exceptions.MalformedError(
+ "Invalid credential type: {}".format(pk_cred.get("type"))
+ )
+ assertion_json = pk_cred["response"]
+ assertion_resp = AuthenticatorAssertionResponse(
+ client_data_json=assertion_json["clientDataJSON"],
+ authenticator_data=assertion_json["authenticatorData"],
+ signature=assertion_json["signature"],
+ user_handle=assertion_json.get("userHandle"),
+ )
+ return GetResponse(
+ id=pk_cred["id"],
+ response=assertion_resp,
+ authenticator_attachment=pk_cred.get("authenticatorAttachment"),
+ client_extension_results=pk_cred.get("clientExtensionResults"),
+ )