aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/auth/external_account.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/auth/external_account.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/external_account.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/external_account.py628
1 files changed, 628 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/external_account.py b/.venv/lib/python3.12/site-packages/google/auth/external_account.py
new file mode 100644
index 00000000..161e6c50
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/external_account.py
@@ -0,0 +1,628 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""External Account Credentials.
+
+This module provides credentials that exchange workload identity pool external
+credentials for Google access tokens. This facilitates accessing Google Cloud
+Platform resources from on-prem and non-Google Cloud platforms (e.g. AWS,
+Microsoft Azure, OIDC identity providers), using native credentials retrieved
+from the current environment without the need to copy, save and manage
+long-lived service account credentials.
+
+Specifically, this is intended to use access tokens acquired using the GCP STS
+token exchange endpoint following the `OAuth 2.0 Token Exchange`_ spec.
+
+.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693
+"""
+
+import abc
+import copy
+from dataclasses import dataclass
+import datetime
+import functools
+import io
+import json
+import re
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import impersonated_credentials
+from google.auth import metrics
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+# External account JSON type identifier.
+_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account"
+# The token exchange grant_type used for exchanging credentials.
+_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
+# The token exchange requested_token_type. This is always an access_token.
+_STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+# Cloud resource manager URL used to retrieve project information.
+_CLOUD_RESOURCE_MANAGER = "https://cloudresourcemanager.googleapis.com/v1/projects/"
+# Default Google sts token url.
+_DEFAULT_TOKEN_URL = "https://sts.{universe_domain}/v1/token"
+
+
+@dataclass
+class SupplierContext:
+ """A context class that contains information about the requested third party credential that is passed
+ to AWS security credential and subject token suppliers.
+
+ Attributes:
+ subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec.
+ Expected values include::
+
+ “urn:ietf:params:oauth:token-type:jwt”
+ “urn:ietf:params:oauth:token-type:id-token”
+ “urn:ietf:params:oauth:token-type:saml2”
+ “urn:ietf:params:aws:token-type:aws4_request”
+
+ audience (str): The requested audience for the subject token.
+ """
+
+ subject_token_type: str
+ audience: str
+
+
+class Credentials(
+ credentials.Scoped,
+ credentials.CredentialsWithQuotaProject,
+ credentials.CredentialsWithTokenUri,
+ metaclass=abc.ABCMeta,
+):
+ """Base class for all external account credentials.
+
+ This is used to instantiate Credentials for exchanging external account
+ credentials for Google access token and authorizing requests to Google APIs.
+ The base class implements the common logic for exchanging external account
+ credentials for Google access tokens.
+ """
+
+ def __init__(
+ self,
+ audience,
+ subject_token_type,
+ token_url,
+ credential_source,
+ service_account_impersonation_url=None,
+ service_account_impersonation_options=None,
+ client_id=None,
+ client_secret=None,
+ token_info_url=None,
+ quota_project_id=None,
+ scopes=None,
+ default_scopes=None,
+ workforce_pool_user_project=None,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ trust_boundary=None,
+ ):
+ """Instantiates an external account credentials object.
+
+ Args:
+ audience (str): The STS audience field.
+ subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec.
+ Expected values include::
+
+ “urn:ietf:params:oauth:token-type:jwt”
+ “urn:ietf:params:oauth:token-type:id-token”
+ “urn:ietf:params:oauth:token-type:saml2”
+ “urn:ietf:params:aws:token-type:aws4_request”
+
+ token_url (str): The STS endpoint URL.
+ credential_source (Mapping): The credential source dictionary.
+ service_account_impersonation_url (Optional[str]): The optional service account
+ impersonation generateAccessToken URL.
+ client_id (Optional[str]): The optional client ID.
+ client_secret (Optional[str]): The optional client secret.
+ token_info_url (str): The optional STS endpoint URL for token introspection.
+ quota_project_id (Optional[str]): The optional quota project ID.
+ scopes (Optional[Sequence[str]]): Optional scopes to request during the
+ authorization grant.
+ default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+ Google client library. Use 'scopes' for user-defined scopes.
+ workforce_pool_user_project (Optona[str]): The optional workforce pool user
+ project number when the credential corresponds to a workforce pool and not
+ a workload identity pool. The underlying principal must still have
+ serviceusage.services.use IAM permission to use the project for
+ billing/quota.
+ universe_domain (str): The universe domain. The default universe
+ domain is googleapis.com.
+ trust_boundary (str): String representation of trust boundary meta.
+ Raises:
+ google.auth.exceptions.RefreshError: If the generateAccessToken
+ endpoint returned an error.
+ """
+ super(Credentials, self).__init__()
+ self._audience = audience
+ self._subject_token_type = subject_token_type
+ self._universe_domain = universe_domain
+ self._token_url = token_url
+ if self._token_url == _DEFAULT_TOKEN_URL:
+ self._token_url = self._token_url.replace(
+ "{universe_domain}", self._universe_domain
+ )
+ self._token_info_url = token_info_url
+ self._credential_source = credential_source
+ self._service_account_impersonation_url = service_account_impersonation_url
+ self._service_account_impersonation_options = (
+ service_account_impersonation_options or {}
+ )
+ self._client_id = client_id
+ self._client_secret = client_secret
+ self._quota_project_id = quota_project_id
+ self._scopes = scopes
+ self._default_scopes = default_scopes
+ self._workforce_pool_user_project = workforce_pool_user_project
+ self._trust_boundary = {
+ "locations": [],
+ "encoded_locations": "0x0",
+ } # expose a placeholder trust boundary value.
+
+ if self._client_id:
+ self._client_auth = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, self._client_id, self._client_secret
+ )
+ else:
+ self._client_auth = None
+ self._sts_client = sts.Client(self._token_url, self._client_auth)
+
+ self._metrics_options = self._create_default_metrics_options()
+
+ self._impersonated_credentials = None
+ self._project_id = None
+ self._supplier_context = SupplierContext(
+ self._subject_token_type, self._audience
+ )
+ self._cred_file_path = None
+
+ if not self.is_workforce_pool and self._workforce_pool_user_project:
+ # Workload identity pools do not support workforce pool user projects.
+ raise exceptions.InvalidValue(
+ "workforce_pool_user_project should not be set for non-workforce pool "
+ "credentials"
+ )
+
+ @property
+ def info(self):
+ """Generates the dictionary representation of the current credentials.
+
+ Returns:
+ Mapping: The dictionary representation of the credentials. This is the
+ reverse of "from_info" defined on the subclasses of this class. It is
+ useful for serializing the current credentials so it can deserialized
+ later.
+ """
+ config_info = self._constructor_args()
+ config_info.update(
+ type=_EXTERNAL_ACCOUNT_JSON_TYPE,
+ service_account_impersonation=config_info.pop(
+ "service_account_impersonation_options", None
+ ),
+ )
+ config_info.pop("scopes", None)
+ config_info.pop("default_scopes", None)
+ return {key: value for key, value in config_info.items() if value is not None}
+
+ def _constructor_args(self):
+ args = {
+ "audience": self._audience,
+ "subject_token_type": self._subject_token_type,
+ "token_url": self._token_url,
+ "token_info_url": self._token_info_url,
+ "service_account_impersonation_url": self._service_account_impersonation_url,
+ "service_account_impersonation_options": copy.deepcopy(
+ self._service_account_impersonation_options
+ )
+ or None,
+ "credential_source": copy.deepcopy(self._credential_source),
+ "quota_project_id": self._quota_project_id,
+ "client_id": self._client_id,
+ "client_secret": self._client_secret,
+ "workforce_pool_user_project": self._workforce_pool_user_project,
+ "scopes": self._scopes,
+ "default_scopes": self._default_scopes,
+ "universe_domain": self._universe_domain,
+ }
+ if not self.is_workforce_pool:
+ args.pop("workforce_pool_user_project")
+ return args
+
+ @property
+ def service_account_email(self):
+ """Returns the service account email if service account impersonation is used.
+
+ Returns:
+ Optional[str]: The service account email if impersonation is used. Otherwise
+ None is returned.
+ """
+ if self._service_account_impersonation_url:
+ # Parse email from URL. The formal looks as follows:
+ # https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken
+ url = self._service_account_impersonation_url
+ start_index = url.rfind("/")
+ end_index = url.find(":generateAccessToken")
+ if start_index != -1 and end_index != -1 and start_index < end_index:
+ start_index = start_index + 1
+ return url[start_index:end_index]
+ return None
+
+ @property
+ def is_user(self):
+ """Returns whether the credentials represent a user (True) or workload (False).
+ Workloads behave similarly to service accounts. Currently workloads will use
+ service account impersonation but will eventually not require impersonation.
+ As a result, this property is more reliable than the service account email
+ property in determining if the credentials represent a user or workload.
+
+ Returns:
+ bool: True if the credentials represent a user. False if they represent a
+ workload.
+ """
+ # If service account impersonation is used, the credentials will always represent a
+ # service account.
+ if self._service_account_impersonation_url:
+ return False
+ return self.is_workforce_pool
+
+ @property
+ def is_workforce_pool(self):
+ """Returns whether the credentials represent a workforce pool (True) or
+ workload (False) based on the credentials' audience.
+
+ This will also return True for impersonated workforce pool credentials.
+
+ Returns:
+ bool: True if the credentials represent a workforce pool. False if they
+ represent a workload.
+ """
+ # Workforce pools representing users have the following audience format:
+ # //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId
+ p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/")
+ return p.match(self._audience or "") is not None
+
+ @property
+ def requires_scopes(self):
+ """Checks if the credentials requires scopes.
+
+ Returns:
+ bool: True if there are no scopes set otherwise False.
+ """
+ return not self._scopes and not self._default_scopes
+
+ @property
+ def project_number(self):
+ """Optional[str]: The project number corresponding to the workload identity pool."""
+
+ # STS audience pattern:
+ # //iam.googleapis.com/projects/$PROJECT_NUMBER/locations/...
+ components = self._audience.split("/")
+ try:
+ project_index = components.index("projects")
+ if project_index + 1 < len(components):
+ return components[project_index + 1] or None
+ except ValueError:
+ return None
+
+ @property
+ def token_info_url(self):
+ """Optional[str]: The STS token introspection endpoint."""
+
+ return self._token_info_url
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ if self._cred_file_path:
+ cred_info_json = {
+ "credential_source": self._cred_file_path,
+ "credential_type": "external account credentials",
+ }
+ if self.service_account_email:
+ cred_info_json["principal"] = self.service_account_email
+ return cred_info_json
+ return None
+
+ @_helpers.copy_docstring(credentials.Scoped)
+ def with_scopes(self, scopes, default_scopes=None):
+ kwargs = self._constructor_args()
+ kwargs.update(scopes=scopes, default_scopes=default_scopes)
+ scoped = self.__class__(**kwargs)
+ scoped._cred_file_path = self._cred_file_path
+ scoped._metrics_options = self._metrics_options
+ return scoped
+
+ @abc.abstractmethod
+ def retrieve_subject_token(self, request):
+ """Retrieves the subject token using the credential_source object.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ Returns:
+ str: The retrieved subject token.
+ """
+ # pylint: disable=missing-raises-doc
+ # (pylint doesn't recognize that this is abstract)
+ raise NotImplementedError("retrieve_subject_token must be implemented")
+
+ def get_project_id(self, request):
+ """Retrieves the project ID corresponding to the workload identity or workforce pool.
+ For workforce pool credentials, it returns the project ID corresponding to
+ the workforce_pool_user_project.
+
+ When not determinable, None is returned.
+
+ This is introduced to support the current pattern of using the Auth library:
+
+ credentials, project_id = google.auth.default()
+
+ The resource may not have permission (resourcemanager.projects.get) to
+ call this API or the required scopes may not be selected:
+ https://cloud.google.com/resource-manager/reference/rest/v1/projects/get#authorization-scopes
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ Returns:
+ Optional[str]: The project ID corresponding to the workload identity pool
+ or workforce pool if determinable.
+ """
+ if self._project_id:
+ # If already retrieved, return the cached project ID value.
+ return self._project_id
+ scopes = self._scopes if self._scopes is not None else self._default_scopes
+ # Scopes are required in order to retrieve a valid access token.
+ project_number = self.project_number or self._workforce_pool_user_project
+ if project_number and scopes:
+ headers = {}
+ url = _CLOUD_RESOURCE_MANAGER + project_number
+ self.before_request(request, "GET", url, headers)
+ response = request(url=url, method="GET", headers=headers)
+
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+ response_data = json.loads(response_body)
+
+ if response.status == 200:
+ # Cache result as this field is immutable.
+ self._project_id = response_data.get("projectId")
+ return self._project_id
+
+ return None
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ scopes = self._scopes if self._scopes is not None else self._default_scopes
+
+ # Inject client certificate into request.
+ if self._mtls_required():
+ request = functools.partial(
+ request, cert=self._get_mtls_cert_and_key_paths()
+ )
+
+ if self._should_initialize_impersonated_credentials():
+ self._impersonated_credentials = self._initialize_impersonated_credentials()
+
+ if self._impersonated_credentials:
+ self._impersonated_credentials.refresh(request)
+ self.token = self._impersonated_credentials.token
+ self.expiry = self._impersonated_credentials.expiry
+ else:
+ now = _helpers.utcnow()
+ additional_options = None
+ # Do not pass workforce_pool_user_project when client authentication
+ # is used. The client ID is sufficient for determining the user project.
+ if self._workforce_pool_user_project and not self._client_id:
+ additional_options = {"userProject": self._workforce_pool_user_project}
+ additional_headers = {
+ metrics.API_CLIENT_HEADER: metrics.byoid_metrics_header(
+ self._metrics_options
+ )
+ }
+ response_data = self._sts_client.exchange_token(
+ request=request,
+ grant_type=_STS_GRANT_TYPE,
+ subject_token=self.retrieve_subject_token(request),
+ subject_token_type=self._subject_token_type,
+ audience=self._audience,
+ scopes=scopes,
+ requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
+ additional_options=additional_options,
+ additional_headers=additional_headers,
+ )
+ self.token = response_data.get("access_token")
+ expires_in = response_data.get("expires_in")
+ # Some services do not respect the OAUTH2.0 RFC and send expires_in as a
+ # JSON String.
+ if isinstance(expires_in, str):
+ expires_in = int(expires_in)
+
+ lifetime = datetime.timedelta(seconds=expires_in)
+
+ self.expiry = now + lifetime
+
+ def _make_copy(self):
+ kwargs = self._constructor_args()
+ new_cred = self.__class__(**kwargs)
+ new_cred._cred_file_path = self._cred_file_path
+ new_cred._metrics_options = self._metrics_options
+ return new_cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ # Return copy of instance with the provided quota project ID.
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+ cred = self._make_copy()
+ cred._token_url = token_uri
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+ def with_universe_domain(self, universe_domain):
+ cred = self._make_copy()
+ cred._universe_domain = universe_domain
+ return cred
+
+ def _should_initialize_impersonated_credentials(self):
+ return (
+ self._service_account_impersonation_url is not None
+ and self._impersonated_credentials is None
+ )
+
+ def _initialize_impersonated_credentials(self):
+ """Generates an impersonated credentials.
+
+ For more details, see `projects.serviceAccounts.generateAccessToken`_.
+
+ .. _projects.serviceAccounts.generateAccessToken: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken
+
+ Returns:
+ impersonated_credentials.Credential: The impersonated credentials
+ object.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the generateAccessToken
+ endpoint returned an error.
+ """
+ # Return copy of instance with no service account impersonation.
+ kwargs = self._constructor_args()
+ kwargs.update(
+ service_account_impersonation_url=None,
+ service_account_impersonation_options={},
+ )
+ source_credentials = self.__class__(**kwargs)
+ source_credentials._metrics_options = self._metrics_options
+
+ # Determine target_principal.
+ target_principal = self.service_account_email
+ if not target_principal:
+ raise exceptions.RefreshError(
+ "Unable to determine target principal from service account impersonation URL."
+ )
+
+ scopes = self._scopes if self._scopes is not None else self._default_scopes
+ # Initialize and return impersonated credentials.
+ return impersonated_credentials.Credentials(
+ source_credentials=source_credentials,
+ target_principal=target_principal,
+ target_scopes=scopes,
+ quota_project_id=self._quota_project_id,
+ iam_endpoint_override=self._service_account_impersonation_url,
+ lifetime=self._service_account_impersonation_options.get(
+ "token_lifetime_seconds"
+ ),
+ )
+
+ def _create_default_metrics_options(self):
+ metrics_options = {}
+ if self._service_account_impersonation_url:
+ metrics_options["sa-impersonation"] = "true"
+ else:
+ metrics_options["sa-impersonation"] = "false"
+ if self._service_account_impersonation_options.get("token_lifetime_seconds"):
+ metrics_options["config-lifetime"] = "true"
+ else:
+ metrics_options["config-lifetime"] = "false"
+
+ return metrics_options
+
+ def _mtls_required(self):
+ """Returns a boolean representing whether the current credential is configured
+ for mTLS and should add a certificate to the outgoing calls to the sts and service
+ account impersonation endpoint.
+
+ Returns:
+ bool: True if the credential is configured for mTLS, False if it is not.
+ """
+ return False
+
+ def _get_mtls_cert_and_key_paths(self):
+ """Gets the file locations for a certificate and private key file
+ to be used for configuring mTLS for the sts and service account
+ impersonation calls. Currently only expected to return a value when using
+ X509 workload identity federation.
+
+ Returns:
+ Tuple[str, str]: The cert and key file locations as strings in a tuple.
+
+ Raises:
+ NotImplementedError: When the current credential is not configured for
+ mTLS.
+ """
+ raise NotImplementedError(
+ "_get_mtls_cert_and_key_location must be implemented."
+ )
+
+ @classmethod
+ def from_info(cls, info, **kwargs):
+ """Creates a Credentials instance from parsed external account info.
+
+ Args:
+ info (Mapping[str, str]): The external account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.identity_pool.Credentials: The constructed
+ credentials.
+
+ Raises:
+ InvalidValue: For invalid parameters.
+ """
+ return cls(
+ audience=info.get("audience"),
+ subject_token_type=info.get("subject_token_type"),
+ token_url=info.get("token_url"),
+ token_info_url=info.get("token_info_url"),
+ service_account_impersonation_url=info.get(
+ "service_account_impersonation_url"
+ ),
+ service_account_impersonation_options=info.get(
+ "service_account_impersonation"
+ )
+ or {},
+ client_id=info.get("client_id"),
+ client_secret=info.get("client_secret"),
+ credential_source=info.get("credential_source"),
+ quota_project_id=info.get("quota_project_id"),
+ workforce_pool_user_project=info.get("workforce_pool_user_project"),
+ universe_domain=info.get(
+ "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
+ ),
+ **kwargs
+ )
+
+ @classmethod
+ def from_file(cls, filename, **kwargs):
+ """Creates a Credentials instance from an external account json file.
+
+ Args:
+ filename (str): The path to the external account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.identity_pool.Credentials: The constructed
+ credentials.
+ """
+ with io.open(filename, "r", encoding="utf-8") as json_file:
+ data = json.load(json_file)
+ return cls.from_info(data, **kwargs)