aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
new file mode 100644
index 00000000..7410cfc2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/oauth2/gdch_credentials.py
@@ -0,0 +1,251 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Experimental GDCH credentials support.
+"""
+
+import datetime
+
+from google.auth import _helpers
+from google.auth import _service_account_info
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import jwt
+from google.oauth2 import _client
+
+
+TOKEN_EXCHANGE_TYPE = "urn:ietf:params:oauth:token-type:token-exchange"
+ACCESS_TOKEN_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+SERVICE_ACCOUNT_TOKEN_TYPE = "urn:k8s:params:oauth:token-type:serviceaccount"
+JWT_LIFETIME = datetime.timedelta(seconds=3600) # 1 hour
+
+
+class ServiceAccountCredentials(credentials.Credentials):
+ """Credentials for GDCH (`Google Distributed Cloud Hosted`_) for service
+ account users.
+
+ .. _Google Distributed Cloud Hosted:
+ https://cloud.google.com/blog/topics/hybrid-cloud/\
+ announcing-google-distributed-cloud-edge-and-hosted
+
+ To create a GDCH service account credential, first create a JSON file of
+ the following format::
+
+ {
+ "type": "gdch_service_account",
+ "format_version": "1",
+ "project": "<project name>",
+ "private_key_id": "<key id>",
+ "private_key": "-----BEGIN EC PRIVATE KEY-----\n<key bytes>\n-----END EC PRIVATE KEY-----\n",
+ "name": "<service identity name>",
+ "ca_cert_path": "<CA cert path>",
+ "token_uri": "https://service-identity.<Domain>/authenticate"
+ }
+
+ The "format_version" field stands for the format of the JSON file. For now
+ it is always "1". The `private_key_id` and `private_key` is used for signing.
+ The `ca_cert_path` is used for token server TLS certificate verification.
+
+ After the JSON file is created, set `GOOGLE_APPLICATION_CREDENTIALS` environment
+ variable to the JSON file path, then use the following code to create the
+ credential::
+
+ import google.auth
+
+ credential, _ = google.auth.default()
+ credential = credential.with_gdch_audience("<the audience>")
+
+ We can also create the credential directly::
+
+ from google.oauth import gdch_credentials
+
+ credential = gdch_credentials.ServiceAccountCredentials.from_service_account_file("<the json file path>")
+ credential = credential.with_gdch_audience("<the audience>")
+
+ The token is obtained in the following way. This class first creates a
+ self signed JWT. It uses the `name` value as the `iss` and `sub` claim, and
+ the `token_uri` as the `aud` claim, and signs the JWT with the `private_key`.
+ It then sends the JWT to the `token_uri` to exchange a final token for
+ `audience`.
+ """
+
+ def __init__(
+ self, signer, service_identity_name, project, audience, token_uri, ca_cert_path
+ ):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_identity_name (str): The service identity name. It will be
+ used as the `iss` and `sub` claim in the self signed JWT.
+ project (str): The project.
+ audience (str): The audience for the final token.
+ token_uri (str): The token server uri.
+ ca_cert_path (str): The CA cert path for token server side TLS
+ certificate verification. If the token server uses well known
+ CA, then this parameter can be `None`.
+ """
+ super(ServiceAccountCredentials, self).__init__()
+ self._signer = signer
+ self._service_identity_name = service_identity_name
+ self._project = project
+ self._audience = audience
+ self._token_uri = token_uri
+ self._ca_cert_path = ca_cert_path
+
+ def _create_jwt(self):
+ now = _helpers.utcnow()
+ expiry = now + JWT_LIFETIME
+ iss_sub_value = "system:serviceaccount:{}:{}".format(
+ self._project, self._service_identity_name
+ )
+
+ payload = {
+ "iss": iss_sub_value,
+ "sub": iss_sub_value,
+ "aud": self._token_uri,
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ }
+
+ return _helpers.from_bytes(jwt.encode(self._signer, payload))
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ import google.auth.transport.requests
+
+ if not isinstance(request, google.auth.transport.requests.Request):
+ raise exceptions.RefreshError(
+ "For GDCH service account credentials, request must be a google.auth.transport.requests.Request object"
+ )
+
+ # Create a self signed JWT, and do token exchange.
+ jwt_token = self._create_jwt()
+ request_body = {
+ "grant_type": TOKEN_EXCHANGE_TYPE,
+ "audience": self._audience,
+ "requested_token_type": ACCESS_TOKEN_TOKEN_TYPE,
+ "subject_token": jwt_token,
+ "subject_token_type": SERVICE_ACCOUNT_TOKEN_TYPE,
+ }
+ response_data = _client._token_endpoint_request(
+ request,
+ self._token_uri,
+ request_body,
+ access_token=None,
+ use_json=True,
+ verify=self._ca_cert_path,
+ )
+
+ self.token, _, self.expiry, _ = _client._handle_refresh_grant_response(
+ response_data, None
+ )
+
+ def with_gdch_audience(self, audience):
+ """Create a copy of GDCH credentials with the specified audience.
+
+ Args:
+ audience (str): The intended audience for GDCH credentials.
+ """
+ return self.__class__(
+ self._signer,
+ self._service_identity_name,
+ self._project,
+ audience,
+ self._token_uri,
+ self._ca_cert_path,
+ )
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info):
+ """Creates a Credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ if info["format_version"] != "1":
+ raise ValueError("Only format version 1 is supported")
+
+ return cls(
+ signer,
+ info["name"], # service_identity_name
+ info["project"],
+ None, # audience
+ info["token_uri"],
+ info.get("ca_cert_path", None),
+ )
+
+ @classmethod
+ def from_service_account_info(cls, info):
+ """Creates a Credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info,
+ require=[
+ "format_version",
+ "private_key_id",
+ "private_key",
+ "name",
+ "project",
+ "token_uri",
+ ],
+ use_rsa_signer=False,
+ )
+ return cls._from_signer_and_info(signer, info)
+
+ @classmethod
+ def from_service_account_file(cls, filename):
+ """Creates a Credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.oauth2.gdch_credentials.ServiceAccountCredentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename,
+ require=[
+ "format_version",
+ "private_key_id",
+ "private_key",
+ "name",
+ "project",
+ "token_uri",
+ ],
+ use_rsa_signer=False,
+ )
+ return cls._from_signer_and_info(signer, info)