diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2 | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2')
19 files changed, 5013 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py b/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py new file mode 100644 index 00000000..accae965 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/__init__.py @@ -0,0 +1,36 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google OAuth 2.0 Library for Python.""" + +import sys +import warnings + + +class Python37DeprecationWarning(DeprecationWarning): # pragma: NO COVER + """ + Deprecation warning raised when Python 3.7 runtime is detected. + Python 3.7 support will be dropped after January 1, 2024. + """ + + pass + + +# Checks if the current runtime is Python 3.7. +if sys.version_info.major == 3 and sys.version_info.minor == 7: # pragma: NO COVER + message = ( + "After January 1, 2024, new releases of this library will drop support " + "for Python 3.7." + ) + warnings.warn(message, Python37DeprecationWarning) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_client.py b/.venv/lib/python3.12/site-packages/google/oauth2/_client.py new file mode 100644 index 00000000..5a9fc350 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_client.py @@ -0,0 +1,508 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 client. + +This is a client for interacting with an OAuth 2.0 authorization server's +token endpoint. + +For more information about the token endpoint, see +`Section 3.1 of rfc6749`_ + +.. _Section 3.1 of rfc6749: https://tools.ietf.org/html/rfc6749#section-3.2 +""" + +import datetime +import http.client as http_client +import json +import urllib + +from google.auth import _exponential_backoff +from google.auth import _helpers +from google.auth import credentials +from google.auth import exceptions +from google.auth import jwt +from google.auth import metrics +from google.auth import transport + +_URLENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded" +_JSON_CONTENT_TYPE = "application/json" +_JWT_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer" +_REFRESH_GRANT_TYPE = "refresh_token" + + +def _handle_error_response(response_data, retryable_error): + """Translates an error response into an exception. + + Args: + response_data (Mapping | str): The decoded response data. + retryable_error Optional[bool]: A boolean indicating if an error is retryable. + Defaults to False. + + Raises: + google.auth.exceptions.RefreshError: The errors contained in response_data. + """ + + retryable_error = retryable_error if retryable_error else False + + if isinstance(response_data, str): + raise exceptions.RefreshError(response_data, retryable=retryable_error) + try: + error_details = "{}: {}".format( + response_data["error"], response_data.get("error_description") + ) + # If no details could be extracted, use the response data. + except (KeyError, ValueError): + error_details = json.dumps(response_data) + + raise exceptions.RefreshError( + error_details, response_data, retryable=retryable_error + ) + + +def _can_retry(status_code, response_data): + """Checks if a request can be retried by inspecting the status code + and response body of the request. + + Args: + status_code (int): The response status code. + response_data (Mapping | str): The decoded response data. + + Returns: + bool: True if the response is retryable. False otherwise. + """ + if status_code in transport.DEFAULT_RETRYABLE_STATUS_CODES: + return True + + try: + # For a failed response, response_body could be a string + error_desc = response_data.get("error_description") or "" + error_code = response_data.get("error") or "" + + if not isinstance(error_code, str) or not isinstance(error_desc, str): + return False + + # Per Oauth 2.0 RFC https://www.rfc-editor.org/rfc/rfc6749.html#section-4.1.2.1 + # This is needed because a redirect will not return a 500 status code. + retryable_error_descriptions = { + "internal_failure", + "server_error", + "temporarily_unavailable", + } + + if any(e in retryable_error_descriptions for e in (error_code, error_desc)): + return True + + except AttributeError: + pass + + return False + + +def _parse_expiry(response_data): + """Parses the expiry field from a response into a datetime. + + Args: + response_data (Mapping): The JSON-parsed response data. + + Returns: + Optional[datetime]: The expiration or ``None`` if no expiration was + specified. + """ + expires_in = response_data.get("expires_in", None) + + if expires_in is not None: + # Some services do not respect the OAUTH2.0 RFC and send expires_in as a + # JSON String. + if isinstance(expires_in, str): + expires_in = int(expires_in) + + return _helpers.utcnow() + datetime.timedelta(seconds=expires_in) + else: + return None + + +def _token_endpoint_request_no_throw( + request, + token_uri, + body, + access_token=None, + use_json=False, + can_retry=True, + headers=None, + **kwargs +): + """Makes a request to the OAuth 2.0 authorization server's token endpoint. + This function doesn't throw on response errors. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + body (Mapping[str, str]): The parameters to send in the request body. + access_token (Optional(str)): The access token needed to make the request. + use_json (Optional(bool)): Use urlencoded format or json format for the + content type. The default value is False. + can_retry (bool): Enable or disable request retry behavior. + headers (Optional[Mapping[str, str]]): The headers for the request. + kwargs: Additional arguments passed on to the request method. The + kwargs will be passed to `requests.request` method, see: + https://docs.python-requests.org/en/latest/api/#requests.request. + For example, you can use `cert=("cert_pem_path", "key_pem_path")` + to set up client side SSL certificate, and use + `verify="ca_bundle_path"` to set up the CA certificates for sever + side SSL certificate verification. + + Returns: + Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating + if the request is successful, a mapping for the JSON-decoded response + data and in the case of an error a boolean indicating if the error + is retryable. + """ + if use_json: + headers_to_use = {"Content-Type": _JSON_CONTENT_TYPE} + body = json.dumps(body).encode("utf-8") + else: + headers_to_use = {"Content-Type": _URLENCODED_CONTENT_TYPE} + body = urllib.parse.urlencode(body).encode("utf-8") + + if access_token: + headers_to_use["Authorization"] = "Bearer {}".format(access_token) + + if headers: + headers_to_use.update(headers) + + response_data = {} + retryable_error = False + + retries = _exponential_backoff.ExponentialBackoff() + for _ in retries: + response = request( + method="POST", url=token_uri, headers=headers_to_use, body=body, **kwargs + ) + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + try: + # response_body should be a JSON + response_data = json.loads(response_body) + except ValueError: + response_data = response_body + + if response.status == http_client.OK: + return True, response_data, None + + retryable_error = _can_retry( + status_code=response.status, response_data=response_data + ) + + if not can_retry or not retryable_error: + return False, response_data, retryable_error + + return False, response_data, retryable_error + + +def _token_endpoint_request( + request, + token_uri, + body, + access_token=None, + use_json=False, + can_retry=True, + headers=None, + **kwargs +): + """Makes a request to the OAuth 2.0 authorization server's token endpoint. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + body (Mapping[str, str]): The parameters to send in the request body. + access_token (Optional(str)): The access token needed to make the request. + use_json (Optional(bool)): Use urlencoded format or json format for the + content type. The default value is False. + can_retry (bool): Enable or disable request retry behavior. + headers (Optional[Mapping[str, str]]): The headers for the request. + kwargs: Additional arguments passed on to the request method. The + kwargs will be passed to `requests.request` method, see: + https://docs.python-requests.org/en/latest/api/#requests.request. + For example, you can use `cert=("cert_pem_path", "key_pem_path")` + to set up client side SSL certificate, and use + `verify="ca_bundle_path"` to set up the CA certificates for sever + side SSL certificate verification. + + Returns: + Mapping[str, str]: The JSON-decoded response data. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + + response_status_ok, response_data, retryable_error = _token_endpoint_request_no_throw( + request, + token_uri, + body, + access_token=access_token, + use_json=use_json, + can_retry=can_retry, + headers=headers, + **kwargs + ) + if not response_status_ok: + _handle_error_response(response_data, retryable_error) + return response_data + + +def jwt_grant(request, token_uri, assertion, can_retry=True): + """Implements the JWT Profile for OAuth 2.0 Authorization Grants. + + For more details, see `rfc7523 section 4`_. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + assertion (str): The OAuth 2.0 assertion. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, Optional[datetime], Mapping[str, str]]: The access token, + expiration, and additional data returned by the token endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + + .. _rfc7523 section 4: https://tools.ietf.org/html/rfc7523#section-4 + """ + body = {"assertion": assertion, "grant_type": _JWT_GRANT_TYPE} + + response_data = _token_endpoint_request( + request, + token_uri, + body, + can_retry=can_retry, + headers={ + metrics.API_CLIENT_HEADER: metrics.token_request_access_token_sa_assertion() + }, + ) + + try: + access_token = response_data["access_token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No access token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + expiry = _parse_expiry(response_data) + + return access_token, expiry, response_data + + +def call_iam_generate_id_token_endpoint( + request, + iam_id_token_endpoint, + signer_email, + audience, + access_token, + universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, +): + """Call iam.generateIdToken endpoint to get ID token. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + iam_id_token_endpoint (str): The IAM ID token endpoint to use. + signer_email (str): The signer email used to form the IAM + generateIdToken endpoint. + audience (str): The audience for the ID token. + access_token (str): The access token used to call the IAM endpoint. + + Returns: + Tuple[str, datetime]: The ID token and expiration. + """ + body = {"audience": audience, "includeEmail": "true", "useEmailAzp": "true"} + + response_data = _token_endpoint_request( + request, + iam_id_token_endpoint.replace( + credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain + ).format(signer_email), + body, + access_token=access_token, + use_json=True, + ) + + try: + id_token = response_data["token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No ID token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + payload = jwt.decode(id_token, verify=False) + expiry = datetime.datetime.utcfromtimestamp(payload["exp"]) + + return id_token, expiry + + +def id_token_jwt_grant(request, token_uri, assertion, can_retry=True): + """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but + requests an OpenID Connect ID Token instead of an access token. + + This is a variant on the standard JWT Profile that is currently unique + to Google. This was added for the benefit of authenticating to services + that require ID Tokens instead of access tokens or JWT bearer tokens. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorization server's token endpoint + URI. + assertion (str): JWT token signed by a service account. The token's + payload must include a ``target_audience`` claim. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, Optional[datetime], Mapping[str, str]]: + The (encoded) Open ID Connect ID Token, expiration, and additional + data returned by the endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + body = {"assertion": assertion, "grant_type": _JWT_GRANT_TYPE} + + response_data = _token_endpoint_request( + request, + token_uri, + body, + can_retry=can_retry, + headers={ + metrics.API_CLIENT_HEADER: metrics.token_request_id_token_sa_assertion() + }, + ) + + try: + id_token = response_data["id_token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No ID token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + payload = jwt.decode(id_token, verify=False) + expiry = datetime.datetime.utcfromtimestamp(payload["exp"]) + + return id_token, expiry, response_data + + +def _handle_refresh_grant_response(response_data, refresh_token): + """Extract tokens from refresh grant response. + + Args: + response_data (Mapping[str, str]): Refresh grant response data. + refresh_token (str): Current refresh token. + + Returns: + Tuple[str, str, Optional[datetime], Mapping[str, str]]: The access token, + refresh token, expiration, and additional data returned by the token + endpoint. If response_data doesn't have refresh token, then the current + refresh token will be returned. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + try: + access_token = response_data["access_token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No access token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + refresh_token = response_data.get("refresh_token", refresh_token) + expiry = _parse_expiry(response_data) + + return access_token, refresh_token, expiry, response_data + + +def refresh_grant( + request, + token_uri, + refresh_token, + client_id, + client_secret, + scopes=None, + rapt_token=None, + can_retry=True, +): + """Implements the OAuth 2.0 refresh token grant. + + For more details, see `rfc678 section 6`_. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + refresh_token (str): The refresh token to use to get a new access + token. + client_id (str): The OAuth 2.0 application's client ID. + client_secret (str): The Oauth 2.0 appliaction's client secret. + scopes (Optional(Sequence[str])): Scopes to request. If present, all + scopes must be authorized for the refresh token. Useful if refresh + token has a wild card scope (e.g. + 'https://www.googleapis.com/auth/any-api'). + rapt_token (Optional(str)): The reauth Proof Token. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, str, Optional[datetime], Mapping[str, str]]: The access + token, new or current refresh token, expiration, and additional data + returned by the token endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + + .. _rfc6748 section 6: https://tools.ietf.org/html/rfc6749#section-6 + """ + body = { + "grant_type": _REFRESH_GRANT_TYPE, + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + if scopes: + body["scope"] = " ".join(scopes) + if rapt_token: + body["rapt"] = rapt_token + + response_data = _token_endpoint_request( + request, token_uri, body, can_retry=can_retry + ) + return _handle_refresh_grant_response(response_data, refresh_token) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py new file mode 100644 index 00000000..8867f0a5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_client_async.py @@ -0,0 +1,286 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 async client. + +This is a client for interacting with an OAuth 2.0 authorization server's +token endpoint. + +For more information about the token endpoint, see +`Section 3.1 of rfc6749`_ + +.. _Section 3.1 of rfc6749: https://tools.ietf.org/html/rfc6749#section-3.2 +""" + +import datetime +import http.client as http_client +import json +import urllib + +from google.auth import _exponential_backoff +from google.auth import exceptions +from google.auth import jwt +from google.oauth2 import _client as client + + +async def _token_endpoint_request_no_throw( + request, token_uri, body, access_token=None, use_json=False, can_retry=True +): + """Makes a request to the OAuth 2.0 authorization server's token endpoint. + This function doesn't throw on response errors. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + body (Mapping[str, str]): The parameters to send in the request body. + access_token (Optional(str)): The access token needed to make the request. + use_json (Optional(bool)): Use urlencoded format or json format for the + content type. The default value is False. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating + if the request is successful, a mapping for the JSON-decoded response + data and in the case of an error a boolean indicating if the error + is retryable. + """ + if use_json: + headers = {"Content-Type": client._JSON_CONTENT_TYPE} + body = json.dumps(body).encode("utf-8") + else: + headers = {"Content-Type": client._URLENCODED_CONTENT_TYPE} + body = urllib.parse.urlencode(body).encode("utf-8") + + if access_token: + headers["Authorization"] = "Bearer {}".format(access_token) + + response_data = {} + retryable_error = False + + retries = _exponential_backoff.ExponentialBackoff() + for _ in retries: + response = await request( + method="POST", url=token_uri, headers=headers, body=body + ) + + # Using data.read() resulted in zlib decompression errors. This may require future investigation. + response_body1 = await response.content() + + response_body = ( + response_body1.decode("utf-8") + if hasattr(response_body1, "decode") + else response_body1 + ) + + try: + response_data = json.loads(response_body) + except ValueError: + response_data = response_body + + if response.status == http_client.OK: + return True, response_data, None + + retryable_error = client._can_retry( + status_code=response.status, response_data=response_data + ) + + if not can_retry or not retryable_error: + return False, response_data, retryable_error + + return False, response_data, retryable_error + + +async def _token_endpoint_request( + request, token_uri, body, access_token=None, use_json=False, can_retry=True +): + """Makes a request to the OAuth 2.0 authorization server's token endpoint. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + body (Mapping[str, str]): The parameters to send in the request body. + access_token (Optional(str)): The access token needed to make the request. + use_json (Optional(bool)): Use urlencoded format or json format for the + content type. The default value is False. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Mapping[str, str]: The JSON-decoded response data. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + + response_status_ok, response_data, retryable_error = await _token_endpoint_request_no_throw( + request, + token_uri, + body, + access_token=access_token, + use_json=use_json, + can_retry=can_retry, + ) + if not response_status_ok: + client._handle_error_response(response_data, retryable_error) + return response_data + + +async def jwt_grant(request, token_uri, assertion, can_retry=True): + """Implements the JWT Profile for OAuth 2.0 Authorization Grants. + + For more details, see `rfc7523 section 4`_. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + assertion (str): The OAuth 2.0 assertion. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, Optional[datetime], Mapping[str, str]]: The access token, + expiration, and additional data returned by the token endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + + .. _rfc7523 section 4: https://tools.ietf.org/html/rfc7523#section-4 + """ + body = {"assertion": assertion, "grant_type": client._JWT_GRANT_TYPE} + + response_data = await _token_endpoint_request( + request, token_uri, body, can_retry=can_retry + ) + + try: + access_token = response_data["access_token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No access token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + expiry = client._parse_expiry(response_data) + + return access_token, expiry, response_data + + +async def id_token_jwt_grant(request, token_uri, assertion, can_retry=True): + """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but + requests an OpenID Connect ID Token instead of an access token. + + This is a variant on the standard JWT Profile that is currently unique + to Google. This was added for the benefit of authenticating to services + that require ID Tokens instead of access tokens or JWT bearer tokens. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorization server's token endpoint + URI. + assertion (str): JWT token signed by a service account. The token's + payload must include a ``target_audience`` claim. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, Optional[datetime], Mapping[str, str]]: + The (encoded) Open ID Connect ID Token, expiration, and additional + data returned by the endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + body = {"assertion": assertion, "grant_type": client._JWT_GRANT_TYPE} + + response_data = await _token_endpoint_request( + request, token_uri, body, can_retry=can_retry + ) + + try: + id_token = response_data["id_token"] + except KeyError as caught_exc: + new_exc = exceptions.RefreshError( + "No ID token in response.", response_data, retryable=False + ) + raise new_exc from caught_exc + + payload = jwt.decode(id_token, verify=False) + expiry = datetime.datetime.utcfromtimestamp(payload["exp"]) + + return id_token, expiry, response_data + + +async def refresh_grant( + request, + token_uri, + refresh_token, + client_id, + client_secret, + scopes=None, + rapt_token=None, + can_retry=True, +): + """Implements the OAuth 2.0 refresh token grant. + + For more details, see `rfc678 section 6`_. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + refresh_token (str): The refresh token to use to get a new access + token. + client_id (str): The OAuth 2.0 application's client ID. + client_secret (str): The Oauth 2.0 appliaction's client secret. + scopes (Optional(Sequence[str])): Scopes to request. If present, all + scopes must be authorized for the refresh token. Useful if refresh + token has a wild card scope (e.g. + 'https://www.googleapis.com/auth/any-api'). + rapt_token (Optional(str)): The reauth Proof Token. + can_retry (bool): Enable or disable request retry behavior. + + Returns: + Tuple[str, Optional[str], Optional[datetime], Mapping[str, str]]: The + access token, new or current refresh token, expiration, and additional data + returned by the token endpoint. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + + .. _rfc6748 section 6: https://tools.ietf.org/html/rfc6749#section-6 + """ + body = { + "grant_type": client._REFRESH_GRANT_TYPE, + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + if scopes: + body["scope"] = " ".join(scopes) + if rapt_token: + body["rapt"] = rapt_token + + response_data = await _token_endpoint_request( + request, token_uri, body, can_retry=can_retry + ) + return client._handle_refresh_grant_response(response_data, refresh_token) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py new file mode 100644 index 00000000..b5561aae --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_credentials_async.py @@ -0,0 +1,118 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 Async Credentials. + +This module provides credentials based on OAuth 2.0 access and refresh tokens. +These credentials usually access resources on behalf of a user (resource +owner). + +Specifically, this is intended to use access tokens acquired using the +`Authorization Code grant`_ and can refresh those tokens using a +optional `refresh token`_. + +Obtaining the initial access and refresh token is outside of the scope of this +module. Consult `rfc6749 section 4.1`_ for complete details on the +Authorization Code grant flow. + +.. _Authorization Code grant: https://tools.ietf.org/html/rfc6749#section-1.3.1 +.. _refresh token: https://tools.ietf.org/html/rfc6749#section-6 +.. _rfc6749 section 4.1: https://tools.ietf.org/html/rfc6749#section-4.1 +""" + +from google.auth import _credentials_async as credentials +from google.auth import _helpers +from google.auth import exceptions +from google.oauth2 import _reauth_async as reauth +from google.oauth2 import credentials as oauth2_credentials + + +class Credentials(oauth2_credentials.Credentials): + """Credentials using OAuth 2.0 access and refresh tokens. + + The credentials are considered immutable. If you want to modify the + quota project, use :meth:`with_quota_project` or :: + + credentials = credentials.with_quota_project('myproject-123) + """ + + @_helpers.copy_docstring(credentials.Credentials) + async def refresh(self, request): + if ( + self._refresh_token is None + or self._token_uri is None + or self._client_id is None + or self._client_secret is None + ): + raise exceptions.RefreshError( + "The credentials do not contain the necessary fields need to " + "refresh the access token. You must specify refresh_token, " + "token_uri, client_id, and client_secret." + ) + + ( + access_token, + refresh_token, + expiry, + grant_response, + rapt_token, + ) = await reauth.refresh_grant( + request, + self._token_uri, + self._refresh_token, + self._client_id, + self._client_secret, + scopes=self._scopes, + rapt_token=self._rapt_token, + enable_reauth_refresh=self._enable_reauth_refresh, + ) + + self.token = access_token + self.expiry = expiry + self._refresh_token = refresh_token + self._id_token = grant_response.get("id_token") + self._rapt_token = rapt_token + + if self._scopes and "scope" in grant_response: + requested_scopes = frozenset(self._scopes) + granted_scopes = frozenset(grant_response["scope"].split()) + scopes_requested_but_not_granted = requested_scopes - granted_scopes + if scopes_requested_but_not_granted: + raise exceptions.RefreshError( + "Not all requested scopes were granted by the " + "authorization server, missing scopes {}.".format( + ", ".join(scopes_requested_but_not_granted) + ) + ) + + @_helpers.copy_docstring(credentials.Credentials) + async def before_request(self, request, method, url, headers): + if not self.valid: + await self.refresh(request) + self.apply(headers) + + +class UserAccessTokenCredentials(oauth2_credentials.UserAccessTokenCredentials): + """Access token credentials for user account. + + Obtain the access token for a given user account or the current active + user account with the ``gcloud auth print-access-token`` command. + + Args: + account (Optional[str]): Account to get the access token for. If not + specified, the current active account will be used. + quota_project_id (Optional[str]): The project ID used for quota + and billing. + + """ diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py new file mode 100644 index 00000000..6594e416 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_id_token_async.py @@ -0,0 +1,285 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google ID Token helpers. + +Provides support for verifying `OpenID Connect ID Tokens`_, especially ones +generated by Google infrastructure. + +To parse and verify an ID Token issued by Google's OAuth 2.0 authorization +server use :func:`verify_oauth2_token`. To verify an ID Token issued by +Firebase, use :func:`verify_firebase_token`. + +A general purpose ID Token verifier is available as :func:`verify_token`. + +Example:: + + from google.oauth2 import _id_token_async + from google.auth.transport import aiohttp_requests + + request = aiohttp_requests.Request() + + id_info = await _id_token_async.verify_oauth2_token( + token, request, 'my-client-id.example.com') + + if id_info['iss'] != 'https://accounts.google.com': + raise ValueError('Wrong issuer.') + + userid = id_info['sub'] + +By default, this will re-fetch certificates for each verification. Because +Google's public keys are only changed infrequently (on the order of once per +day), you may wish to take advantage of caching to reduce latency and the +potential for network errors. This can be accomplished using an external +library like `CacheControl`_ to create a cache-aware +:class:`google.auth.transport.Request`:: + + import cachecontrol + import google.auth.transport.requests + import requests + + session = requests.session() + cached_session = cachecontrol.CacheControl(session) + request = google.auth.transport.requests.Request(session=cached_session) + +.. _OpenID Connect ID Token: + http://openid.net/specs/openid-connect-core-1_0.html#IDToken +.. _CacheControl: https://cachecontrol.readthedocs.io +""" + +import http.client as http_client +import json +import os + +from google.auth import environment_vars +from google.auth import exceptions +from google.auth import jwt +from google.auth.transport import requests +from google.oauth2 import id_token as sync_id_token + + +async def _fetch_certs(request, certs_url): + """Fetches certificates. + + Google-style cerificate endpoints return JSON in the format of + ``{'key id': 'x509 certificate'}``. + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. This must be an aiohttp request. + certs_url (str): The certificate endpoint URL. + + Returns: + Mapping[str, str]: A mapping of public key ID to x.509 certificate + data. + """ + response = await request(certs_url, method="GET") + + if response.status != http_client.OK: + raise exceptions.TransportError( + "Could not fetch certificates at {}".format(certs_url) + ) + + data = await response.content() + + return json.loads(data) + + +async def verify_token( + id_token, + request, + audience=None, + certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL, + clock_skew_in_seconds=0, +): + """Verifies an ID token and returns the decoded token. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. This must be an aiohttp request. + audience (str): The audience that this token is intended for. If None + then the audience is not verified. + certs_url (str): The URL that specifies the certificates to use to + verify the token. This URL should return JSON in the format of + ``{'key id': 'x509 certificate'}``. + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + """ + certs = await _fetch_certs(request, certs_url) + + return jwt.decode( + id_token, + certs=certs, + audience=audience, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + +async def verify_oauth2_token( + id_token, request, audience=None, clock_skew_in_seconds=0 +): + """Verifies an ID Token issued by Google's OAuth 2.0 authorization server. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. This must be an aiohttp request. + audience (str): The audience that this token is intended for. This is + typically your application's OAuth 2.0 client ID. If None then the + audience is not verified. + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + + Raises: + exceptions.GoogleAuthError: If the issuer is invalid. + """ + idinfo = await verify_token( + id_token, + request, + audience=audience, + certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + if idinfo["iss"] not in sync_id_token._GOOGLE_ISSUERS: + raise exceptions.GoogleAuthError( + "Wrong issuer. 'iss' should be one of the following: {}".format( + sync_id_token._GOOGLE_ISSUERS + ) + ) + + return idinfo + + +async def verify_firebase_token( + id_token, request, audience=None, clock_skew_in_seconds=0 +): + """Verifies an ID Token issued by Firebase Authentication. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. This must be an aiohttp request. + audience (str): The audience that this token is intended for. This is + typically your Firebase application ID. If None then the audience + is not verified. + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + """ + return await verify_token( + id_token, + request, + audience=audience, + certs_url=sync_id_token._GOOGLE_APIS_CERTS_URL, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + +async def fetch_id_token(request, audience): + """Fetch the ID Token from the current environment. + + This function acquires ID token from the environment in the following order. + See https://google.aip.dev/auth/4110. + + 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set + to the path of a valid service account JSON file, then ID token is + acquired using this service account credentials. + 2. If the application is running in Compute Engine, App Engine or Cloud Run, + then the ID token are obtained from the metadata server. + 3. If metadata server doesn't exist and no valid service account credentials + are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will + be raised. + + Example:: + + import google.oauth2._id_token_async + import google.auth.transport.aiohttp_requests + + request = google.auth.transport.aiohttp_requests.Request() + target_audience = "https://pubsub.googleapis.com" + + id_token = await google.oauth2._id_token_async.fetch_id_token(request, target_audience) + + Args: + request (google.auth.transport.aiohttp_requests.Request): A callable used to make + HTTP requests. + audience (str): The audience that this ID token is intended for. + + Returns: + str: The ID token. + + Raises: + ~google.auth.exceptions.DefaultCredentialsError: + If metadata server doesn't exist and no valid service account + credentials are found. + """ + # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment + # variable. + credentials_filename = os.environ.get(environment_vars.CREDENTIALS) + if credentials_filename: + if not ( + os.path.exists(credentials_filename) + and os.path.isfile(credentials_filename) + ): + raise exceptions.DefaultCredentialsError( + "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid." + ) + + try: + with open(credentials_filename, "r") as f: + from google.oauth2 import _service_account_async as service_account + + info = json.load(f) + if info.get("type") == "service_account": + credentials = service_account.IDTokenCredentials.from_service_account_info( + info, target_audience=audience + ) + await credentials.refresh(request) + return credentials.token + except ValueError as caught_exc: + new_exc = exceptions.DefaultCredentialsError( + "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.", + caught_exc, + ) + raise new_exc from caught_exc + + # 2. Try to fetch ID token from metada server if it exists. The code works + # for GAE and Cloud Run metadata server as well. + try: + from google.auth import compute_engine + from google.auth.compute_engine import _metadata + + request_new = requests.Request() + if _metadata.ping(request_new): + credentials = compute_engine.IDTokenCredentials( + request_new, audience, use_metadata_identity_endpoint=True + ) + credentials.refresh(request_new) + return credentials.token + except (ImportError, exceptions.TransportError): + pass + + raise exceptions.DefaultCredentialsError( + "Neither metadata server or valid service account credentials are found." + ) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py new file mode 100644 index 00000000..de3675c5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_reauth_async.py @@ -0,0 +1,328 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A module that provides functions for handling rapt authentication. + +Reauth is a process of obtaining additional authentication (such as password, +security token, etc.) while refreshing OAuth 2.0 credentials for a user. + +Credentials that use the Reauth flow must have the reauth scope, +``https://www.googleapis.com/auth/accounts.reauth``. + +This module provides a high-level function for executing the Reauth process, +:func:`refresh_grant`, and lower-level helpers for doing the individual +steps of the reauth process. + +Those steps are: + +1. Obtaining a list of challenges from the reauth server. +2. Running through each challenge and sending the result back to the reauth + server. +3. Refreshing the access token using the returned rapt token. +""" + +import sys + +from google.auth import exceptions +from google.oauth2 import _client +from google.oauth2 import _client_async +from google.oauth2 import challenges +from google.oauth2 import reauth + + +async def _get_challenges( + request, supported_challenge_types, access_token, requested_scopes=None +): + """Does initial request to reauth API to get the challenges. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + supported_challenge_types (Sequence[str]): list of challenge names + supported by the manager. + access_token (str): Access token with reauth scopes. + requested_scopes (Optional(Sequence[str])): Authorized scopes for the credentials. + + Returns: + dict: The response from the reauth API. + """ + body = {"supportedChallengeTypes": supported_challenge_types} + if requested_scopes: + body["oauthScopesForDomainPolicyLookup"] = requested_scopes + + return await _client_async._token_endpoint_request( + request, + reauth._REAUTH_API + ":start", + body, + access_token=access_token, + use_json=True, + ) + + +async def _send_challenge_result( + request, session_id, challenge_id, client_input, access_token +): + """Attempt to refresh access token by sending next challenge result. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + session_id (str): session id returned by the initial reauth call. + challenge_id (str): challenge id returned by the initial reauth call. + client_input: dict with a challenge-specific client input. For example: + ``{'credential': password}`` for password challenge. + access_token (str): Access token with reauth scopes. + + Returns: + dict: The response from the reauth API. + """ + body = { + "sessionId": session_id, + "challengeId": challenge_id, + "action": "RESPOND", + "proposalResponse": client_input, + } + + return await _client_async._token_endpoint_request( + request, + reauth._REAUTH_API + "/{}:continue".format(session_id), + body, + access_token=access_token, + use_json=True, + ) + + +async def _run_next_challenge(msg, request, access_token): + """Get the next challenge from msg and run it. + + Args: + msg (dict): Reauth API response body (either from the initial request to + https://reauth.googleapis.com/v2/sessions:start or from sending the + previous challenge response to + https://reauth.googleapis.com/v2/sessions/id:continue) + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + access_token (str): reauth access token + + Returns: + dict: The response from the reauth API. + + Raises: + google.auth.exceptions.ReauthError: if reauth failed. + """ + for challenge in msg["challenges"]: + if challenge["status"] != "READY": + # Skip non-activated challenges. + continue + c = challenges.AVAILABLE_CHALLENGES.get(challenge["challengeType"], None) + if not c: + raise exceptions.ReauthFailError( + "Unsupported challenge type {0}. Supported types: {1}".format( + challenge["challengeType"], + ",".join(list(challenges.AVAILABLE_CHALLENGES.keys())), + ) + ) + if not c.is_locally_eligible: + raise exceptions.ReauthFailError( + "Challenge {0} is not locally eligible".format( + challenge["challengeType"] + ) + ) + client_input = c.obtain_challenge_input(challenge) + if not client_input: + return None + return await _send_challenge_result( + request, + msg["sessionId"], + challenge["challengeId"], + client_input, + access_token, + ) + return None + + +async def _obtain_rapt(request, access_token, requested_scopes): + """Given an http request method and reauth access token, get rapt token. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + access_token (str): reauth access token + requested_scopes (Sequence[str]): scopes required by the client application + + Returns: + str: The rapt token. + + Raises: + google.auth.exceptions.ReauthError: if reauth failed + """ + msg = await _get_challenges( + request, + list(challenges.AVAILABLE_CHALLENGES.keys()), + access_token, + requested_scopes, + ) + + if msg["status"] == reauth._AUTHENTICATED: + return msg["encodedProofOfReauthToken"] + + for _ in range(0, reauth.RUN_CHALLENGE_RETRY_LIMIT): + if not ( + msg["status"] == reauth._CHALLENGE_REQUIRED + or msg["status"] == reauth._CHALLENGE_PENDING + ): + raise exceptions.ReauthFailError( + "Reauthentication challenge failed due to API error: {}".format( + msg["status"] + ) + ) + + if not reauth.is_interactive(): + raise exceptions.ReauthFailError( + "Reauthentication challenge could not be answered because you are not" + " in an interactive session." + ) + + msg = await _run_next_challenge(msg, request, access_token) + + if msg["status"] == reauth._AUTHENTICATED: + return msg["encodedProofOfReauthToken"] + + # If we got here it means we didn't get authenticated. + raise exceptions.ReauthFailError("Failed to obtain rapt token.") + + +async def get_rapt_token( + request, client_id, client_secret, refresh_token, token_uri, scopes=None +): + """Given an http request method and refresh_token, get rapt token. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + client_id (str): client id to get access token for reauth scope. + client_secret (str): client secret for the client_id + refresh_token (str): refresh token to refresh access token + token_uri (str): uri to refresh access token + scopes (Optional(Sequence[str])): scopes required by the client application + + Returns: + str: The rapt token. + Raises: + google.auth.exceptions.RefreshError: If reauth failed. + """ + sys.stderr.write("Reauthentication required.\n") + + # Get access token for reauth. + access_token, _, _, _ = await _client_async.refresh_grant( + request=request, + client_id=client_id, + client_secret=client_secret, + refresh_token=refresh_token, + token_uri=token_uri, + scopes=[reauth._REAUTH_SCOPE], + ) + + # Get rapt token from reauth API. + rapt_token = await _obtain_rapt(request, access_token, requested_scopes=scopes) + + return rapt_token + + +async def refresh_grant( + request, + token_uri, + refresh_token, + client_id, + client_secret, + scopes=None, + rapt_token=None, + enable_reauth_refresh=False, +): + """Implements the reauthentication flow. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. This must be an aiohttp request. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + refresh_token (str): The refresh token to use to get a new access + token. + client_id (str): The OAuth 2.0 application's client ID. + client_secret (str): The Oauth 2.0 appliaction's client secret. + scopes (Optional(Sequence[str])): Scopes to request. If present, all + scopes must be authorized for the refresh token. Useful if refresh + token has a wild card scope (e.g. + 'https://www.googleapis.com/auth/any-api'). + rapt_token (Optional(str)): The rapt token for reauth. + enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow + should be used. The default value is False. This option is for + gcloud only, other users should use the default value. + + Returns: + Tuple[str, Optional[str], Optional[datetime], Mapping[str, str], str]: The + access token, new refresh token, expiration, the additional data + returned by the token endpoint, and the rapt token. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + body = { + "grant_type": _client._REFRESH_GRANT_TYPE, + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + if scopes: + body["scope"] = " ".join(scopes) + if rapt_token: + body["rapt"] = rapt_token + + response_status_ok, response_data, retryable_error = await _client_async._token_endpoint_request_no_throw( + request, token_uri, body + ) + if ( + not response_status_ok + and response_data.get("error") == reauth._REAUTH_NEEDED_ERROR + and ( + response_data.get("error_subtype") + == reauth._REAUTH_NEEDED_ERROR_INVALID_RAPT + or response_data.get("error_subtype") + == reauth._REAUTH_NEEDED_ERROR_RAPT_REQUIRED + ) + ): + if not enable_reauth_refresh: + raise exceptions.RefreshError( + "Reauthentication is needed. Please run `gcloud auth application-default login` to reauthenticate." + ) + + rapt_token = await get_rapt_token( + request, client_id, client_secret, refresh_token, token_uri, scopes=scopes + ) + body["rapt"] = rapt_token + ( + response_status_ok, + response_data, + retryable_error, + ) = await _client_async._token_endpoint_request_no_throw( + request, token_uri, body + ) + + if not response_status_ok: + _client._handle_error_response(response_data, retryable_error) + refresh_response = _client._handle_refresh_grant_response( + response_data, refresh_token + ) + return refresh_response + (rapt_token,) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py b/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py new file mode 100644 index 00000000..cfd315a7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/_service_account_async.py @@ -0,0 +1,132 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0 + +NOTE: This file adds asynchronous refresh methods to both credentials +classes, and therefore async/await syntax is required when calling this +method when using service account credentials with asynchronous functionality. +Otherwise, all other methods are inherited from the regular service account +credentials file google.oauth2.service_account + +""" + +from google.auth import _credentials_async as credentials_async +from google.auth import _helpers +from google.oauth2 import _client_async +from google.oauth2 import service_account + + +class Credentials( + service_account.Credentials, credentials_async.Scoped, credentials_async.Credentials +): + """Service account credentials + + Usually, you'll create these credentials with one of the helper + constructors. To create credentials using a Google service account + private key JSON file:: + + credentials = _service_account_async.Credentials.from_service_account_file( + 'service-account.json') + + Or if you already have the service account file loaded:: + + service_account_info = json.load(open('service_account.json')) + credentials = _service_account_async.Credentials.from_service_account_info( + service_account_info) + + Both helper methods pass on arguments to the constructor, so you can + specify additional scopes and a subject if necessary:: + + credentials = _service_account_async.Credentials.from_service_account_file( + 'service-account.json', + scopes=['email'], + subject='user@example.com') + + The credentials are considered immutable. If you want to modify the scopes + or the subject used for delegation, use :meth:`with_scopes` or + :meth:`with_subject`:: + + scoped_credentials = credentials.with_scopes(['email']) + delegated_credentials = credentials.with_subject(subject) + + To add a quota project, use :meth:`with_quota_project`:: + + credentials = credentials.with_quota_project('myproject-123') + """ + + @_helpers.copy_docstring(credentials_async.Credentials) + async def refresh(self, request): + assertion = self._make_authorization_grant_assertion() + access_token, expiry, _ = await _client_async.jwt_grant( + request, self._token_uri, assertion + ) + self.token = access_token + self.expiry = expiry + + +class IDTokenCredentials( + service_account.IDTokenCredentials, + credentials_async.Signing, + credentials_async.Credentials, +): + """Open ID Connect ID Token-based service account credentials. + + These credentials are largely similar to :class:`.Credentials`, but instead + of using an OAuth 2.0 Access Token as the bearer token, they use an Open + ID Connect ID Token as the bearer token. These credentials are useful when + communicating to services that require ID Tokens and can not accept access + tokens. + + Usually, you'll create these credentials with one of the helper + constructors. To create credentials using a Google service account + private key JSON file:: + + credentials = ( + _service_account_async.IDTokenCredentials.from_service_account_file( + 'service-account.json')) + + Or if you already have the service account file loaded:: + + service_account_info = json.load(open('service_account.json')) + credentials = ( + _service_account_async.IDTokenCredentials.from_service_account_info( + service_account_info)) + + Both helper methods pass on arguments to the constructor, so you can + specify additional scopes and a subject if necessary:: + + credentials = ( + _service_account_async.IDTokenCredentials.from_service_account_file( + 'service-account.json', + scopes=['email'], + subject='user@example.com')) + + The credentials are considered immutable. If you want to modify the scopes + or the subject used for delegation, use :meth:`with_scopes` or + :meth:`with_subject`:: + + scoped_credentials = credentials.with_scopes(['email']) + delegated_credentials = credentials.with_subject(subject) + + """ + + @_helpers.copy_docstring(credentials_async.Credentials) + async def refresh(self, request): + assertion = self._make_authorization_grant_assertion() + access_token, expiry, _ = await _client_async.id_token_jwt_grant( + request, self._token_uri, assertion + ) + self.token = access_token + self.expiry = expiry diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py new file mode 100644 index 00000000..6468498b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/challenges.py @@ -0,0 +1,281 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Challenges for reauthentication. +""" + +import abc +import base64 +import getpass +import sys + +from google.auth import _helpers +from google.auth import exceptions +from google.oauth2 import webauthn_handler_factory +from google.oauth2.webauthn_types import ( + AuthenticationExtensionsClientInputs, + GetRequest, + PublicKeyCredentialDescriptor, +) + + +REAUTH_ORIGIN = "https://accounts.google.com" +SAML_CHALLENGE_MESSAGE = ( + "Please run `gcloud auth login` to complete reauthentication with SAML." +) +WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout + + +def get_user_password(text): + """Get password from user. + + Override this function with a different logic if you are using this library + outside a CLI. + + Args: + text (str): message for the password prompt. + + Returns: + str: password string. + """ + return getpass.getpass(text) + + +class ReauthChallenge(metaclass=abc.ABCMeta): + """Base class for reauth challenges.""" + + @property + @abc.abstractmethod + def name(self): # pragma: NO COVER + """Returns the name of the challenge.""" + raise NotImplementedError("name property must be implemented") + + @property + @abc.abstractmethod + def is_locally_eligible(self): # pragma: NO COVER + """Returns true if a challenge is supported locally on this machine.""" + raise NotImplementedError("is_locally_eligible property must be implemented") + + @abc.abstractmethod + def obtain_challenge_input(self, metadata): # pragma: NO COVER + """Performs logic required to obtain credentials and returns it. + + Args: + metadata (Mapping): challenge metadata returned in the 'challenges' field in + the initial reauth request. Includes the 'challengeType' field + and other challenge-specific fields. + + Returns: + response that will be send to the reauth service as the content of + the 'proposalResponse' field in the request body. Usually a dict + with the keys specific to the challenge. For example, + ``{'credential': password}`` for password challenge. + """ + raise NotImplementedError("obtain_challenge_input method must be implemented") + + +class PasswordChallenge(ReauthChallenge): + """Challenge that asks for user's password.""" + + @property + def name(self): + return "PASSWORD" + + @property + def is_locally_eligible(self): + return True + + @_helpers.copy_docstring(ReauthChallenge) + def obtain_challenge_input(self, unused_metadata): + passwd = get_user_password("Please enter your password:") + if not passwd: + passwd = " " # avoid the server crashing in case of no password :D + return {"credential": passwd} + + +class SecurityKeyChallenge(ReauthChallenge): + """Challenge that asks for user's security key touch.""" + + @property + def name(self): + return "SECURITY_KEY" + + @property + def is_locally_eligible(self): + return True + + @_helpers.copy_docstring(ReauthChallenge) + def obtain_challenge_input(self, metadata): + # Check if there is an available Webauthn Handler, if not use pyu2f + try: + factory = webauthn_handler_factory.WebauthnHandlerFactory() + webauthn_handler = factory.get_handler() + if webauthn_handler is not None: + sys.stderr.write("Please insert and touch your security key\n") + return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) + except Exception: + # Attempt pyu2f if exception in webauthn flow + pass + + try: + import pyu2f.convenience.authenticator # type: ignore + import pyu2f.errors # type: ignore + import pyu2f.model # type: ignore + except ImportError: + raise exceptions.ReauthFailError( + "pyu2f dependency is required to use Security key reauth feature. " + "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." + ) + sk = metadata["securityKey"] + challenges = sk["challenges"] + # Read both 'applicationId' and 'relyingPartyId', if they are the same, use + # applicationId, if they are different, use relyingPartyId first and retry + # with applicationId + application_id = sk["applicationId"] + relying_party_id = sk["relyingPartyId"] + + if application_id != relying_party_id: + application_parameters = [relying_party_id, application_id] + else: + application_parameters = [application_id] + + challenge_data = [] + for c in challenges: + kh = c["keyHandle"].encode("ascii") + key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) + challenge = c["challenge"].encode("ascii") + challenge = base64.urlsafe_b64decode(challenge) + challenge_data.append({"key": key, "challenge": challenge}) + + # Track number of tries to suppress error message until all application_parameters + # are tried. + tries = 0 + for app_id in application_parameters: + try: + tries += 1 + api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( + REAUTH_ORIGIN + ) + response = api.Authenticate( + app_id, challenge_data, print_callback=sys.stderr.write + ) + return {"securityKey": response} + except pyu2f.errors.U2FError as e: + if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: + # Only show error if all app_ids have been tried + if tries == len(application_parameters): + sys.stderr.write("Ineligible security key.\n") + return None + continue + if e.code == pyu2f.errors.U2FError.TIMEOUT: + sys.stderr.write( + "Timed out while waiting for security key touch.\n" + ) + else: + raise e + except pyu2f.errors.PluginError as e: + sys.stderr.write("Plugin error: {}.\n".format(e)) + continue + except pyu2f.errors.NoDeviceFoundError: + sys.stderr.write("No security key found.\n") + return None + + def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): + sk = metadata.get("securityKey") + if sk is None: + raise exceptions.InvalidValue("securityKey is None") + challenges = sk.get("challenges") + application_id = sk.get("applicationId") + relying_party_id = sk.get("relyingPartyId") + if challenges is None or len(challenges) < 1: + raise exceptions.InvalidValue("challenges is None or empty") + if application_id is None: + raise exceptions.InvalidValue("application_id is None") + if relying_party_id is None: + raise exceptions.InvalidValue("relying_party_id is None") + + allow_credentials = [] + for challenge in challenges: + kh = challenge.get("keyHandle") + if kh is None: + raise exceptions.InvalidValue("keyHandle is None") + key_handle = self._unpadded_urlsafe_b64recode(kh) + allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle)) + + extension = AuthenticationExtensionsClientInputs(appid=application_id) + + challenge = challenges[0].get("challenge") + if challenge is None: + raise exceptions.InvalidValue("challenge is None") + + get_request = GetRequest( + origin=REAUTH_ORIGIN, + rpid=relying_party_id, + challenge=self._unpadded_urlsafe_b64recode(challenge), + timeout_ms=WEBAUTHN_TIMEOUT_MS, + allow_credentials=allow_credentials, + user_verification="required", + extensions=extension, + ) + + try: + get_response = webauthn_handler.get(get_request) + except Exception as e: + sys.stderr.write("Webauthn Error: {}.\n".format(e)) + raise e + + response = { + "clientData": get_response.response.client_data_json, + "authenticatorData": get_response.response.authenticator_data, + "signatureData": get_response.response.signature, + "applicationId": application_id, + "keyHandle": get_response.id, + "securityKeyReplyType": 2, + } + return {"securityKey": response} + + def _unpadded_urlsafe_b64recode(self, s): + """Converts standard b64 encoded string to url safe b64 encoded string + with no padding.""" + b = base64.urlsafe_b64decode(s) + return base64.urlsafe_b64encode(b).decode().rstrip("=") + + +class SamlChallenge(ReauthChallenge): + """Challenge that asks the users to browse to their ID Providers. + + Currently SAML challenge is not supported. When obtaining the challenge + input, exception will be raised to instruct the users to run + `gcloud auth login` for reauthentication. + """ + + @property + def name(self): + return "SAML" + + @property + def is_locally_eligible(self): + return True + + def obtain_challenge_input(self, metadata): + # Magic Arch has not fully supported returning a proper dedirect URL + # for programmatic SAML users today. So we error our here and request + # users to use gcloud to complete a login. + raise exceptions.ReauthSamlChallengeFailError(SAML_CHALLENGE_MESSAGE) + + +AVAILABLE_CHALLENGES = { + challenge.name: challenge + for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()] +} diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py b/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py new file mode 100644 index 00000000..6e158089 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/credentials.py @@ -0,0 +1,614 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 Credentials. + +This module provides credentials based on OAuth 2.0 access and refresh tokens. +These credentials usually access resources on behalf of a user (resource +owner). + +Specifically, this is intended to use access tokens acquired using the +`Authorization Code grant`_ and can refresh those tokens using a +optional `refresh token`_. + +Obtaining the initial access and refresh token is outside of the scope of this +module. Consult `rfc6749 section 4.1`_ for complete details on the +Authorization Code grant flow. + +.. _Authorization Code grant: https://tools.ietf.org/html/rfc6749#section-1.3.1 +.. _refresh token: https://tools.ietf.org/html/rfc6749#section-6 +.. _rfc6749 section 4.1: https://tools.ietf.org/html/rfc6749#section-4.1 +""" + +from datetime import datetime +import io +import json +import logging +import warnings + +from google.auth import _cloud_sdk +from google.auth import _helpers +from google.auth import credentials +from google.auth import exceptions +from google.auth import metrics +from google.oauth2 import reauth + +_LOGGER = logging.getLogger(__name__) + + +# The Google OAuth 2.0 token endpoint. Used for authorized user credentials. +_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" + +# The Google OAuth 2.0 token info endpoint. Used for getting token info JSON from access tokens. +_GOOGLE_OAUTH2_TOKEN_INFO_ENDPOINT = "https://oauth2.googleapis.com/tokeninfo" + + +class Credentials(credentials.ReadOnlyScoped, credentials.CredentialsWithQuotaProject): + """Credentials using OAuth 2.0 access and refresh tokens. + + The credentials are considered immutable except the tokens and the token + expiry, which are updated after refresh. If you want to modify the quota + project, use :meth:`with_quota_project` or :: + + credentials = credentials.with_quota_project('myproject-123') + + Reauth is disabled by default. To enable reauth, set the + `enable_reauth_refresh` parameter to True in the constructor. Note that + reauth feature is intended for gcloud to use only. + If reauth is enabled, `pyu2f` dependency has to be installed in order to use security + key reauth feature. Dependency can be installed via `pip install pyu2f` or `pip install + google-auth[reauth]`. + """ + + def __init__( + self, + token, + refresh_token=None, + id_token=None, + token_uri=None, + client_id=None, + client_secret=None, + scopes=None, + default_scopes=None, + quota_project_id=None, + expiry=None, + rapt_token=None, + refresh_handler=None, + enable_reauth_refresh=False, + granted_scopes=None, + trust_boundary=None, + universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, + account=None, + ): + """ + Args: + token (Optional(str)): The OAuth 2.0 access token. Can be None + if refresh information is provided. + refresh_token (str): The OAuth 2.0 refresh token. If specified, + credentials can be refreshed. + id_token (str): The Open ID Connect ID Token. + token_uri (str): The OAuth 2.0 authorization server's token + endpoint URI. Must be specified for refresh, can be left as + None if the token can not be refreshed. + client_id (str): The OAuth 2.0 client ID. Must be specified for + refresh, can be left as None if the token can not be refreshed. + client_secret(str): The OAuth 2.0 client secret. Must be specified + for refresh, can be left as None if the token can not be + refreshed. + scopes (Sequence[str]): The scopes used to obtain authorization. + This parameter is used by :meth:`has_scopes`. OAuth 2.0 + credentials can not request additional scopes after + authorization. The scopes must be derivable from the refresh + token if refresh information is provided (e.g. The refresh + token scopes are a superset of this or contain a wild card + scope like 'https://www.googleapis.com/auth/any-api'). + default_scopes (Sequence[str]): Default scopes passed by a + Google client library. Use 'scopes' for user-defined scopes. + quota_project_id (Optional[str]): The project ID used for quota and billing. + This project may be different from the project used to + create the credentials. + rapt_token (Optional[str]): The reauth Proof Token. + refresh_handler (Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]): + A callable which takes in the HTTP request callable and the list of + OAuth scopes and when called returns an access token string for the + requested scopes and its expiry datetime. This is useful when no + refresh tokens are provided and tokens are obtained by calling + some external process on demand. It is particularly useful for + retrieving downscoped tokens from a token broker. + enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow + should be used. This flag is for gcloud to use only. + granted_scopes (Optional[Sequence[str]]): The scopes that were consented/granted by the user. + This could be different from the requested scopes and it could be empty if granted + and requested scopes were same. + trust_boundary (str): String representation of trust boundary meta. + universe_domain (Optional[str]): The universe domain. The default + universe domain is googleapis.com. + account (Optional[str]): The account associated with the credential. + """ + super(Credentials, self).__init__() + self.token = token + self.expiry = expiry + self._refresh_token = refresh_token + self._id_token = id_token + self._scopes = scopes + self._default_scopes = default_scopes + self._granted_scopes = granted_scopes + self._token_uri = token_uri + self._client_id = client_id + self._client_secret = client_secret + self._quota_project_id = quota_project_id + self._rapt_token = rapt_token + self.refresh_handler = refresh_handler + self._enable_reauth_refresh = enable_reauth_refresh + self._trust_boundary = trust_boundary + self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN + self._account = account or "" + self._cred_file_path = None + + def __getstate__(self): + """A __getstate__ method must exist for the __setstate__ to be called + This is identical to the default implementation. + See https://docs.python.org/3.7/library/pickle.html#object.__setstate__ + """ + state_dict = self.__dict__.copy() + # Remove _refresh_handler function as there are limitations pickling and + # unpickling certain callables (lambda, functools.partial instances) + # because they need to be importable. + # Instead, the refresh_handler setter should be used to repopulate this. + if "_refresh_handler" in state_dict: + del state_dict["_refresh_handler"] + + if "_refresh_worker" in state_dict: + del state_dict["_refresh_worker"] + return state_dict + + def __setstate__(self, d): + """Credentials pickled with older versions of the class do not have + all the attributes.""" + self.token = d.get("token") + self.expiry = d.get("expiry") + self._refresh_token = d.get("_refresh_token") + self._id_token = d.get("_id_token") + self._scopes = d.get("_scopes") + self._default_scopes = d.get("_default_scopes") + self._granted_scopes = d.get("_granted_scopes") + self._token_uri = d.get("_token_uri") + self._client_id = d.get("_client_id") + self._client_secret = d.get("_client_secret") + self._quota_project_id = d.get("_quota_project_id") + self._rapt_token = d.get("_rapt_token") + self._enable_reauth_refresh = d.get("_enable_reauth_refresh") + self._trust_boundary = d.get("_trust_boundary") + self._universe_domain = ( + d.get("_universe_domain") or credentials.DEFAULT_UNIVERSE_DOMAIN + ) + self._cred_file_path = d.get("_cred_file_path") + # The refresh_handler setter should be used to repopulate this. + self._refresh_handler = None + self._refresh_worker = None + self._use_non_blocking_refresh = d.get("_use_non_blocking_refresh", False) + self._account = d.get("_account", "") + + @property + def refresh_token(self): + """Optional[str]: The OAuth 2.0 refresh token.""" + return self._refresh_token + + @property + def scopes(self): + """Optional[str]: The OAuth 2.0 permission scopes.""" + return self._scopes + + @property + def granted_scopes(self): + """Optional[Sequence[str]]: The OAuth 2.0 permission scopes that were granted by the user.""" + return self._granted_scopes + + @property + def token_uri(self): + """Optional[str]: The OAuth 2.0 authorization server's token endpoint + URI.""" + return self._token_uri + + @property + def id_token(self): + """Optional[str]: The Open ID Connect ID Token. + + Depending on the authorization server and the scopes requested, this + may be populated when credentials are obtained and updated when + :meth:`refresh` is called. This token is a JWT. It can be verified + and decoded using :func:`google.oauth2.id_token.verify_oauth2_token`. + """ + return self._id_token + + @property + def client_id(self): + """Optional[str]: The OAuth 2.0 client ID.""" + return self._client_id + + @property + def client_secret(self): + """Optional[str]: The OAuth 2.0 client secret.""" + return self._client_secret + + @property + def requires_scopes(self): + """False: OAuth 2.0 credentials have their scopes set when + the initial token is requested and can not be changed.""" + return False + + @property + def rapt_token(self): + """Optional[str]: The reauth Proof Token.""" + return self._rapt_token + + @property + def refresh_handler(self): + """Returns the refresh handler if available. + + Returns: + Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]: + The current refresh handler. + """ + return self._refresh_handler + + @refresh_handler.setter + def refresh_handler(self, value): + """Updates the current refresh handler. + + Args: + value (Optional[Callable[[google.auth.transport.Request, Sequence[str]], [str, datetime]]]): + The updated value of the refresh handler. + + Raises: + TypeError: If the value is not a callable or None. + """ + if not callable(value) and value is not None: + raise TypeError("The provided refresh_handler is not a callable or None.") + self._refresh_handler = value + + @property + def account(self): + """str: The user account associated with the credential. If the account is unknown an empty string is returned.""" + return self._account + + def _make_copy(self): + cred = self.__class__( + self.token, + refresh_token=self.refresh_token, + id_token=self.id_token, + token_uri=self.token_uri, + client_id=self.client_id, + client_secret=self.client_secret, + scopes=self.scopes, + default_scopes=self.default_scopes, + granted_scopes=self.granted_scopes, + quota_project_id=self.quota_project_id, + rapt_token=self.rapt_token, + enable_reauth_refresh=self._enable_reauth_refresh, + trust_boundary=self._trust_boundary, + universe_domain=self._universe_domain, + account=self._account, + ) + cred._cred_file_path = self._cred_file_path + return cred + + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + cred_info = { + "credential_source": self._cred_file_path, + "credential_type": "user credentials", + } + if self.account: + cred_info["principal"] = self.account + return cred_info + return None + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) + def with_token_uri(self, token_uri): + cred = self._make_copy() + cred._token_uri = token_uri + return cred + + def with_account(self, account): + """Returns a copy of these credentials with a modified account. + + Args: + account (str): The account to set + + Returns: + google.oauth2.credentials.Credentials: A new credentials instance. + """ + cred = self._make_copy() + cred._account = account + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) + def with_universe_domain(self, universe_domain): + cred = self._make_copy() + cred._universe_domain = universe_domain + return cred + + def _metric_header_for_usage(self): + return metrics.CRED_TYPE_USER + + @_helpers.copy_docstring(credentials.Credentials) + def refresh(self, request): + if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: + raise exceptions.RefreshError( + "User credential refresh is only supported in the default " + "googleapis.com universe domain, but the current universe " + "domain is {}. If you created the credential with an access " + "token, it's likely that the provided token is expired now, " + "please update your code with a valid token.".format( + self._universe_domain + ) + ) + + scopes = self._scopes if self._scopes is not None else self._default_scopes + # Use refresh handler if available and no refresh token is + # available. This is useful in general when tokens are obtained by calling + # some external process on demand. It is particularly useful for retrieving + # downscoped tokens from a token broker. + if self._refresh_token is None and self.refresh_handler: + token, expiry = self.refresh_handler(request, scopes=scopes) + # Validate returned data. + if not isinstance(token, str): + raise exceptions.RefreshError( + "The refresh_handler returned token is not a string." + ) + if not isinstance(expiry, datetime): + raise exceptions.RefreshError( + "The refresh_handler returned expiry is not a datetime object." + ) + if _helpers.utcnow() >= expiry - _helpers.REFRESH_THRESHOLD: + raise exceptions.RefreshError( + "The credentials returned by the refresh_handler are " + "already expired." + ) + self.token = token + self.expiry = expiry + return + + if ( + self._refresh_token is None + or self._token_uri is None + or self._client_id is None + or self._client_secret is None + ): + raise exceptions.RefreshError( + "The credentials do not contain the necessary fields need to " + "refresh the access token. You must specify refresh_token, " + "token_uri, client_id, and client_secret." + ) + + ( + access_token, + refresh_token, + expiry, + grant_response, + rapt_token, + ) = reauth.refresh_grant( + request, + self._token_uri, + self._refresh_token, + self._client_id, + self._client_secret, + scopes=scopes, + rapt_token=self._rapt_token, + enable_reauth_refresh=self._enable_reauth_refresh, + ) + + self.token = access_token + self.expiry = expiry + self._refresh_token = refresh_token + self._id_token = grant_response.get("id_token") + self._rapt_token = rapt_token + + if scopes and "scope" in grant_response: + requested_scopes = frozenset(scopes) + self._granted_scopes = grant_response["scope"].split() + granted_scopes = frozenset(self._granted_scopes) + scopes_requested_but_not_granted = requested_scopes - granted_scopes + if scopes_requested_but_not_granted: + # User might be presented with unbundled scopes at the time of + # consent. So it is a valid scenario to not have all the requested + # scopes as part of granted scopes but log a warning in case the + # developer wants to debug the scenario. + _LOGGER.warning( + "Not all requested scopes were granted by the " + "authorization server, missing scopes {}.".format( + ", ".join(scopes_requested_but_not_granted) + ) + ) + + @classmethod + def from_authorized_user_info(cls, info, scopes=None): + """Creates a Credentials instance from parsed authorized user info. + + Args: + info (Mapping[str, str]): The authorized user info in Google + format. + scopes (Sequence[str]): Optional list of scopes to include in the + credentials. + + Returns: + google.oauth2.credentials.Credentials: The constructed + credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + keys_needed = set(("refresh_token", "client_id", "client_secret")) + missing = keys_needed.difference(info.keys()) + + if missing: + raise ValueError( + "Authorized user info was not in the expected format, missing " + "fields {}.".format(", ".join(missing)) + ) + + # access token expiry (datetime obj); auto-expire if not saved + expiry = info.get("expiry") + if expiry: + expiry = datetime.strptime( + expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S" + ) + else: + expiry = _helpers.utcnow() - _helpers.REFRESH_THRESHOLD + + # process scopes, which needs to be a seq + if scopes is None and "scopes" in info: + scopes = info.get("scopes") + if isinstance(scopes, str): + scopes = scopes.split(" ") + + return cls( + token=info.get("token"), + refresh_token=info.get("refresh_token"), + token_uri=_GOOGLE_OAUTH2_TOKEN_ENDPOINT, # always overrides + scopes=scopes, + client_id=info.get("client_id"), + client_secret=info.get("client_secret"), + quota_project_id=info.get("quota_project_id"), # may not exist + expiry=expiry, + rapt_token=info.get("rapt_token"), # may not exist + trust_boundary=info.get("trust_boundary"), # may not exist + universe_domain=info.get("universe_domain"), # may not exist + account=info.get("account", ""), # may not exist + ) + + @classmethod + def from_authorized_user_file(cls, filename, scopes=None): + """Creates a Credentials instance from an authorized user json file. + + Args: + filename (str): The path to the authorized user json file. + scopes (Sequence[str]): Optional list of scopes to include in the + credentials. + + Returns: + google.oauth2.credentials.Credentials: The constructed + credentials. + + Raises: + ValueError: If the file is not in the expected format. + """ + with io.open(filename, "r", encoding="utf-8") as json_file: + data = json.load(json_file) + return cls.from_authorized_user_info(data, scopes) + + def to_json(self, strip=None): + """Utility function that creates a JSON representation of a Credentials + object. + + Args: + strip (Sequence[str]): Optional list of members to exclude from the + generated JSON. + + Returns: + str: A JSON representation of this instance. When converted into + a dictionary, it can be passed to from_authorized_user_info() + to create a new credential instance. + """ + prep = { + "token": self.token, + "refresh_token": self.refresh_token, + "token_uri": self.token_uri, + "client_id": self.client_id, + "client_secret": self.client_secret, + "scopes": self.scopes, + "rapt_token": self.rapt_token, + "universe_domain": self._universe_domain, + "account": self._account, + } + if self.expiry: # flatten expiry timestamp + prep["expiry"] = self.expiry.isoformat() + "Z" + + # Remove empty entries (those which are None) + prep = {k: v for k, v in prep.items() if v is not None} + + # Remove entries that explicitely need to be removed + if strip is not None: + prep = {k: v for k, v in prep.items() if k not in strip} + + return json.dumps(prep) + + +class UserAccessTokenCredentials(credentials.CredentialsWithQuotaProject): + """Access token credentials for user account. + + Obtain the access token for a given user account or the current active + user account with the ``gcloud auth print-access-token`` command. + + Args: + account (Optional[str]): Account to get the access token for. If not + specified, the current active account will be used. + quota_project_id (Optional[str]): The project ID used for quota + and billing. + """ + + def __init__(self, account=None, quota_project_id=None): + warnings.warn( + "UserAccessTokenCredentials is deprecated, please use " + "google.oauth2.credentials.Credentials instead. To use " + "that credential type, simply run " + "`gcloud auth application-default login` and let the " + "client libraries pick up the application default credentials." + ) + super(UserAccessTokenCredentials, self).__init__() + self._account = account + self._quota_project_id = quota_project_id + + def with_account(self, account): + """Create a new instance with the given account. + + Args: + account (str): Account to get the access token for. + + Returns: + google.oauth2.credentials.UserAccessTokenCredentials: The created + credentials with the given account. + """ + return self.__class__(account=account, quota_project_id=self._quota_project_id) + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + return self.__class__(account=self._account, quota_project_id=quota_project_id) + + def refresh(self, request): + """Refreshes the access token. + + Args: + request (google.auth.transport.Request): This argument is required + by the base class interface but not used in this implementation, + so just set it to `None`. + + Raises: + google.auth.exceptions.UserAccessTokenError: If the access token + refresh failed. + """ + self.token = _cloud_sdk.get_auth_access_token(self._account) + + @_helpers.copy_docstring(credentials.Credentials) + def before_request(self, request, method, url, headers): + self.refresh(request) + self.apply(headers) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py new file mode 100644 index 00000000..7410cfc2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py @@ -0,0 +1,251 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Experimental GDCH credentials support. +""" + +import datetime + +from google.auth import _helpers +from google.auth import _service_account_info +from google.auth import credentials +from google.auth import exceptions +from google.auth import jwt +from google.oauth2 import _client + + +TOKEN_EXCHANGE_TYPE = "urn:ietf:params:oauth:token-type:token-exchange" +ACCESS_TOKEN_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" +SERVICE_ACCOUNT_TOKEN_TYPE = "urn:k8s:params:oauth:token-type:serviceaccount" +JWT_LIFETIME = datetime.timedelta(seconds=3600) # 1 hour + + +class ServiceAccountCredentials(credentials.Credentials): + """Credentials for GDCH (`Google Distributed Cloud Hosted`_) for service + account users. + + .. _Google Distributed Cloud Hosted: + https://cloud.google.com/blog/topics/hybrid-cloud/\ + announcing-google-distributed-cloud-edge-and-hosted + + To create a GDCH service account credential, first create a JSON file of + the following format:: + + { + "type": "gdch_service_account", + "format_version": "1", + "project": "<project name>", + "private_key_id": "<key id>", + "private_key": "-----BEGIN EC PRIVATE KEY-----\n<key bytes>\n-----END EC PRIVATE KEY-----\n", + "name": "<service identity name>", + "ca_cert_path": "<CA cert path>", + "token_uri": "https://service-identity.<Domain>/authenticate" + } + + The "format_version" field stands for the format of the JSON file. For now + it is always "1". The `private_key_id` and `private_key` is used for signing. + The `ca_cert_path` is used for token server TLS certificate verification. + + After the JSON file is created, set `GOOGLE_APPLICATION_CREDENTIALS` environment + variable to the JSON file path, then use the following code to create the + credential:: + + import google.auth + + credential, _ = google.auth.default() + credential = credential.with_gdch_audience("<the audience>") + + We can also create the credential directly:: + + from google.oauth import gdch_credentials + + credential = gdch_credentials.ServiceAccountCredentials.from_service_account_file("<the json file path>") + credential = credential.with_gdch_audience("<the audience>") + + The token is obtained in the following way. This class first creates a + self signed JWT. It uses the `name` value as the `iss` and `sub` claim, and + the `token_uri` as the `aud` claim, and signs the JWT with the `private_key`. + It then sends the JWT to the `token_uri` to exchange a final token for + `audience`. + """ + + def __init__( + self, signer, service_identity_name, project, audience, token_uri, ca_cert_path + ): + """ + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + service_identity_name (str): The service identity name. It will be + used as the `iss` and `sub` claim in the self signed JWT. + project (str): The project. + audience (str): The audience for the final token. + token_uri (str): The token server uri. + ca_cert_path (str): The CA cert path for token server side TLS + certificate verification. If the token server uses well known + CA, then this parameter can be `None`. + """ + super(ServiceAccountCredentials, self).__init__() + self._signer = signer + self._service_identity_name = service_identity_name + self._project = project + self._audience = audience + self._token_uri = token_uri + self._ca_cert_path = ca_cert_path + + def _create_jwt(self): + now = _helpers.utcnow() + expiry = now + JWT_LIFETIME + iss_sub_value = "system:serviceaccount:{}:{}".format( + self._project, self._service_identity_name + ) + + payload = { + "iss": iss_sub_value, + "sub": iss_sub_value, + "aud": self._token_uri, + "iat": _helpers.datetime_to_secs(now), + "exp": _helpers.datetime_to_secs(expiry), + } + + return _helpers.from_bytes(jwt.encode(self._signer, payload)) + + @_helpers.copy_docstring(credentials.Credentials) + def refresh(self, request): + import google.auth.transport.requests + + if not isinstance(request, google.auth.transport.requests.Request): + raise exceptions.RefreshError( + "For GDCH service account credentials, request must be a google.auth.transport.requests.Request object" + ) + + # Create a self signed JWT, and do token exchange. + jwt_token = self._create_jwt() + request_body = { + "grant_type": TOKEN_EXCHANGE_TYPE, + "audience": self._audience, + "requested_token_type": ACCESS_TOKEN_TOKEN_TYPE, + "subject_token": jwt_token, + "subject_token_type": SERVICE_ACCOUNT_TOKEN_TYPE, + } + response_data = _client._token_endpoint_request( + request, + self._token_uri, + request_body, + access_token=None, + use_json=True, + verify=self._ca_cert_path, + ) + + self.token, _, self.expiry, _ = _client._handle_refresh_grant_response( + response_data, None + ) + + def with_gdch_audience(self, audience): + """Create a copy of GDCH credentials with the specified audience. + + Args: + audience (str): The intended audience for GDCH credentials. + """ + return self.__class__( + self._signer, + self._service_identity_name, + self._project, + audience, + self._token_uri, + self._ca_cert_path, + ) + + @classmethod + def _from_signer_and_info(cls, signer, info): + """Creates a Credentials instance from a signer and service account + info. + + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + info (Mapping[str, str]): The service account info. + + Returns: + google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed + credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + if info["format_version"] != "1": + raise ValueError("Only format version 1 is supported") + + return cls( + signer, + info["name"], # service_identity_name + info["project"], + None, # audience + info["token_uri"], + info.get("ca_cert_path", None), + ) + + @classmethod + def from_service_account_info(cls, info): + """Creates a Credentials instance from parsed service account info. + + Args: + info (Mapping[str, str]): The service account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed + credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + signer = _service_account_info.from_dict( + info, + require=[ + "format_version", + "private_key_id", + "private_key", + "name", + "project", + "token_uri", + ], + use_rsa_signer=False, + ) + return cls._from_signer_and_info(signer, info) + + @classmethod + def from_service_account_file(cls, filename): + """Creates a Credentials instance from a service account json file. + + Args: + filename (str): The path to the service account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed + credentials. + """ + info, signer = _service_account_info.from_filename( + filename, + require=[ + "format_version", + "private_key_id", + "private_key", + "name", + "project", + "token_uri", + ], + use_rsa_signer=False, + ) + return cls._from_signer_and_info(signer, info) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py b/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py new file mode 100644 index 00000000..b68ab6b3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/id_token.py @@ -0,0 +1,358 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Google ID Token helpers. + +Provides support for verifying `OpenID Connect ID Tokens`_, especially ones +generated by Google infrastructure. + +To parse and verify an ID Token issued by Google's OAuth 2.0 authorization +server use :func:`verify_oauth2_token`. To verify an ID Token issued by +Firebase, use :func:`verify_firebase_token`. + +A general purpose ID Token verifier is available as :func:`verify_token`. + +Example:: + + from google.oauth2 import id_token + from google.auth.transport import requests + + request = requests.Request() + + id_info = id_token.verify_oauth2_token( + token, request, 'my-client-id.example.com') + + userid = id_info['sub'] + +By default, this will re-fetch certificates for each verification. Because +Google's public keys are only changed infrequently (on the order of once per +day), you may wish to take advantage of caching to reduce latency and the +potential for network errors. This can be accomplished using an external +library like `CacheControl`_ to create a cache-aware +:class:`google.auth.transport.Request`:: + + import cachecontrol + import google.auth.transport.requests + import requests + + session = requests.session() + cached_session = cachecontrol.CacheControl(session) + request = google.auth.transport.requests.Request(session=cached_session) + +.. _OpenID Connect ID Tokens: + http://openid.net/specs/openid-connect-core-1_0.html#IDToken +.. _CacheControl: https://cachecontrol.readthedocs.io +""" + +import http.client as http_client +import json +import os + +from google.auth import environment_vars +from google.auth import exceptions +from google.auth import jwt + + +# The URL that provides public certificates for verifying ID tokens issued +# by Google's OAuth 2.0 authorization server. +_GOOGLE_OAUTH2_CERTS_URL = "https://www.googleapis.com/oauth2/v1/certs" + +# The URL that provides public certificates for verifying ID tokens issued +# by Firebase and the Google APIs infrastructure +_GOOGLE_APIS_CERTS_URL = ( + "https://www.googleapis.com/robot/v1/metadata/x509" + "/securetoken@system.gserviceaccount.com" +) + +_GOOGLE_ISSUERS = ["accounts.google.com", "https://accounts.google.com"] + + +def _fetch_certs(request, certs_url): + """Fetches certificates. + + Google-style cerificate endpoints return JSON in the format of + ``{'key id': 'x509 certificate'}`` or a certificate array according + to the JWK spec (see https://tools.ietf.org/html/rfc7517). + + Args: + request (google.auth.transport.Request): The object used to make + HTTP requests. + certs_url (str): The certificate endpoint URL. + + Returns: + Mapping[str, str] | Mapping[str, list]: A mapping of public keys + in x.509 or JWK spec. + """ + response = request(certs_url, method="GET") + + if response.status != http_client.OK: + raise exceptions.TransportError( + "Could not fetch certificates at {}".format(certs_url) + ) + + return json.loads(response.data.decode("utf-8")) + + +def verify_token( + id_token, + request, + audience=None, + certs_url=_GOOGLE_OAUTH2_CERTS_URL, + clock_skew_in_seconds=0, +): + """Verifies an ID token and returns the decoded token. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. + audience (str or list): The audience or audiences that this token is + intended for. If None then the audience is not verified. + certs_url (str): The URL that specifies the certificates to use to + verify the token. This URL should return JSON in the format of + ``{'key id': 'x509 certificate'}`` or a certificate array according to + the JWK spec (see https://tools.ietf.org/html/rfc7517). + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + """ + certs = _fetch_certs(request, certs_url) + + if "keys" in certs: + try: + import jwt as jwt_lib # type: ignore + except ImportError as caught_exc: # pragma: NO COVER + raise ImportError( + "The pyjwt library is not installed, please install the pyjwt package to use the jwk certs format." + ) from caught_exc + jwks_client = jwt_lib.PyJWKClient(certs_url) + signing_key = jwks_client.get_signing_key_from_jwt(id_token) + return jwt_lib.decode( + id_token, + signing_key.key, + algorithms=[signing_key.algorithm_name], + audience=audience, + ) + else: + return jwt.decode( + id_token, + certs=certs, + audience=audience, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + +def verify_oauth2_token(id_token, request, audience=None, clock_skew_in_seconds=0): + """Verifies an ID Token issued by Google's OAuth 2.0 authorization server. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. + audience (str): The audience that this token is intended for. This is + typically your application's OAuth 2.0 client ID. If None then the + audience is not verified. + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + + Raises: + exceptions.GoogleAuthError: If the issuer is invalid. + ValueError: If token verification fails + """ + idinfo = verify_token( + id_token, + request, + audience=audience, + certs_url=_GOOGLE_OAUTH2_CERTS_URL, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + if idinfo["iss"] not in _GOOGLE_ISSUERS: + raise exceptions.GoogleAuthError( + "Wrong issuer. 'iss' should be one of the following: {}".format( + _GOOGLE_ISSUERS + ) + ) + + return idinfo + + +def verify_firebase_token(id_token, request, audience=None, clock_skew_in_seconds=0): + """Verifies an ID Token issued by Firebase Authentication. + + Args: + id_token (Union[str, bytes]): The encoded token. + request (google.auth.transport.Request): The object used to make + HTTP requests. + audience (str): The audience that this token is intended for. This is + typically your Firebase application ID. If None then the audience + is not verified. + clock_skew_in_seconds (int): The clock skew used for `iat` and `exp` + validation. + + Returns: + Mapping[str, Any]: The decoded token. + """ + return verify_token( + id_token, + request, + audience=audience, + certs_url=_GOOGLE_APIS_CERTS_URL, + clock_skew_in_seconds=clock_skew_in_seconds, + ) + + +def fetch_id_token_credentials(audience, request=None): + """Create the ID Token credentials from the current environment. + + This function acquires ID token from the environment in the following order. + See https://google.aip.dev/auth/4110. + + 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set + to the path of a valid service account JSON file, then ID token is + acquired using this service account credentials. + 2. If the application is running in Compute Engine, App Engine or Cloud Run, + then the ID token are obtained from the metadata server. + 3. If metadata server doesn't exist and no valid service account credentials + are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will + be raised. + + Example:: + + import google.oauth2.id_token + import google.auth.transport.requests + + request = google.auth.transport.requests.Request() + target_audience = "https://pubsub.googleapis.com" + + # Create ID token credentials. + credentials = google.oauth2.id_token.fetch_id_token_credentials(target_audience, request=request) + + # Refresh the credential to obtain an ID token. + credentials.refresh(request) + + id_token = credentials.token + id_token_expiry = credentials.expiry + + Args: + audience (str): The audience that this ID token is intended for. + request (Optional[google.auth.transport.Request]): A callable used to make + HTTP requests. A request object will be created if not provided. + + Returns: + google.auth.credentials.Credentials: The ID token credentials. + + Raises: + ~google.auth.exceptions.DefaultCredentialsError: + If metadata server doesn't exist and no valid service account + credentials are found. + """ + # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment + # variable. + credentials_filename = os.environ.get(environment_vars.CREDENTIALS) + if credentials_filename: + if not ( + os.path.exists(credentials_filename) + and os.path.isfile(credentials_filename) + ): + raise exceptions.DefaultCredentialsError( + "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid." + ) + + try: + with open(credentials_filename, "r") as f: + from google.oauth2 import service_account + + info = json.load(f) + if info.get("type") == "service_account": + return service_account.IDTokenCredentials.from_service_account_info( + info, target_audience=audience + ) + except ValueError as caught_exc: + new_exc = exceptions.DefaultCredentialsError( + "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.", + caught_exc, + ) + raise new_exc from caught_exc + + # 2. Try to fetch ID token from metada server if it exists. The code + # works for GAE and Cloud Run metadata server as well. + try: + from google.auth import compute_engine + from google.auth.compute_engine import _metadata + + # Create a request object if not provided. + if not request: + import google.auth.transport.requests + + request = google.auth.transport.requests.Request() + + if _metadata.ping(request): + return compute_engine.IDTokenCredentials( + request, audience, use_metadata_identity_endpoint=True + ) + except (ImportError, exceptions.TransportError): + pass + + raise exceptions.DefaultCredentialsError( + "Neither metadata server or valid service account credentials are found." + ) + + +def fetch_id_token(request, audience): + """Fetch the ID Token from the current environment. + + This function acquires ID token from the environment in the following order. + See https://google.aip.dev/auth/4110. + + 1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set + to the path of a valid service account JSON file, then ID token is + acquired using this service account credentials. + 2. If the application is running in Compute Engine, App Engine or Cloud Run, + then the ID token are obtained from the metadata server. + 3. If metadata server doesn't exist and no valid service account credentials + are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will + be raised. + + Example:: + + import google.oauth2.id_token + import google.auth.transport.requests + + request = google.auth.transport.requests.Request() + target_audience = "https://pubsub.googleapis.com" + + id_token = google.oauth2.id_token.fetch_id_token(request, target_audience) + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + audience (str): The audience that this ID token is intended for. + + Returns: + str: The ID token. + + Raises: + ~google.auth.exceptions.DefaultCredentialsError: + If metadata server doesn't exist and no valid service account + credentials are found. + """ + id_token_credentials = fetch_id_token_credentials(audience, request=request) + id_token_credentials.refresh(request) + return id_token_credentials.token diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/py.typed b/.venv/lib/python3.12/site-packages/google/oauth2/py.typed new file mode 100644 index 00000000..d82ed62c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/py.typed @@ -0,0 +1,2 @@ +# Marker file for PEP 561.
+# The google-oauth2 package uses inline types.
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py b/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py new file mode 100644 index 00000000..1e39e0bc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/reauth.py @@ -0,0 +1,369 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A module that provides functions for handling rapt authentication. + +Reauth is a process of obtaining additional authentication (such as password, +security token, etc.) while refreshing OAuth 2.0 credentials for a user. + +Credentials that use the Reauth flow must have the reauth scope, +``https://www.googleapis.com/auth/accounts.reauth``. + +This module provides a high-level function for executing the Reauth process, +:func:`refresh_grant`, and lower-level helpers for doing the individual +steps of the reauth process. + +Those steps are: + +1. Obtaining a list of challenges from the reauth server. +2. Running through each challenge and sending the result back to the reauth + server. +3. Refreshing the access token using the returned rapt token. +""" + +import sys + +from google.auth import exceptions +from google.auth import metrics +from google.oauth2 import _client +from google.oauth2 import challenges + + +_REAUTH_SCOPE = "https://www.googleapis.com/auth/accounts.reauth" +_REAUTH_API = "https://reauth.googleapis.com/v2/sessions" + +_REAUTH_NEEDED_ERROR = "invalid_grant" +_REAUTH_NEEDED_ERROR_INVALID_RAPT = "invalid_rapt" +_REAUTH_NEEDED_ERROR_RAPT_REQUIRED = "rapt_required" + +_AUTHENTICATED = "AUTHENTICATED" +_CHALLENGE_REQUIRED = "CHALLENGE_REQUIRED" +_CHALLENGE_PENDING = "CHALLENGE_PENDING" + + +# Override this global variable to set custom max number of rounds of reauth +# challenges should be run. +RUN_CHALLENGE_RETRY_LIMIT = 5 + + +def is_interactive(): + """Check if we are in an interractive environment. + + Override this function with a different logic if you are using this library + outside a CLI. + + If the rapt token needs refreshing, the user needs to answer the challenges. + If the user is not in an interractive environment, the challenges can not + be answered and we just wait for timeout for no reason. + + Returns: + bool: True if is interactive environment, False otherwise. + """ + + return sys.stdin.isatty() + + +def _get_challenges( + request, supported_challenge_types, access_token, requested_scopes=None +): + """Does initial request to reauth API to get the challenges. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + supported_challenge_types (Sequence[str]): list of challenge names + supported by the manager. + access_token (str): Access token with reauth scopes. + requested_scopes (Optional(Sequence[str])): Authorized scopes for the credentials. + + Returns: + dict: The response from the reauth API. + """ + body = {"supportedChallengeTypes": supported_challenge_types} + if requested_scopes: + body["oauthScopesForDomainPolicyLookup"] = requested_scopes + metrics_header = {metrics.API_CLIENT_HEADER: metrics.reauth_start()} + + return _client._token_endpoint_request( + request, + _REAUTH_API + ":start", + body, + access_token=access_token, + use_json=True, + headers=metrics_header, + ) + + +def _send_challenge_result( + request, session_id, challenge_id, client_input, access_token +): + """Attempt to refresh access token by sending next challenge result. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + session_id (str): session id returned by the initial reauth call. + challenge_id (str): challenge id returned by the initial reauth call. + client_input: dict with a challenge-specific client input. For example: + ``{'credential': password}`` for password challenge. + access_token (str): Access token with reauth scopes. + + Returns: + dict: The response from the reauth API. + """ + body = { + "sessionId": session_id, + "challengeId": challenge_id, + "action": "RESPOND", + "proposalResponse": client_input, + } + metrics_header = {metrics.API_CLIENT_HEADER: metrics.reauth_continue()} + + return _client._token_endpoint_request( + request, + _REAUTH_API + "/{}:continue".format(session_id), + body, + access_token=access_token, + use_json=True, + headers=metrics_header, + ) + + +def _run_next_challenge(msg, request, access_token): + """Get the next challenge from msg and run it. + + Args: + msg (dict): Reauth API response body (either from the initial request to + https://reauth.googleapis.com/v2/sessions:start or from sending the + previous challenge response to + https://reauth.googleapis.com/v2/sessions/id:continue) + request (google.auth.transport.Request): A callable used to make + HTTP requests. + access_token (str): reauth access token + + Returns: + dict: The response from the reauth API. + + Raises: + google.auth.exceptions.ReauthError: if reauth failed. + """ + for challenge in msg["challenges"]: + if challenge["status"] != "READY": + # Skip non-activated challenges. + continue + c = challenges.AVAILABLE_CHALLENGES.get(challenge["challengeType"], None) + if not c: + raise exceptions.ReauthFailError( + "Unsupported challenge type {0}. Supported types: {1}".format( + challenge["challengeType"], + ",".join(list(challenges.AVAILABLE_CHALLENGES.keys())), + ) + ) + if not c.is_locally_eligible: + raise exceptions.ReauthFailError( + "Challenge {0} is not locally eligible".format( + challenge["challengeType"] + ) + ) + client_input = c.obtain_challenge_input(challenge) + if not client_input: + return None + return _send_challenge_result( + request, + msg["sessionId"], + challenge["challengeId"], + client_input, + access_token, + ) + return None + + +def _obtain_rapt(request, access_token, requested_scopes): + """Given an http request method and reauth access token, get rapt token. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + access_token (str): reauth access token + requested_scopes (Sequence[str]): scopes required by the client application + + Returns: + str: The rapt token. + + Raises: + google.auth.exceptions.ReauthError: if reauth failed + """ + msg = _get_challenges( + request, + list(challenges.AVAILABLE_CHALLENGES.keys()), + access_token, + requested_scopes, + ) + + if msg["status"] == _AUTHENTICATED: + return msg["encodedProofOfReauthToken"] + + for _ in range(0, RUN_CHALLENGE_RETRY_LIMIT): + if not ( + msg["status"] == _CHALLENGE_REQUIRED or msg["status"] == _CHALLENGE_PENDING + ): + raise exceptions.ReauthFailError( + "Reauthentication challenge failed due to API error: {}".format( + msg["status"] + ) + ) + + if not is_interactive(): + raise exceptions.ReauthFailError( + "Reauthentication challenge could not be answered because you are not" + " in an interactive session." + ) + + msg = _run_next_challenge(msg, request, access_token) + + if not msg: + raise exceptions.ReauthFailError("Failed to obtain rapt token.") + if msg["status"] == _AUTHENTICATED: + return msg["encodedProofOfReauthToken"] + + # If we got here it means we didn't get authenticated. + raise exceptions.ReauthFailError("Failed to obtain rapt token.") + + +def get_rapt_token( + request, client_id, client_secret, refresh_token, token_uri, scopes=None +): + """Given an http request method and refresh_token, get rapt token. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + client_id (str): client id to get access token for reauth scope. + client_secret (str): client secret for the client_id + refresh_token (str): refresh token to refresh access token + token_uri (str): uri to refresh access token + scopes (Optional(Sequence[str])): scopes required by the client application + + Returns: + str: The rapt token. + Raises: + google.auth.exceptions.RefreshError: If reauth failed. + """ + sys.stderr.write("Reauthentication required.\n") + + # Get access token for reauth. + access_token, _, _, _ = _client.refresh_grant( + request=request, + client_id=client_id, + client_secret=client_secret, + refresh_token=refresh_token, + token_uri=token_uri, + scopes=[_REAUTH_SCOPE], + ) + + # Get rapt token from reauth API. + rapt_token = _obtain_rapt(request, access_token, requested_scopes=scopes) + sys.stderr.write("Reauthentication successful.\n") + + return rapt_token + + +def refresh_grant( + request, + token_uri, + refresh_token, + client_id, + client_secret, + scopes=None, + rapt_token=None, + enable_reauth_refresh=False, +): + """Implements the reauthentication flow. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + token_uri (str): The OAuth 2.0 authorizations server's token endpoint + URI. + refresh_token (str): The refresh token to use to get a new access + token. + client_id (str): The OAuth 2.0 application's client ID. + client_secret (str): The Oauth 2.0 appliaction's client secret. + scopes (Optional(Sequence[str])): Scopes to request. If present, all + scopes must be authorized for the refresh token. Useful if refresh + token has a wild card scope (e.g. + 'https://www.googleapis.com/auth/any-api'). + rapt_token (Optional(str)): The rapt token for reauth. + enable_reauth_refresh (Optional[bool]): Whether reauth refresh flow + should be used. The default value is False. This option is for + gcloud only, other users should use the default value. + + Returns: + Tuple[str, Optional[str], Optional[datetime], Mapping[str, str], str]: The + access token, new refresh token, expiration, the additional data + returned by the token endpoint, and the rapt token. + + Raises: + google.auth.exceptions.RefreshError: If the token endpoint returned + an error. + """ + body = { + "grant_type": _client._REFRESH_GRANT_TYPE, + "client_id": client_id, + "client_secret": client_secret, + "refresh_token": refresh_token, + } + if scopes: + body["scope"] = " ".join(scopes) + if rapt_token: + body["rapt"] = rapt_token + metrics_header = {metrics.API_CLIENT_HEADER: metrics.token_request_user()} + + response_status_ok, response_data, retryable_error = _client._token_endpoint_request_no_throw( + request, token_uri, body, headers=metrics_header + ) + + if not response_status_ok and isinstance(response_data, str): + raise exceptions.RefreshError(response_data, retryable=False) + + if ( + not response_status_ok + and response_data.get("error") == _REAUTH_NEEDED_ERROR + and ( + response_data.get("error_subtype") == _REAUTH_NEEDED_ERROR_INVALID_RAPT + or response_data.get("error_subtype") == _REAUTH_NEEDED_ERROR_RAPT_REQUIRED + ) + ): + if not enable_reauth_refresh: + raise exceptions.RefreshError( + "Reauthentication is needed. Please run `gcloud auth application-default login` to reauthenticate." + ) + + rapt_token = get_rapt_token( + request, client_id, client_secret, refresh_token, token_uri, scopes=scopes + ) + body["rapt"] = rapt_token + ( + response_status_ok, + response_data, + retryable_error, + ) = _client._token_endpoint_request_no_throw( + request, token_uri, body, headers=metrics_header + ) + + if not response_status_ok: + _client._handle_error_response(response_data, retryable_error) + return _client._handle_refresh_grant_response(response_data, refresh_token) + ( + rapt_token, + ) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py b/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py new file mode 100644 index 00000000..3e84194a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/service_account.py @@ -0,0 +1,847 @@ +# Copyright 2016 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Service Accounts: JSON Web Token (JWT) Profile for OAuth 2.0 + +This module implements the JWT Profile for OAuth 2.0 Authorization Grants +as defined by `RFC 7523`_ with particular support for how this RFC is +implemented in Google's infrastructure. Google refers to these credentials +as *Service Accounts*. + +Service accounts are used for server-to-server communication, such as +interactions between a web application server and a Google service. The +service account belongs to your application instead of to an individual end +user. In contrast to other OAuth 2.0 profiles, no users are involved and your +application "acts" as the service account. + +Typically an application uses a service account when the application uses +Google APIs to work with its own data rather than a user's data. For example, +an application that uses Google Cloud Datastore for data persistence would use +a service account to authenticate its calls to the Google Cloud Datastore API. +However, an application that needs to access a user's Drive documents would +use the normal OAuth 2.0 profile. + +Additionally, Google Apps domain administrators can grant service accounts +`domain-wide delegation`_ authority to access user data on behalf of users in +the domain. + +This profile uses a JWT to acquire an OAuth 2.0 access token. The JWT is used +in place of the usual authorization token returned during the standard +OAuth 2.0 Authorization Code grant. The JWT is only used for this purpose, as +the acquired access token is used as the bearer token when making requests +using these credentials. + +This profile differs from normal OAuth 2.0 profile because no user consent +step is required. The use of the private key allows this profile to assert +identity directly. + +This profile also differs from the :mod:`google.auth.jwt` authentication +because the JWT credentials use the JWT directly as the bearer token. This +profile instead only uses the JWT to obtain an OAuth 2.0 access token. The +obtained OAuth 2.0 access token is used as the bearer token. + +Domain-wide delegation +---------------------- + +Domain-wide delegation allows a service account to access user data on +behalf of any user in a Google Apps domain without consent from the user. +For example, an application that uses the Google Calendar API to add events to +the calendars of all users in a Google Apps domain would use a service account +to access the Google Calendar API on behalf of users. + +The Google Apps administrator must explicitly authorize the service account to +do this. This authorization step is referred to as "delegating domain-wide +authority" to a service account. + +You can use domain-wise delegation by creating a set of credentials with a +specific subject using :meth:`~Credentials.with_subject`. + +.. _RFC 7523: https://tools.ietf.org/html/rfc7523 +""" + +import copy +import datetime + +from google.auth import _helpers +from google.auth import _service_account_info +from google.auth import credentials +from google.auth import exceptions +from google.auth import iam +from google.auth import jwt +from google.auth import metrics +from google.oauth2 import _client + +_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds +_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token" + + +class Credentials( + credentials.Signing, + credentials.Scoped, + credentials.CredentialsWithQuotaProject, + credentials.CredentialsWithTokenUri, +): + """Service account credentials + + Usually, you'll create these credentials with one of the helper + constructors. To create credentials using a Google service account + private key JSON file:: + + credentials = service_account.Credentials.from_service_account_file( + 'service-account.json') + + Or if you already have the service account file loaded:: + + service_account_info = json.load(open('service_account.json')) + credentials = service_account.Credentials.from_service_account_info( + service_account_info) + + Both helper methods pass on arguments to the constructor, so you can + specify additional scopes and a subject if necessary:: + + credentials = service_account.Credentials.from_service_account_file( + 'service-account.json', + scopes=['email'], + subject='user@example.com') + + The credentials are considered immutable. If you want to modify the scopes + or the subject used for delegation, use :meth:`with_scopes` or + :meth:`with_subject`:: + + scoped_credentials = credentials.with_scopes(['email']) + delegated_credentials = credentials.with_subject(subject) + + To add a quota project, use :meth:`with_quota_project`:: + + credentials = credentials.with_quota_project('myproject-123') + """ + + def __init__( + self, + signer, + service_account_email, + token_uri, + scopes=None, + default_scopes=None, + subject=None, + project_id=None, + quota_project_id=None, + additional_claims=None, + always_use_jwt_access=False, + universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, + trust_boundary=None, + ): + """ + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + service_account_email (str): The service account's email. + scopes (Sequence[str]): User-defined scopes to request during the + authorization grant. + default_scopes (Sequence[str]): Default scopes passed by a + Google client library. Use 'scopes' for user-defined scopes. + token_uri (str): The OAuth 2.0 Token URI. + subject (str): For domain-wide delegation, the email address of the + user to for which to request delegated access. + project_id (str): Project ID associated with the service account + credential. + quota_project_id (Optional[str]): The project ID used for quota and + billing. + additional_claims (Mapping[str, str]): Any additional claims for + the JWT assertion used in the authorization grant. + always_use_jwt_access (Optional[bool]): Whether self signed JWT should + be always used. + universe_domain (str): The universe domain. The default + universe domain is googleapis.com. For default value self + signed jwt is used for token refresh. + trust_boundary (str): String representation of trust boundary meta. + + .. note:: Typically one of the helper constructors + :meth:`from_service_account_file` or + :meth:`from_service_account_info` are used instead of calling the + constructor directly. + """ + super(Credentials, self).__init__() + + self._cred_file_path = None + self._scopes = scopes + self._default_scopes = default_scopes + self._signer = signer + self._service_account_email = service_account_email + self._subject = subject + self._project_id = project_id + self._quota_project_id = quota_project_id + self._token_uri = token_uri + self._always_use_jwt_access = always_use_jwt_access + self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN + + if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: + self._always_use_jwt_access = True + + self._jwt_credentials = None + + if additional_claims is not None: + self._additional_claims = additional_claims + else: + self._additional_claims = {} + self._trust_boundary = {"locations": [], "encoded_locations": "0x0"} + + @classmethod + def _from_signer_and_info(cls, signer, info, **kwargs): + """Creates a Credentials instance from a signer and service account + info. + + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + info (Mapping[str, str]): The service account info. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.jwt.Credentials: The constructed credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + return cls( + signer, + service_account_email=info["client_email"], + token_uri=info["token_uri"], + project_id=info.get("project_id"), + universe_domain=info.get( + "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN + ), + trust_boundary=info.get("trust_boundary"), + **kwargs, + ) + + @classmethod + def from_service_account_info(cls, info, **kwargs): + """Creates a Credentials instance from parsed service account info. + + Args: + info (Mapping[str, str]): The service account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.service_account.Credentials: The constructed + credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + signer = _service_account_info.from_dict( + info, require=["client_email", "token_uri"] + ) + return cls._from_signer_and_info(signer, info, **kwargs) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates a Credentials instance from a service account json file. + + Args: + filename (str): The path to the service account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.service_account.Credentials: The constructed + credentials. + """ + info, signer = _service_account_info.from_filename( + filename, require=["client_email", "token_uri"] + ) + return cls._from_signer_and_info(signer, info, **kwargs) + + @property + def service_account_email(self): + """The service account email.""" + return self._service_account_email + + @property + def project_id(self): + """Project ID associated with this credential.""" + return self._project_id + + @property + def requires_scopes(self): + """Checks if the credentials requires scopes. + + Returns: + bool: True if there are no scopes set otherwise False. + """ + return True if not self._scopes else False + + def _make_copy(self): + cred = self.__class__( + self._signer, + service_account_email=self._service_account_email, + scopes=copy.copy(self._scopes), + default_scopes=copy.copy(self._default_scopes), + token_uri=self._token_uri, + subject=self._subject, + project_id=self._project_id, + quota_project_id=self._quota_project_id, + additional_claims=self._additional_claims.copy(), + always_use_jwt_access=self._always_use_jwt_access, + universe_domain=self._universe_domain, + ) + cred._cred_file_path = self._cred_file_path + return cred + + @_helpers.copy_docstring(credentials.Scoped) + def with_scopes(self, scopes, default_scopes=None): + cred = self._make_copy() + cred._scopes = scopes + cred._default_scopes = default_scopes + return cred + + def with_always_use_jwt_access(self, always_use_jwt_access): + """Create a copy of these credentials with the specified always_use_jwt_access value. + + Args: + always_use_jwt_access (bool): Whether always use self signed JWT or not. + + Returns: + google.auth.service_account.Credentials: A new credentials + instance. + Raises: + google.auth.exceptions.InvalidValue: If the universe domain is not + default and always_use_jwt_access is False. + """ + cred = self._make_copy() + if ( + cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN + and not always_use_jwt_access + ): + raise exceptions.InvalidValue( + "always_use_jwt_access should be True for non-default universe domain" + ) + cred._always_use_jwt_access = always_use_jwt_access + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain) + def with_universe_domain(self, universe_domain): + cred = self._make_copy() + cred._universe_domain = universe_domain + if universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: + cred._always_use_jwt_access = True + return cred + + def with_subject(self, subject): + """Create a copy of these credentials with the specified subject. + + Args: + subject (str): The subject claim. + + Returns: + google.auth.service_account.Credentials: A new credentials + instance. + """ + cred = self._make_copy() + cred._subject = subject + return cred + + def with_claims(self, additional_claims): + """Returns a copy of these credentials with modified claims. + + Args: + additional_claims (Mapping[str, str]): Any additional claims for + the JWT payload. This will be merged with the current + additional claims. + + Returns: + google.auth.service_account.Credentials: A new credentials + instance. + """ + new_additional_claims = copy.deepcopy(self._additional_claims) + new_additional_claims.update(additional_claims or {}) + cred = self._make_copy() + cred._additional_claims = new_additional_claims + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) + def with_token_uri(self, token_uri): + cred = self._make_copy() + cred._token_uri = token_uri + return cred + + def _make_authorization_grant_assertion(self): + """Create the OAuth 2.0 assertion. + + This assertion is used during the OAuth 2.0 grant to acquire an + access token. + + Returns: + bytes: The authorization grant assertion. + """ + now = _helpers.utcnow() + lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) + expiry = now + lifetime + + payload = { + "iat": _helpers.datetime_to_secs(now), + "exp": _helpers.datetime_to_secs(expiry), + # The issuer must be the service account email. + "iss": self._service_account_email, + # The audience must be the auth token endpoint's URI + "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, + "scope": _helpers.scopes_to_string(self._scopes or ()), + } + + payload.update(self._additional_claims) + + # The subject can be a user email for domain-wide delegation. + if self._subject: + payload.setdefault("sub", self._subject) + + token = jwt.encode(self._signer, payload) + + return token + + def _use_self_signed_jwt(self): + # Since domain wide delegation doesn't work with self signed JWT. If + # subject exists, then we should not use self signed JWT. + return self._subject is None and self._jwt_credentials is not None + + def _metric_header_for_usage(self): + if self._use_self_signed_jwt(): + return metrics.CRED_TYPE_SA_JWT + return metrics.CRED_TYPE_SA_ASSERTION + + @_helpers.copy_docstring(credentials.Credentials) + def refresh(self, request): + if self._always_use_jwt_access and not self._jwt_credentials: + # If self signed jwt should be used but jwt credential is not + # created, try to create one with scopes + self._create_self_signed_jwt(None) + + if ( + self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN + and self._subject + ): + raise exceptions.RefreshError( + "domain wide delegation is not supported for non-default universe domain" + ) + + if self._use_self_signed_jwt(): + self._jwt_credentials.refresh(request) + self.token = self._jwt_credentials.token.decode() + self.expiry = self._jwt_credentials.expiry + else: + assertion = self._make_authorization_grant_assertion() + access_token, expiry, _ = _client.jwt_grant( + request, self._token_uri, assertion + ) + self.token = access_token + self.expiry = expiry + + def _create_self_signed_jwt(self, audience): + """Create a self-signed JWT from the credentials if requirements are met. + + Args: + audience (str): The service URL. ``https://[API_ENDPOINT]/`` + """ + # https://google.aip.dev/auth/4111 + if self._always_use_jwt_access: + if self._scopes: + additional_claims = {"scope": " ".join(self._scopes)} + if ( + self._jwt_credentials is None + or self._jwt_credentials.additional_claims != additional_claims + ): + self._jwt_credentials = jwt.Credentials.from_signing_credentials( + self, None, additional_claims=additional_claims + ) + elif audience: + if ( + self._jwt_credentials is None + or self._jwt_credentials._audience != audience + ): + + self._jwt_credentials = jwt.Credentials.from_signing_credentials( + self, audience + ) + elif self._default_scopes: + additional_claims = {"scope": " ".join(self._default_scopes)} + if ( + self._jwt_credentials is None + or additional_claims != self._jwt_credentials.additional_claims + ): + self._jwt_credentials = jwt.Credentials.from_signing_credentials( + self, None, additional_claims=additional_claims + ) + elif not self._scopes and audience: + self._jwt_credentials = jwt.Credentials.from_signing_credentials( + self, audience + ) + + @_helpers.copy_docstring(credentials.Signing) + def sign_bytes(self, message): + return self._signer.sign(message) + + @property # type: ignore + @_helpers.copy_docstring(credentials.Signing) + def signer(self): + return self._signer + + @property # type: ignore + @_helpers.copy_docstring(credentials.Signing) + def signer_email(self): + return self._service_account_email + + @_helpers.copy_docstring(credentials.Credentials) + def get_cred_info(self): + if self._cred_file_path: + return { + "credential_source": self._cred_file_path, + "credential_type": "service account credentials", + "principal": self.service_account_email, + } + return None + + +class IDTokenCredentials( + credentials.Signing, + credentials.CredentialsWithQuotaProject, + credentials.CredentialsWithTokenUri, +): + """Open ID Connect ID Token-based service account credentials. + + These credentials are largely similar to :class:`.Credentials`, but instead + of using an OAuth 2.0 Access Token as the bearer token, they use an Open + ID Connect ID Token as the bearer token. These credentials are useful when + communicating to services that require ID Tokens and can not accept access + tokens. + + Usually, you'll create these credentials with one of the helper + constructors. To create credentials using a Google service account + private key JSON file:: + + credentials = ( + service_account.IDTokenCredentials.from_service_account_file( + 'service-account.json')) + + + Or if you already have the service account file loaded:: + + service_account_info = json.load(open('service_account.json')) + credentials = ( + service_account.IDTokenCredentials.from_service_account_info( + service_account_info)) + + + Both helper methods pass on arguments to the constructor, so you can + specify additional scopes and a subject if necessary:: + + credentials = ( + service_account.IDTokenCredentials.from_service_account_file( + 'service-account.json', + scopes=['email'], + subject='user@example.com')) + + + The credentials are considered immutable. If you want to modify the scopes + or the subject used for delegation, use :meth:`with_scopes` or + :meth:`with_subject`:: + + scoped_credentials = credentials.with_scopes(['email']) + delegated_credentials = credentials.with_subject(subject) + + """ + + def __init__( + self, + signer, + service_account_email, + token_uri, + target_audience, + additional_claims=None, + quota_project_id=None, + universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN, + ): + """ + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + service_account_email (str): The service account's email. + token_uri (str): The OAuth 2.0 Token URI. + target_audience (str): The intended audience for these credentials, + used when requesting the ID Token. The ID Token's ``aud`` claim + will be set to this string. + additional_claims (Mapping[str, str]): Any additional claims for + the JWT assertion used in the authorization grant. + quota_project_id (Optional[str]): The project ID used for quota and billing. + universe_domain (str): The universe domain. The default + universe domain is googleapis.com. For default value IAM ID + token endponint is used for token refresh. Note that + iam.serviceAccountTokenCreator role is required to use the IAM + endpoint. + .. note:: Typically one of the helper constructors + :meth:`from_service_account_file` or + :meth:`from_service_account_info` are used instead of calling the + constructor directly. + """ + super(IDTokenCredentials, self).__init__() + self._signer = signer + self._service_account_email = service_account_email + self._token_uri = token_uri + self._target_audience = target_audience + self._quota_project_id = quota_project_id + self._use_iam_endpoint = False + + if not universe_domain: + self._universe_domain = credentials.DEFAULT_UNIVERSE_DOMAIN + else: + self._universe_domain = universe_domain + self._iam_id_token_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace( + "googleapis.com", self._universe_domain + ) + + if self._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN: + self._use_iam_endpoint = True + + if additional_claims is not None: + self._additional_claims = additional_claims + else: + self._additional_claims = {} + + @classmethod + def _from_signer_and_info(cls, signer, info, **kwargs): + """Creates a credentials instance from a signer and service account + info. + + Args: + signer (google.auth.crypt.Signer): The signer used to sign JWTs. + info (Mapping[str, str]): The service account info. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.jwt.IDTokenCredentials: The constructed credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + kwargs.setdefault("service_account_email", info["client_email"]) + kwargs.setdefault("token_uri", info["token_uri"]) + if "universe_domain" in info: + kwargs["universe_domain"] = info["universe_domain"] + return cls(signer, **kwargs) + + @classmethod + def from_service_account_info(cls, info, **kwargs): + """Creates a credentials instance from parsed service account info. + + Args: + info (Mapping[str, str]): The service account info in Google + format. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.service_account.IDTokenCredentials: The constructed + credentials. + + Raises: + ValueError: If the info is not in the expected format. + """ + signer = _service_account_info.from_dict( + info, require=["client_email", "token_uri"] + ) + return cls._from_signer_and_info(signer, info, **kwargs) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates a credentials instance from a service account json file. + + Args: + filename (str): The path to the service account json file. + kwargs: Additional arguments to pass to the constructor. + + Returns: + google.auth.service_account.IDTokenCredentials: The constructed + credentials. + """ + info, signer = _service_account_info.from_filename( + filename, require=["client_email", "token_uri"] + ) + return cls._from_signer_and_info(signer, info, **kwargs) + + def _make_copy(self): + cred = self.__class__( + self._signer, + service_account_email=self._service_account_email, + token_uri=self._token_uri, + target_audience=self._target_audience, + additional_claims=self._additional_claims.copy(), + quota_project_id=self.quota_project_id, + universe_domain=self._universe_domain, + ) + # _use_iam_endpoint is not exposed in the constructor + cred._use_iam_endpoint = self._use_iam_endpoint + return cred + + def with_target_audience(self, target_audience): + """Create a copy of these credentials with the specified target + audience. + + Args: + target_audience (str): The intended audience for these credentials, + used when requesting the ID Token. + + Returns: + google.auth.service_account.IDTokenCredentials: A new credentials + instance. + """ + cred = self._make_copy() + cred._target_audience = target_audience + return cred + + def _with_use_iam_endpoint(self, use_iam_endpoint): + """Create a copy of these credentials with the use_iam_endpoint value. + + Args: + use_iam_endpoint (bool): If True, IAM generateIdToken endpoint will + be used instead of the token_uri. Note that + iam.serviceAccountTokenCreator role is required to use the IAM + endpoint. The default value is False. This feature is currently + experimental and subject to change without notice. + + Returns: + google.auth.service_account.IDTokenCredentials: A new credentials + instance. + Raises: + google.auth.exceptions.InvalidValue: If the universe domain is not + default and use_iam_endpoint is False. + """ + cred = self._make_copy() + if ( + cred._universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN + and not use_iam_endpoint + ): + raise exceptions.InvalidValue( + "use_iam_endpoint should be True for non-default universe domain" + ) + cred._use_iam_endpoint = use_iam_endpoint + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) + def with_quota_project(self, quota_project_id): + cred = self._make_copy() + cred._quota_project_id = quota_project_id + return cred + + @_helpers.copy_docstring(credentials.CredentialsWithTokenUri) + def with_token_uri(self, token_uri): + cred = self._make_copy() + cred._token_uri = token_uri + return cred + + def _make_authorization_grant_assertion(self): + """Create the OAuth 2.0 assertion. + + This assertion is used during the OAuth 2.0 grant to acquire an + ID token. + + Returns: + bytes: The authorization grant assertion. + """ + now = _helpers.utcnow() + lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS) + expiry = now + lifetime + + payload = { + "iat": _helpers.datetime_to_secs(now), + "exp": _helpers.datetime_to_secs(expiry), + # The issuer must be the service account email. + "iss": self.service_account_email, + # The audience must be the auth token endpoint's URI + "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT, + # The target audience specifies which service the ID token is + # intended for. + "target_audience": self._target_audience, + } + + payload.update(self._additional_claims) + + token = jwt.encode(self._signer, payload) + + return token + + def _refresh_with_iam_endpoint(self, request): + """Use IAM generateIdToken endpoint to obtain an ID token. + + It works as follows: + + 1. First we create a self signed jwt with + https://www.googleapis.com/auth/iam being the scope. + + 2. Next we use the self signed jwt as the access token, and make a POST + request to IAM generateIdToken endpoint. The request body is: + { + "audience": self._target_audience, + "includeEmail": "true", + "useEmailAzp": "true", + } + + If the request is succesfully, it will return {"token":"the ID token"}, + and we can extract the ID token and compute its expiry. + """ + jwt_credentials = jwt.Credentials.from_signing_credentials( + self, + None, + additional_claims={"scope": "https://www.googleapis.com/auth/iam"}, + ) + jwt_credentials.refresh(request) + self.token, self.expiry = _client.call_iam_generate_id_token_endpoint( + request, + self._iam_id_token_endpoint, + self.signer_email, + self._target_audience, + jwt_credentials.token.decode(), + self._universe_domain, + ) + + @_helpers.copy_docstring(credentials.Credentials) + def refresh(self, request): + if self._use_iam_endpoint: + self._refresh_with_iam_endpoint(request) + else: + assertion = self._make_authorization_grant_assertion() + access_token, expiry, _ = _client.id_token_jwt_grant( + request, self._token_uri, assertion + ) + self.token = access_token + self.expiry = expiry + + @property + def service_account_email(self): + """The service account email.""" + return self._service_account_email + + @_helpers.copy_docstring(credentials.Signing) + def sign_bytes(self, message): + return self._signer.sign(message) + + @property # type: ignore + @_helpers.copy_docstring(credentials.Signing) + def signer(self): + return self._signer + + @property # type: ignore + @_helpers.copy_docstring(credentials.Signing) + def signer_email(self): + return self._service_account_email diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/sts.py b/.venv/lib/python3.12/site-packages/google/oauth2/sts.py new file mode 100644 index 00000000..ad396273 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/sts.py @@ -0,0 +1,176 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 Token Exchange Spec. + +This module defines a token exchange utility based on the `OAuth 2.0 Token +Exchange`_ spec. This will be mainly used to exchange external credentials +for GCP access tokens in workload identity pools to access Google APIs. + +The implementation will support various types of client authentication as +allowed in the spec. + +A deviation on the spec will be for additional Google specific options that +cannot be easily mapped to parameters defined in the RFC. + +The returned dictionary response will be based on the `rfc8693 section 2.2.1`_ +spec JSON response. + +.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693 +.. _rfc8693 section 2.2.1: https://tools.ietf.org/html/rfc8693#section-2.2.1 +""" + +import http.client as http_client +import json +import urllib + +from google.oauth2 import utils + + +_URLENCODED_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"} + + +class Client(utils.OAuthClientAuthHandler): + """Implements the OAuth 2.0 token exchange spec based on + https://tools.ietf.org/html/rfc8693. + """ + + def __init__(self, token_exchange_endpoint, client_authentication=None): + """Initializes an STS client instance. + + Args: + token_exchange_endpoint (str): The token exchange endpoint. + client_authentication (Optional(google.oauth2.oauth2_utils.ClientAuthentication)): + The optional OAuth client authentication credentials if available. + """ + super(Client, self).__init__(client_authentication) + self._token_exchange_endpoint = token_exchange_endpoint + + def _make_request(self, request, headers, request_body): + # Initialize request headers. + request_headers = _URLENCODED_HEADERS.copy() + + # Inject additional headers. + if headers: + for k, v in dict(headers).items(): + request_headers[k] = v + + # Apply OAuth client authentication. + self.apply_client_authentication_options(request_headers, request_body) + + # Execute request. + response = request( + url=self._token_exchange_endpoint, + method="POST", + headers=request_headers, + body=urllib.parse.urlencode(request_body).encode("utf-8"), + ) + + response_body = ( + response.data.decode("utf-8") + if hasattr(response.data, "decode") + else response.data + ) + + # If non-200 response received, translate to OAuthError exception. + if response.status != http_client.OK: + utils.handle_error_response(response_body) + + response_data = json.loads(response_body) + + # Return successful response. + return response_data + + def exchange_token( + self, + request, + grant_type, + subject_token, + subject_token_type, + resource=None, + audience=None, + scopes=None, + requested_token_type=None, + actor_token=None, + actor_token_type=None, + additional_options=None, + additional_headers=None, + ): + """Exchanges the provided token for another type of token based on the + rfc8693 spec. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + grant_type (str): The OAuth 2.0 token exchange grant type. + subject_token (str): The OAuth 2.0 token exchange subject token. + subject_token_type (str): The OAuth 2.0 token exchange subject token type. + resource (Optional[str]): The optional OAuth 2.0 token exchange resource field. + audience (Optional[str]): The optional OAuth 2.0 token exchange audience field. + scopes (Optional[Sequence[str]]): The optional list of scopes to use. + requested_token_type (Optional[str]): The optional OAuth 2.0 token exchange requested + token type. + actor_token (Optional[str]): The optional OAuth 2.0 token exchange actor token. + actor_token_type (Optional[str]): The optional OAuth 2.0 token exchange actor token type. + additional_options (Optional[Mapping[str, str]]): The optional additional + non-standard Google specific options. + additional_headers (Optional[Mapping[str, str]]): The optional additional + headers to pass to the token exchange endpoint. + + Returns: + Mapping[str, str]: The token exchange JSON-decoded response data containing + the requested token and its expiration time. + + Raises: + google.auth.exceptions.OAuthError: If the token endpoint returned + an error. + """ + # Initialize request body. + request_body = { + "grant_type": grant_type, + "resource": resource, + "audience": audience, + "scope": " ".join(scopes or []), + "requested_token_type": requested_token_type, + "subject_token": subject_token, + "subject_token_type": subject_token_type, + "actor_token": actor_token, + "actor_token_type": actor_token_type, + "options": None, + } + # Add additional non-standard options. + if additional_options: + request_body["options"] = urllib.parse.quote(json.dumps(additional_options)) + # Remove empty fields in request body. + for k, v in dict(request_body).items(): + if v is None or v == "": + del request_body[k] + + return self._make_request(request, additional_headers, request_body) + + def refresh_token(self, request, refresh_token): + """Exchanges a refresh token for an access token based on the + RFC6749 spec. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + subject_token (str): The OAuth 2.0 refresh token. + """ + + return self._make_request( + request, + None, + {"grant_type": "refresh_token", "refresh_token": refresh_token}, + ) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/utils.py b/.venv/lib/python3.12/site-packages/google/oauth2/utils.py new file mode 100644 index 00000000..d72ff191 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/utils.py @@ -0,0 +1,168 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OAuth 2.0 Utilities. + +This module provides implementations for various OAuth 2.0 utilities. +This includes `OAuth error handling`_ and +`Client authentication for OAuth flows`_. + +OAuth error handling +-------------------- +This will define interfaces for handling OAuth related error responses as +stated in `RFC 6749 section 5.2`_. +This will include a common function to convert these HTTP error responses to a +:class:`google.auth.exceptions.OAuthError` exception. + + +Client authentication for OAuth flows +------------------------------------- +We introduce an interface for defining client authentication credentials based +on `RFC 6749 section 2.3.1`_. This will expose the following +capabilities: + + * Ability to support basic authentication via request header. + * Ability to support bearer token authentication via request header. + * Ability to support client ID / secret authentication via request body. + +.. _RFC 6749 section 2.3.1: https://tools.ietf.org/html/rfc6749#section-2.3.1 +.. _RFC 6749 section 5.2: https://tools.ietf.org/html/rfc6749#section-5.2 +""" + +import abc +import base64 +import enum +import json + +from google.auth import exceptions + + +# OAuth client authentication based on +# https://tools.ietf.org/html/rfc6749#section-2.3. +class ClientAuthType(enum.Enum): + basic = 1 + request_body = 2 + + +class ClientAuthentication(object): + """Defines the client authentication credentials for basic and request-body + types based on https://tools.ietf.org/html/rfc6749#section-2.3.1. + """ + + def __init__(self, client_auth_type, client_id, client_secret=None): + """Instantiates a client authentication object containing the client ID + and secret credentials for basic and response-body auth. + + Args: + client_auth_type (google.oauth2.oauth_utils.ClientAuthType): The + client authentication type. + client_id (str): The client ID. + client_secret (Optional[str]): The client secret. + """ + self.client_auth_type = client_auth_type + self.client_id = client_id + self.client_secret = client_secret + + +class OAuthClientAuthHandler(metaclass=abc.ABCMeta): + """Abstract class for handling client authentication in OAuth-based + operations. + """ + + def __init__(self, client_authentication=None): + """Instantiates an OAuth client authentication handler. + + Args: + client_authentication (Optional[google.oauth2.utils.ClientAuthentication]): + The OAuth client authentication credentials if available. + """ + super(OAuthClientAuthHandler, self).__init__() + self._client_authentication = client_authentication + + def apply_client_authentication_options( + self, headers, request_body=None, bearer_token=None + ): + """Applies client authentication on the OAuth request's headers or POST + body. + + Args: + headers (Mapping[str, str]): The HTTP request header. + request_body (Optional[Mapping[str, str]]): The HTTP request body + dictionary. For requests that do not support request body, this + is None and will be ignored. + bearer_token (Optional[str]): The optional bearer token. + """ + # Inject authenticated header. + self._inject_authenticated_headers(headers, bearer_token) + # Inject authenticated request body. + if bearer_token is None: + self._inject_authenticated_request_body(request_body) + + def _inject_authenticated_headers(self, headers, bearer_token=None): + if bearer_token is not None: + headers["Authorization"] = "Bearer %s" % bearer_token + elif ( + self._client_authentication is not None + and self._client_authentication.client_auth_type is ClientAuthType.basic + ): + username = self._client_authentication.client_id + password = self._client_authentication.client_secret or "" + + credentials = base64.b64encode( + ("%s:%s" % (username, password)).encode() + ).decode() + headers["Authorization"] = "Basic %s" % credentials + + def _inject_authenticated_request_body(self, request_body): + if ( + self._client_authentication is not None + and self._client_authentication.client_auth_type + is ClientAuthType.request_body + ): + if request_body is None: + raise exceptions.OAuthError( + "HTTP request does not support request-body" + ) + else: + request_body["client_id"] = self._client_authentication.client_id + request_body["client_secret"] = ( + self._client_authentication.client_secret or "" + ) + + +def handle_error_response(response_body): + """Translates an error response from an OAuth operation into an + OAuthError exception. + + Args: + response_body (str): The decoded response data. + + Raises: + google.auth.exceptions.OAuthError + """ + try: + error_components = [] + error_data = json.loads(response_body) + + error_components.append("Error code {}".format(error_data["error"])) + if "error_description" in error_data: + error_components.append(": {}".format(error_data["error_description"])) + if "error_uri" in error_data: + error_components.append(" - {}".format(error_data["error_uri"])) + error_details = "".join(error_components) + # If no details could be extracted, use the response data. + except (KeyError, ValueError): + error_details = response_body + + raise exceptions.OAuthError(error_details, response_body) diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py new file mode 100644 index 00000000..e27c7e09 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler.py @@ -0,0 +1,82 @@ +import abc +import os +import struct +import subprocess + +from google.auth import exceptions +from google.oauth2.webauthn_types import GetRequest, GetResponse + + +class WebAuthnHandler(abc.ABC): + @abc.abstractmethod + def is_available(self) -> bool: + """Check whether this WebAuthn handler is available""" + raise NotImplementedError("is_available method must be implemented") + + @abc.abstractmethod + def get(self, get_request: GetRequest) -> GetResponse: + """WebAuthn get (assertion)""" + raise NotImplementedError("get method must be implemented") + + +class PluginHandler(WebAuthnHandler): + """Offloads WebAuthn get reqeust to a pluggable command-line tool. + + Offloads WebAuthn get to a plugin which takes the form of a + command-line tool. The command-line tool is configurable via the + PluginHandler._ENV_VAR environment variable. + + The WebAuthn plugin should implement the following interface: + + Communication occurs over stdin/stdout, and messages are both sent and + received in the form: + + [4 bytes - payload size (little-endian)][variable bytes - json payload] + """ + + _ENV_VAR = "GOOGLE_AUTH_WEBAUTHN_PLUGIN" + + def is_available(self) -> bool: + try: + self._find_plugin() + except Exception: + return False + else: + return True + + def get(self, get_request: GetRequest) -> GetResponse: + request_json = get_request.to_json() + cmd = self._find_plugin() + response_json = self._call_plugin(cmd, request_json) + return GetResponse.from_json(response_json) + + def _call_plugin(self, cmd: str, input_json: str) -> str: + # Calculate length of input + input_length = len(input_json) + length_bytes_le = struct.pack("<I", input_length) + request = length_bytes_le + input_json.encode() + + # Call plugin + process_result = subprocess.run( + [cmd], input=request, capture_output=True, check=True + ) + + # Check length of response + response_len_le = process_result.stdout[:4] + response_len = struct.unpack("<I", response_len_le)[0] + response = process_result.stdout[4:] + if response_len != len(response): + raise exceptions.MalformedError( + "Plugin response length {} does not match data {}".format( + response_len, len(response) + ) + ) + return response.decode() + + def _find_plugin(self) -> str: + plugin_cmd = os.environ.get(PluginHandler._ENV_VAR) + if plugin_cmd is None: + raise exceptions.InvalidResource( + "{} env var is not set".format(PluginHandler._ENV_VAR) + ) + return plugin_cmd diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py new file mode 100644 index 00000000..184329fe --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_handler_factory.py @@ -0,0 +1,16 @@ +from typing import List, Optional + +from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler + + +class WebauthnHandlerFactory: + handlers: List[WebAuthnHandler] + + def __init__(self): + self.handlers = [PluginHandler()] + + def get_handler(self) -> Optional[WebAuthnHandler]: + for handler in self.handlers: + if handler.is_available(): + return handler + return None diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py new file mode 100644 index 00000000..7784e83d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/google/oauth2/webauthn_types.py @@ -0,0 +1,156 @@ +from dataclasses import dataclass +import json +from typing import Any, Dict, List, Optional + +from google.auth import exceptions + + +@dataclass(frozen=True) +class PublicKeyCredentialDescriptor: + """Descriptor for a security key based credential. + + https://www.w3.org/TR/webauthn-3/#dictionary-credential-descriptor + + Args: + id: <url-safe base64-encoded> credential id (key handle). + transports: <'usb'|'nfc'|'ble'|'internal'> List of supported transports. + """ + + id: str + transports: Optional[List[str]] = None + + def to_dict(self): + cred = {"type": "public-key", "id": self.id} + if self.transports: + cred["transports"] = self.transports + return cred + + +@dataclass +class AuthenticationExtensionsClientInputs: + """Client extensions inputs for WebAuthn extensions. + + Args: + appid: app id that can be asserted with in addition to rpid. + https://www.w3.org/TR/webauthn-3/#sctn-appid-extension + """ + + appid: Optional[str] = None + + def to_dict(self): + extensions = {} + if self.appid: + extensions["appid"] = self.appid + return extensions + + +@dataclass +class GetRequest: + """WebAuthn get request + + Args: + origin: Origin where the WebAuthn get assertion takes place. + rpid: Relying Party ID. + challenge: <url-safe base64-encoded> raw challenge. + timeout_ms: Timeout number in millisecond. + allow_credentials: List of allowed credentials. + user_verification: <'required'|'preferred'|'discouraged'> User verification requirement. + extensions: WebAuthn authentication extensions inputs. + """ + + origin: str + rpid: str + challenge: str + timeout_ms: Optional[int] = None + allow_credentials: Optional[List[PublicKeyCredentialDescriptor]] = None + user_verification: Optional[str] = None + extensions: Optional[AuthenticationExtensionsClientInputs] = None + + def to_json(self) -> str: + req_options: Dict[str, Any] = {"rpid": self.rpid, "challenge": self.challenge} + if self.timeout_ms: + req_options["timeout"] = self.timeout_ms + if self.allow_credentials: + req_options["allowCredentials"] = [ + c.to_dict() for c in self.allow_credentials + ] + if self.user_verification: + req_options["userVerification"] = self.user_verification + if self.extensions: + req_options["extensions"] = self.extensions.to_dict() + return json.dumps( + {"type": "get", "origin": self.origin, "requestData": req_options} + ) + + +@dataclass(frozen=True) +class AuthenticatorAssertionResponse: + """Authenticator response to a WebAuthn get (assertion) request. + + https://www.w3.org/TR/webauthn-3/#authenticatorassertionresponse + + Args: + client_data_json: <url-safe base64-encoded> client data JSON. + authenticator_data: <url-safe base64-encoded> authenticator data. + signature: <url-safe base64-encoded> signature. + user_handle: <url-safe base64-encoded> user handle. + """ + + client_data_json: str + authenticator_data: str + signature: str + user_handle: Optional[str] + + +@dataclass(frozen=True) +class GetResponse: + """WebAuthn get (assertion) response. + + Args: + id: <url-safe base64-encoded> credential id (key handle). + response: The authenticator assertion response. + authenticator_attachment: <'cross-platform'|'platform'> The attachment status of the authenticator. + client_extension_results: WebAuthn authentication extensions output results in a dictionary. + """ + + id: str + response: AuthenticatorAssertionResponse + authenticator_attachment: Optional[str] + client_extension_results: Optional[Dict] + + @staticmethod + def from_json(json_str: str): + """Verify and construct GetResponse from a JSON string.""" + try: + resp_json = json.loads(json_str) + except ValueError: + raise exceptions.MalformedError("Invalid Get JSON response") + if resp_json.get("type") != "getResponse": + raise exceptions.MalformedError( + "Invalid Get response type: {}".format(resp_json.get("type")) + ) + pk_cred = resp_json.get("responseData") + if pk_cred is None: + if resp_json.get("error"): + raise exceptions.ReauthFailError( + "WebAuthn.get failure: {}".format(resp_json["error"]) + ) + else: + raise exceptions.MalformedError("Get response is empty") + if pk_cred.get("type") != "public-key": + raise exceptions.MalformedError( + "Invalid credential type: {}".format(pk_cred.get("type")) + ) + assertion_json = pk_cred["response"] + assertion_resp = AuthenticatorAssertionResponse( + client_data_json=assertion_json["clientDataJSON"], + authenticator_data=assertion_json["authenticatorData"], + signature=assertion_json["signature"], + user_handle=assertion_json.get("userHandle"), + ) + return GetResponse( + id=pk_cred["id"], + response=assertion_resp, + authenticator_attachment=pk_cred.get("authenticatorAttachment"), + client_extension_results=pk_cred.get("clientExtensionResults"), + ) |