aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/auth/compute_engine
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/auth/compute_engine
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/compute_engine')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/compute_engine/__init__.py22
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/compute_engine/_metadata.py375
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/compute_engine/credentials.py496
3 files changed, 893 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/compute_engine/__init__.py b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/__init__.py
new file mode 100644
index 00000000..7e1206fc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/__init__.py
@@ -0,0 +1,22 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Compute Engine authentication."""
+
+from google.auth.compute_engine._metadata import detect_gce_residency_linux
+from google.auth.compute_engine.credentials import Credentials
+from google.auth.compute_engine.credentials import IDTokenCredentials
+
+
+__all__ = ["Credentials", "IDTokenCredentials", "detect_gce_residency_linux"]
diff --git a/.venv/lib/python3.12/site-packages/google/auth/compute_engine/_metadata.py b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/_metadata.py
new file mode 100644
index 00000000..06f99de0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/_metadata.py
@@ -0,0 +1,375 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Provides helper methods for talking to the Compute Engine metadata server.
+
+See https://cloud.google.com/compute/docs/metadata for more details.
+"""
+
+import datetime
+import http.client as http_client
+import json
+import logging
+import os
+from urllib.parse import urljoin
+
+from google.auth import _helpers
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import metrics
+from google.auth import transport
+from google.auth._exponential_backoff import ExponentialBackoff
+
+_LOGGER = logging.getLogger(__name__)
+
+# Environment variable GCE_METADATA_HOST is originally named
+# GCE_METADATA_ROOT. For compatibility reasons, here it checks
+# the new variable first; if not set, the system falls back
+# to the old variable.
+_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST, None)
+if not _GCE_METADATA_HOST:
+ _GCE_METADATA_HOST = os.getenv(
+ environment_vars.GCE_METADATA_ROOT, "metadata.google.internal"
+ )
+_METADATA_ROOT = "http://{}/computeMetadata/v1/".format(_GCE_METADATA_HOST)
+
+# This is used to ping the metadata server, it avoids the cost of a DNS
+# lookup.
+_METADATA_IP_ROOT = "http://{}".format(
+ os.getenv(environment_vars.GCE_METADATA_IP, "169.254.169.254")
+)
+_METADATA_FLAVOR_HEADER = "metadata-flavor"
+_METADATA_FLAVOR_VALUE = "Google"
+_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}
+
+# Timeout in seconds to wait for the GCE metadata server when detecting the
+# GCE environment.
+try:
+ _METADATA_DEFAULT_TIMEOUT = int(os.getenv("GCE_METADATA_TIMEOUT", 3))
+except ValueError: # pragma: NO COVER
+ _METADATA_DEFAULT_TIMEOUT = 3
+
+# Detect GCE Residency
+_GOOGLE = "Google"
+_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name"
+
+
+def is_on_gce(request):
+ """Checks to see if the code runs on Google Compute Engine
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+
+ Returns:
+ bool: True if the code runs on Google Compute Engine, False otherwise.
+ """
+ if ping(request):
+ return True
+
+ if os.name == "nt":
+ # TODO: implement GCE residency detection on Windows
+ return False
+
+ # Detect GCE residency on Linux
+ return detect_gce_residency_linux()
+
+
+def detect_gce_residency_linux():
+ """Detect Google Compute Engine residency by smbios check on Linux
+
+ Returns:
+ bool: True if the GCE product name file is detected, False otherwise.
+ """
+ try:
+ with open(_GCE_PRODUCT_NAME_FILE, "r") as file_obj:
+ content = file_obj.read().strip()
+
+ except Exception:
+ return False
+
+ return content.startswith(_GOOGLE)
+
+
+def ping(request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=3):
+ """Checks to see if the metadata server is available.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ timeout (int): How long to wait for the metadata server to respond.
+ retry_count (int): How many times to attempt connecting to metadata
+ server using above timeout.
+
+ Returns:
+ bool: True if the metadata server is reachable, False otherwise.
+ """
+ # NOTE: The explicit ``timeout`` is a workaround. The underlying
+ # issue is that resolving an unknown host on some networks will take
+ # 20-30 seconds; making this timeout short fixes the issue, but
+ # could lead to false negatives in the event that we are on GCE, but
+ # the metadata resolution was particularly slow. The latter case is
+ # "unlikely".
+ headers = _METADATA_HEADERS.copy()
+ headers[metrics.API_CLIENT_HEADER] = metrics.mds_ping()
+
+ backoff = ExponentialBackoff(total_attempts=retry_count)
+
+ for attempt in backoff:
+ try:
+ response = request(
+ url=_METADATA_IP_ROOT, method="GET", headers=headers, timeout=timeout
+ )
+
+ metadata_flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
+ return (
+ response.status == http_client.OK
+ and metadata_flavor == _METADATA_FLAVOR_VALUE
+ )
+
+ except exceptions.TransportError as e:
+ _LOGGER.warning(
+ "Compute Engine Metadata server unavailable on "
+ "attempt %s of %s. Reason: %s",
+ attempt,
+ retry_count,
+ e,
+ )
+
+ return False
+
+
+def get(
+ request,
+ path,
+ root=_METADATA_ROOT,
+ params=None,
+ recursive=False,
+ retry_count=5,
+ headers=None,
+ return_none_for_not_found_error=False,
+):
+ """Fetch a resource from the metadata server.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ path (str): The resource to retrieve. For example,
+ ``'instance/service-accounts/default'``.
+ root (str): The full path to the metadata server root.
+ params (Optional[Mapping[str, str]]): A mapping of query parameter
+ keys to values.
+ recursive (bool): Whether to do a recursive query of metadata. See
+ https://cloud.google.com/compute/docs/metadata#aggcontents for more
+ details.
+ retry_count (int): How many times to attempt connecting to metadata
+ server using above timeout.
+ headers (Optional[Mapping[str, str]]): Headers for the request.
+ return_none_for_not_found_error (Optional[bool]): If True, returns None
+ for 404 error instead of throwing an exception.
+
+ Returns:
+ Union[Mapping, str]: If the metadata server returns JSON, a mapping of
+ the decoded JSON is returned. Otherwise, the response content is
+ returned as a string.
+
+ Raises:
+ google.auth.exceptions.TransportError: if an error occurred while
+ retrieving metadata.
+ """
+ base_url = urljoin(root, path)
+ query_params = {} if params is None else params
+
+ headers_to_use = _METADATA_HEADERS.copy()
+ if headers:
+ headers_to_use.update(headers)
+
+ if recursive:
+ query_params["recursive"] = "true"
+
+ url = _helpers.update_query(base_url, query_params)
+
+ backoff = ExponentialBackoff(total_attempts=retry_count)
+ failure_reason = None
+ for attempt in backoff:
+ try:
+ response = request(url=url, method="GET", headers=headers_to_use)
+ if response.status in transport.DEFAULT_RETRYABLE_STATUS_CODES:
+ _LOGGER.warning(
+ "Compute Engine Metadata server unavailable on "
+ "attempt %s of %s. Response status: %s",
+ attempt,
+ retry_count,
+ response.status,
+ )
+ failure_reason = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+ continue
+ else:
+ break
+
+ except exceptions.TransportError as e:
+ _LOGGER.warning(
+ "Compute Engine Metadata server unavailable on "
+ "attempt %s of %s. Reason: %s",
+ attempt,
+ retry_count,
+ e,
+ )
+ failure_reason = e
+ else:
+ raise exceptions.TransportError(
+ "Failed to retrieve {} from the Google Compute Engine "
+ "metadata service. Compute Engine Metadata server unavailable due to {}".format(
+ url, failure_reason
+ )
+ )
+
+ content = _helpers.from_bytes(response.data)
+
+ if response.status == http_client.NOT_FOUND and return_none_for_not_found_error:
+ return None
+
+ if response.status == http_client.OK:
+ if (
+ _helpers.parse_content_type(response.headers["content-type"])
+ == "application/json"
+ ):
+ try:
+ return json.loads(content)
+ except ValueError as caught_exc:
+ new_exc = exceptions.TransportError(
+ "Received invalid JSON from the Google Compute Engine "
+ "metadata service: {:.20}".format(content)
+ )
+ raise new_exc from caught_exc
+ else:
+ return content
+
+ raise exceptions.TransportError(
+ "Failed to retrieve {} from the Google Compute Engine "
+ "metadata service. Status: {} Response:\n{}".format(
+ url, response.status, response.data
+ ),
+ response,
+ )
+
+
+def get_project_id(request):
+ """Get the Google Cloud Project ID from the metadata server.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+
+ Returns:
+ str: The project ID
+
+ Raises:
+ google.auth.exceptions.TransportError: if an error occurred while
+ retrieving metadata.
+ """
+ return get(request, "project/project-id")
+
+
+def get_universe_domain(request):
+ """Get the universe domain value from the metadata server.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+
+ Returns:
+ str: The universe domain value. If the universe domain endpoint is not
+ not found, return the default value, which is googleapis.com
+
+ Raises:
+ google.auth.exceptions.TransportError: if an error other than
+ 404 occurs while retrieving metadata.
+ """
+ universe_domain = get(
+ request, "universe/universe-domain", return_none_for_not_found_error=True
+ )
+ if not universe_domain:
+ return "googleapis.com"
+ return universe_domain
+
+
+def get_service_account_info(request, service_account="default"):
+ """Get information about a service account from the metadata server.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ service_account (str): The string 'default' or a service account email
+ address. The determines which service account for which to acquire
+ information.
+
+ Returns:
+ Mapping: The service account's information, for example::
+
+ {
+ 'email': '...',
+ 'scopes': ['scope', ...],
+ 'aliases': ['default', '...']
+ }
+
+ Raises:
+ google.auth.exceptions.TransportError: if an error occurred while
+ retrieving metadata.
+ """
+ path = "instance/service-accounts/{0}/".format(service_account)
+ # See https://cloud.google.com/compute/docs/metadata#aggcontents
+ # for more on the use of 'recursive'.
+ return get(request, path, params={"recursive": "true"})
+
+
+def get_service_account_token(request, service_account="default", scopes=None):
+ """Get the OAuth 2.0 access token for a service account.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ service_account (str): The string 'default' or a service account email
+ address. The determines which service account for which to acquire
+ an access token.
+ scopes (Optional[Union[str, List[str]]]): Optional string or list of
+ strings with auth scopes.
+ Returns:
+ Tuple[str, datetime]: The access token and its expiration.
+
+ Raises:
+ google.auth.exceptions.TransportError: if an error occurred while
+ retrieving metadata.
+ """
+ if scopes:
+ if not isinstance(scopes, str):
+ scopes = ",".join(scopes)
+ params = {"scopes": scopes}
+ else:
+ params = None
+
+ metrics_header = {
+ metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
+ }
+
+ path = "instance/service-accounts/{0}/token".format(service_account)
+ token_json = get(request, path, params=params, headers=metrics_header)
+ token_expiry = _helpers.utcnow() + datetime.timedelta(
+ seconds=token_json["expires_in"]
+ )
+ return token_json["access_token"], token_expiry
diff --git a/.venv/lib/python3.12/site-packages/google/auth/compute_engine/credentials.py b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/credentials.py
new file mode 100644
index 00000000..f0126c0a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/compute_engine/credentials.py
@@ -0,0 +1,496 @@
+# Copyright 2016 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Compute Engine credentials.
+
+This module provides authentication for an application running on Google
+Compute Engine using the Compute Engine metadata server.
+
+"""
+
+import datetime
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import iam
+from google.auth import jwt
+from google.auth import metrics
+from google.auth.compute_engine import _metadata
+from google.oauth2 import _client
+
+
+class Credentials(
+ credentials.Scoped,
+ credentials.CredentialsWithQuotaProject,
+ credentials.CredentialsWithUniverseDomain,
+):
+ """Compute Engine Credentials.
+
+ These credentials use the Google Compute Engine metadata server to obtain
+ OAuth 2.0 access tokens associated with the instance's service account,
+ and are also used for Cloud Run, Flex and App Engine (except for the Python
+ 2.7 runtime, which is supported only on older versions of this library).
+
+ For more information about Compute Engine authentication, including how
+ to configure scopes, see the `Compute Engine authentication
+ documentation`_.
+
+ .. note:: On Compute Engine the metadata server ignores requested scopes.
+ On Cloud Run, Flex and App Engine the server honours requested scopes.
+
+ .. _Compute Engine authentication documentation:
+ https://cloud.google.com/compute/docs/authentication#using
+ """
+
+ def __init__(
+ self,
+ service_account_email="default",
+ quota_project_id=None,
+ scopes=None,
+ default_scopes=None,
+ universe_domain=None,
+ ):
+ """
+ Args:
+ service_account_email (str): The service account email to use, or
+ 'default'. A Compute Engine instance may have multiple service
+ accounts.
+ quota_project_id (Optional[str]): The project ID used for quota and
+ billing.
+ scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
+ default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+ Google client library. Use 'scopes' for user-defined scopes.
+ universe_domain (Optional[str]): The universe domain. If not
+ provided or None, credential will attempt to fetch the value
+ from metadata server. If metadata server doesn't have universe
+ domain endpoint, then the default googleapis.com will be used.
+ """
+ super(Credentials, self).__init__()
+ self._service_account_email = service_account_email
+ self._quota_project_id = quota_project_id
+ self._scopes = scopes
+ self._default_scopes = default_scopes
+ self._universe_domain_cached = False
+ if universe_domain:
+ self._universe_domain = universe_domain
+ self._universe_domain_cached = True
+
+ def _retrieve_info(self, request):
+ """Retrieve information about the service account.
+
+ Updates the scopes and retrieves the full service account email.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ """
+ info = _metadata.get_service_account_info(
+ request, service_account=self._service_account_email
+ )
+
+ self._service_account_email = info["email"]
+
+ # Don't override scopes requested by the user.
+ if self._scopes is None:
+ self._scopes = info["scopes"]
+
+ def _metric_header_for_usage(self):
+ return metrics.CRED_TYPE_SA_MDS
+
+ def refresh(self, request):
+ """Refresh the access token and scopes.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the Compute Engine metadata
+ service can't be reached if if the instance has not
+ credentials.
+ """
+ scopes = self._scopes if self._scopes is not None else self._default_scopes
+ try:
+ self._retrieve_info(request)
+ self.token, self.expiry = _metadata.get_service_account_token(
+ request, service_account=self._service_account_email, scopes=scopes
+ )
+ except exceptions.TransportError as caught_exc:
+ new_exc = exceptions.RefreshError(caught_exc)
+ raise new_exc from caught_exc
+
+ @property
+ def service_account_email(self):
+ """The service account email.
+
+ .. note:: This is not guaranteed to be set until :meth:`refresh` has been
+ called.
+ """
+ return self._service_account_email
+
+ @property
+ def requires_scopes(self):
+ return not self._scopes
+
+ @property
+ def universe_domain(self):
+ if self._universe_domain_cached:
+ return self._universe_domain
+
+ from google.auth.transport import requests as google_auth_requests
+
+ self._universe_domain = _metadata.get_universe_domain(
+ google_auth_requests.Request()
+ )
+ self._universe_domain_cached = True
+ return self._universe_domain
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ return {
+ "credential_source": "metadata server",
+ "credential_type": "VM credentials",
+ "principal": self.service_account_email,
+ }
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ creds = self.__class__(
+ service_account_email=self._service_account_email,
+ quota_project_id=quota_project_id,
+ scopes=self._scopes,
+ default_scopes=self._default_scopes,
+ )
+ creds._universe_domain = self._universe_domain
+ creds._universe_domain_cached = self._universe_domain_cached
+ return creds
+
+ @_helpers.copy_docstring(credentials.Scoped)
+ def with_scopes(self, scopes, default_scopes=None):
+ # Compute Engine credentials can not be scoped (the metadata service
+ # ignores the scopes parameter). App Engine, Cloud Run and Flex support
+ # requesting scopes.
+ creds = self.__class__(
+ scopes=scopes,
+ default_scopes=default_scopes,
+ service_account_email=self._service_account_email,
+ quota_project_id=self._quota_project_id,
+ )
+ creds._universe_domain = self._universe_domain
+ creds._universe_domain_cached = self._universe_domain_cached
+ return creds
+
+ @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+ def with_universe_domain(self, universe_domain):
+ return self.__class__(
+ scopes=self._scopes,
+ default_scopes=self._default_scopes,
+ service_account_email=self._service_account_email,
+ quota_project_id=self._quota_project_id,
+ universe_domain=universe_domain,
+ )
+
+
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
+
+
+class IDTokenCredentials(
+ credentials.CredentialsWithQuotaProject,
+ credentials.Signing,
+ credentials.CredentialsWithTokenUri,
+):
+ """Open ID Connect ID Token-based service account credentials.
+
+ These credentials relies on the default service account of a GCE instance.
+
+ ID token can be requested from `GCE metadata server identity endpoint`_, IAM
+ token endpoint or other token endpoints you specify. If metadata server
+ identity endpoint is not used, the GCE instance must have been started with
+ a service account that has access to the IAM Cloud API.
+
+ .. _GCE metadata server identity endpoint:
+ https://cloud.google.com/compute/docs/instances/verifying-instance-identity
+ """
+
+ def __init__(
+ self,
+ request,
+ target_audience,
+ token_uri=None,
+ additional_claims=None,
+ service_account_email=None,
+ signer=None,
+ use_metadata_identity_endpoint=False,
+ quota_project_id=None,
+ ):
+ """
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token. The ID Token's ``aud`` claim
+ will be set to this string.
+ token_uri (str): The OAuth 2.0 Token URI.
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT assertion used in the authorization grant.
+ service_account_email (str): Optional explicit service account to
+ use to sign JWT tokens.
+ By default, this is the default GCE service account.
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ In case the signer is specified, the request argument will be
+ ignored.
+ use_metadata_identity_endpoint (bool): Whether to use GCE metadata
+ identity endpoint. For backward compatibility the default value
+ is False. If set to True, ``token_uri``, ``additional_claims``,
+ ``service_account_email``, ``signer`` argument should not be set;
+ otherwise ValueError will be raised.
+ quota_project_id (Optional[str]): The project ID used for quota and
+ billing.
+
+ Raises:
+ ValueError:
+ If ``use_metadata_identity_endpoint`` is set to True, and one of
+ ``token_uri``, ``additional_claims``, ``service_account_email``,
+ ``signer`` arguments is set.
+ """
+ super(IDTokenCredentials, self).__init__()
+
+ self._quota_project_id = quota_project_id
+ self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
+ self._target_audience = target_audience
+
+ if use_metadata_identity_endpoint:
+ if token_uri or additional_claims or service_account_email or signer:
+ raise exceptions.MalformedError(
+ "If use_metadata_identity_endpoint is set, token_uri, "
+ "additional_claims, service_account_email, signer arguments"
+ " must not be set"
+ )
+ self._token_uri = None
+ self._additional_claims = None
+ self._signer = None
+
+ if service_account_email is None:
+ sa_info = _metadata.get_service_account_info(request)
+ self._service_account_email = sa_info["email"]
+ else:
+ self._service_account_email = service_account_email
+
+ if not use_metadata_identity_endpoint:
+ if signer is None:
+ signer = iam.Signer(
+ request=request,
+ credentials=Credentials(),
+ service_account_email=self._service_account_email,
+ )
+ self._signer = signer
+ self._token_uri = token_uri or _DEFAULT_TOKEN_URI
+
+ if additional_claims is not None:
+ self._additional_claims = additional_claims
+ else:
+ self._additional_claims = {}
+
+ def with_target_audience(self, target_audience):
+ """Create a copy of these credentials with the specified target
+ audience.
+ Args:
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token.
+ Returns:
+ google.auth.service_account.IDTokenCredentials: A new credentials
+ instance.
+ """
+ # since the signer is already instantiated,
+ # the request is not needed
+ if self._use_metadata_identity_endpoint:
+ return self.__class__(
+ None,
+ target_audience=target_audience,
+ use_metadata_identity_endpoint=True,
+ quota_project_id=self._quota_project_id,
+ )
+ else:
+ return self.__class__(
+ None,
+ service_account_email=self._service_account_email,
+ token_uri=self._token_uri,
+ target_audience=target_audience,
+ additional_claims=self._additional_claims.copy(),
+ signer=self.signer,
+ use_metadata_identity_endpoint=False,
+ quota_project_id=self._quota_project_id,
+ )
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+
+ # since the signer is already instantiated,
+ # the request is not needed
+ if self._use_metadata_identity_endpoint:
+ return self.__class__(
+ None,
+ target_audience=self._target_audience,
+ use_metadata_identity_endpoint=True,
+ quota_project_id=quota_project_id,
+ )
+ else:
+ return self.__class__(
+ None,
+ service_account_email=self._service_account_email,
+ token_uri=self._token_uri,
+ target_audience=self._target_audience,
+ additional_claims=self._additional_claims.copy(),
+ signer=self.signer,
+ use_metadata_identity_endpoint=False,
+ quota_project_id=quota_project_id,
+ )
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+
+ # since the signer is already instantiated,
+ # the request is not needed
+ if self._use_metadata_identity_endpoint:
+ raise exceptions.MalformedError(
+ "If use_metadata_identity_endpoint is set, token_uri" " must not be set"
+ )
+ else:
+ return self.__class__(
+ None,
+ service_account_email=self._service_account_email,
+ token_uri=token_uri,
+ target_audience=self._target_audience,
+ additional_claims=self._additional_claims.copy(),
+ signer=self.signer,
+ use_metadata_identity_endpoint=False,
+ quota_project_id=self.quota_project_id,
+ )
+
+ def _make_authorization_grant_assertion(self):
+ """Create the OAuth 2.0 assertion.
+ This assertion is used during the OAuth 2.0 grant to acquire an
+ ID token.
+ Returns:
+ bytes: The authorization grant assertion.
+ """
+ now = _helpers.utcnow()
+ lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
+ expiry = now + lifetime
+
+ payload = {
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ # The issuer must be the service account email.
+ "iss": self.service_account_email,
+ # The audience must be the auth token endpoint's URI
+ "aud": self._token_uri,
+ # The target audience specifies which service the ID token is
+ # intended for.
+ "target_audience": self._target_audience,
+ }
+
+ payload.update(self._additional_claims)
+
+ token = jwt.encode(self._signer, payload)
+
+ return token
+
+ def _call_metadata_identity_endpoint(self, request):
+ """Request ID token from metadata identity endpoint.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Returns:
+ Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the Compute Engine metadata
+ service can't be reached or if the instance has no credentials.
+ ValueError: If extracting expiry from the obtained ID token fails.
+ """
+ try:
+ path = "instance/service-accounts/default/identity"
+ params = {"audience": self._target_audience, "format": "full"}
+ metrics_header = {
+ metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
+ }
+ id_token = _metadata.get(
+ request, path, params=params, headers=metrics_header
+ )
+ except exceptions.TransportError as caught_exc:
+ new_exc = exceptions.RefreshError(caught_exc)
+ raise new_exc from caught_exc
+
+ _, payload, _, _ = jwt._unverified_decode(id_token)
+ return id_token, datetime.datetime.utcfromtimestamp(payload["exp"])
+
+ def refresh(self, request):
+ """Refreshes the ID token.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the credentials could
+ not be refreshed.
+ ValueError: If extracting expiry from the obtained ID token fails.
+ """
+ if self._use_metadata_identity_endpoint:
+ self.token, self.expiry = self._call_metadata_identity_endpoint(request)
+ else:
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.id_token_jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
+
+ @property # type: ignore
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer(self):
+ return self._signer
+
+ def sign_bytes(self, message):
+ """Signs the given message.
+
+ Args:
+ message (bytes): The message to sign.
+
+ Returns:
+ bytes: The message's cryptographic signature.
+
+ Raises:
+ ValueError:
+ Signer is not available if metadata identity endpoint is used.
+ """
+ if self._use_metadata_identity_endpoint:
+ raise exceptions.InvalidOperation(
+ "Signer is not available if metadata identity endpoint is used"
+ )
+ return self._signer.sign(message)
+
+ @property
+ def service_account_email(self):
+ """The service account email."""
+ return self._service_account_email
+
+ @property
+ def signer_email(self):
+ return self._service_account_email