aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py579
1 files changed, 579 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py b/.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py
new file mode 100644
index 00000000..ed7e3f00
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/impersonated_credentials.py
@@ -0,0 +1,579 @@
+# Copyright 2018 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Cloud Impersonated credentials.
+
+This module provides authentication for applications where local credentials
+impersonates a remote service account using `IAM Credentials API`_.
+
+This class can be used to impersonate a service account as long as the original
+Credential object has the "Service Account Token Creator" role on the target
+service account.
+
+ .. _IAM Credentials API:
+ https://cloud.google.com/iam/credentials/reference/rest/
+"""
+
+import base64
+import copy
+from datetime import datetime
+import http.client as http_client
+import json
+
+from google.auth import _exponential_backoff
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import iam
+from google.auth import jwt
+from google.auth import metrics
+from google.oauth2 import _client
+
+
+_REFRESH_ERROR = "Unable to acquire impersonated credentials"
+
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+
+_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
+
+
+def _make_iam_token_request(
+ request,
+ principal,
+ headers,
+ body,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ iam_endpoint_override=None,
+):
+ """Makes a request to the Google Cloud IAM service for an access token.
+ Args:
+ request (Request): The Request object to use.
+ principal (str): The principal to request an access token for.
+ headers (Mapping[str, str]): Map of headers to transmit.
+ body (Mapping[str, str]): JSON Payload body for the iamcredentials
+ API call.
+ iam_endpoint_override (Optiona[str]): The full IAM endpoint override
+ with the target_principal embedded. This is useful when supporting
+ impersonation with regional endpoints.
+
+ Raises:
+ google.auth.exceptions.TransportError: Raised if there is an underlying
+ HTTP connection error
+ google.auth.exceptions.RefreshError: Raised if the impersonated
+ credentials are not available. Common reasons are
+ `iamcredentials.googleapis.com` is not enabled or the
+ `Service Account Token Creator` is not assigned
+ """
+ iam_endpoint = iam_endpoint_override or iam._IAM_ENDPOINT.replace(
+ credentials.DEFAULT_UNIVERSE_DOMAIN, universe_domain
+ ).format(principal)
+
+ body = json.dumps(body).encode("utf-8")
+
+ response = request(url=iam_endpoint, method="POST", headers=headers, body=body)
+
+ # support both string and bytes type response.data
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ if response.status != http_client.OK:
+ raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
+
+ try:
+ token_response = json.loads(response_body)
+ token = token_response["accessToken"]
+ expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ")
+
+ return token, expiry
+
+ except (KeyError, ValueError) as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "{}: No access token or invalid expiration in response.".format(
+ _REFRESH_ERROR
+ ),
+ response_body,
+ )
+ raise new_exc from caught_exc
+
+
+class Credentials(
+ credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing
+):
+ """This module defines impersonated credentials which are essentially
+ impersonated identities.
+
+ Impersonated Credentials allows credentials issued to a user or
+ service account to impersonate another. The target service account must
+ grant the originating credential principal the
+ `Service Account Token Creator`_ IAM role:
+
+ For more information about Token Creator IAM role and
+ IAMCredentials API, see
+ `Creating Short-Lived Service Account Credentials`_.
+
+ .. _Service Account Token Creator:
+ https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role
+
+ .. _Creating Short-Lived Service Account Credentials:
+ https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials
+
+ Usage:
+
+ First grant source_credentials the `Service Account Token Creator`
+ role on the target account to impersonate. In this example, the
+ service account represented by svc_account.json has the
+ token creator role on
+ `impersonated-account@_project_.iam.gserviceaccount.com`.
+
+ Enable the IAMCredentials API on the source project:
+ `gcloud services enable iamcredentials.googleapis.com`.
+
+ Initialize a source credential which does not have access to
+ list bucket::
+
+ from google.oauth2 import service_account
+
+ target_scopes = [
+ 'https://www.googleapis.com/auth/devstorage.read_only']
+
+ source_credentials = (
+ service_account.Credentials.from_service_account_file(
+ '/path/to/svc_account.json',
+ scopes=target_scopes))
+
+ Now use the source credentials to acquire credentials to impersonate
+ another service account::
+
+ from google.auth import impersonated_credentials
+
+ target_credentials = impersonated_credentials.Credentials(
+ source_credentials=source_credentials,
+ target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
+ target_scopes = target_scopes,
+ lifetime=500)
+
+ Resource access is granted::
+
+ client = storage.Client(credentials=target_credentials)
+ buckets = client.list_buckets(project='your_project')
+ for bucket in buckets:
+ print(bucket.name)
+ """
+
+ def __init__(
+ self,
+ source_credentials,
+ target_principal,
+ target_scopes,
+ delegates=None,
+ subject=None,
+ lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+ quota_project_id=None,
+ iam_endpoint_override=None,
+ ):
+ """
+ Args:
+ source_credentials (google.auth.Credentials): The source credential
+ used as to acquire the impersonated credentials.
+ target_principal (str): The service account to impersonate.
+ target_scopes (Sequence[str]): Scopes to request during the
+ authorization grant.
+ delegates (Sequence[str]): The chained list of delegates required
+ to grant the final access_token. If set, the sequence of
+ identities must have "Service Account Token Creator" capability
+ granted to the prceeding identity. For example, if set to
+ [serviceAccountB, serviceAccountC], the source_credential
+ must have the Token Creator role on serviceAccountB.
+ serviceAccountB must have the Token Creator on
+ serviceAccountC.
+ Finally, C must have Token Creator on target_principal.
+ If left unset, source_credential must have that role on
+ target_principal.
+ lifetime (int): Number of seconds the delegated credential should
+ be valid for (upto 3600).
+ quota_project_id (Optional[str]): The project ID used for quota and billing.
+ This project may be different from the project used to
+ create the credentials.
+ iam_endpoint_override (Optional[str]): The full IAM endpoint override
+ with the target_principal embedded. This is useful when supporting
+ impersonation with regional endpoints.
+ subject (Optional[str]): sub field of a JWT. This field should only be set
+ if you wish to impersonate as a user. This feature is useful when
+ using domain wide delegation.
+ """
+
+ super(Credentials, self).__init__()
+
+ self._source_credentials = copy.copy(source_credentials)
+ # Service account source credentials must have the _IAM_SCOPE
+ # added to refresh correctly. User credentials cannot have
+ # their original scopes modified.
+ if isinstance(self._source_credentials, credentials.Scoped):
+ self._source_credentials = self._source_credentials.with_scopes(
+ iam._IAM_SCOPE
+ )
+ # If the source credential is service account and self signed jwt
+ # is needed, we need to create a jwt credential inside it
+ if (
+ hasattr(self._source_credentials, "_create_self_signed_jwt")
+ and self._source_credentials._always_use_jwt_access
+ ):
+ self._source_credentials._create_self_signed_jwt(None)
+
+ self._universe_domain = source_credentials.universe_domain
+ self._target_principal = target_principal
+ self._target_scopes = target_scopes
+ self._delegates = delegates
+ self._subject = subject
+ self._lifetime = lifetime or _DEFAULT_TOKEN_LIFETIME_SECS
+ self.token = None
+ self.expiry = _helpers.utcnow()
+ self._quota_project_id = quota_project_id
+ self._iam_endpoint_override = iam_endpoint_override
+ self._cred_file_path = None
+
+ def _metric_header_for_usage(self):
+ return metrics.CRED_TYPE_SA_IMPERSONATE
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ self._update_token(request)
+
+ def _update_token(self, request):
+ """Updates credentials with a new access_token representing
+ the impersonated account.
+
+ Args:
+ request (google.auth.transport.requests.Request): Request object
+ to use for refreshing credentials.
+ """
+
+ # Refresh our source credentials if it is not valid.
+ if (
+ self._source_credentials.token_state == credentials.TokenState.STALE
+ or self._source_credentials.token_state == credentials.TokenState.INVALID
+ ):
+ self._source_credentials.refresh(request)
+
+ body = {
+ "delegates": self._delegates,
+ "scope": self._target_scopes,
+ "lifetime": str(self._lifetime) + "s",
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ metrics.API_CLIENT_HEADER: metrics.token_request_access_token_impersonate(),
+ }
+
+ # Apply the source credentials authentication info.
+ self._source_credentials.apply(headers)
+
+ # If a subject is specified a domain-wide delegation auth-flow is initiated
+ # to impersonate as the provided subject (user).
+ if self._subject:
+ if self.universe_domain != credentials.DEFAULT_UNIVERSE_DOMAIN:
+ raise exceptions.GoogleAuthError(
+ "Domain-wide delegation is not supported in universes other "
+ + "than googleapis.com"
+ )
+
+ now = _helpers.utcnow()
+ payload = {
+ "iss": self._target_principal,
+ "scope": _helpers.scopes_to_string(self._target_scopes or ()),
+ "sub": self._subject,
+ "aud": _GOOGLE_OAUTH2_TOKEN_ENDPOINT,
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(now) + _DEFAULT_TOKEN_LIFETIME_SECS,
+ }
+
+ assertion = _sign_jwt_request(
+ request=request,
+ principal=self._target_principal,
+ headers=headers,
+ payload=payload,
+ delegates=self._delegates,
+ )
+
+ self.token, self.expiry, _ = _client.jwt_grant(
+ request, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, assertion
+ )
+
+ return
+
+ self.token, self.expiry = _make_iam_token_request(
+ request=request,
+ principal=self._target_principal,
+ headers=headers,
+ body=body,
+ universe_domain=self.universe_domain,
+ iam_endpoint_override=self._iam_endpoint_override,
+ )
+
+ def sign_bytes(self, message):
+ from google.auth.transport.requests import AuthorizedSession
+
+ iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.replace(
+ credentials.DEFAULT_UNIVERSE_DOMAIN, self.universe_domain
+ ).format(self._target_principal)
+
+ body = {
+ "payload": base64.b64encode(message).decode("utf-8"),
+ "delegates": self._delegates,
+ }
+
+ headers = {"Content-Type": "application/json"}
+
+ authed_session = AuthorizedSession(self._source_credentials)
+
+ try:
+ retries = _exponential_backoff.ExponentialBackoff()
+ for _ in retries:
+ response = authed_session.post(
+ url=iam_sign_endpoint, headers=headers, json=body
+ )
+ if response.status_code in iam.IAM_RETRY_CODES:
+ continue
+ if response.status_code != http_client.OK:
+ raise exceptions.TransportError(
+ "Error calling sign_bytes: {}".format(response.json())
+ )
+
+ return base64.b64decode(response.json()["signedBlob"])
+ finally:
+ authed_session.close()
+ raise exceptions.TransportError("exhausted signBlob endpoint retries")
+
+ @property
+ def signer_email(self):
+ return self._target_principal
+
+ @property
+ def service_account_email(self):
+ return self._target_principal
+
+ @property
+ def signer(self):
+ return self
+
+ @property
+ def requires_scopes(self):
+ return not self._target_scopes
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ if self._cred_file_path:
+ return {
+ "credential_source": self._cred_file_path,
+ "credential_type": "impersonated credentials",
+ "principal": self._target_principal,
+ }
+ return None
+
+ def _make_copy(self):
+ cred = self.__class__(
+ self._source_credentials,
+ target_principal=self._target_principal,
+ target_scopes=self._target_scopes,
+ delegates=self._delegates,
+ lifetime=self._lifetime,
+ quota_project_id=self._quota_project_id,
+ iam_endpoint_override=self._iam_endpoint_override,
+ )
+ cred._cred_file_path = self._cred_file_path
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.Scoped)
+ def with_scopes(self, scopes, default_scopes=None):
+ cred = self._make_copy()
+ cred._target_scopes = scopes or default_scopes
+ return cred
+
+
+class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
+ """Open ID Connect ID Token-based service account credentials.
+
+ """
+
+ def __init__(
+ self,
+ target_credentials,
+ target_audience=None,
+ include_email=False,
+ quota_project_id=None,
+ ):
+ """
+ Args:
+ target_credentials (google.auth.Credentials): The target
+ credential used as to acquire the id tokens for.
+ target_audience (string): Audience to issue the token for.
+ include_email (bool): Include email in IdToken
+ quota_project_id (Optional[str]): The project ID used for
+ quota and billing.
+ """
+ super(IDTokenCredentials, self).__init__()
+
+ if not isinstance(target_credentials, Credentials):
+ raise exceptions.GoogleAuthError(
+ "Provided Credential must be " "impersonated_credentials"
+ )
+ self._target_credentials = target_credentials
+ self._target_audience = target_audience
+ self._include_email = include_email
+ self._quota_project_id = quota_project_id
+
+ def from_credentials(self, target_credentials, target_audience=None):
+ return self.__class__(
+ target_credentials=target_credentials,
+ target_audience=target_audience,
+ include_email=self._include_email,
+ quota_project_id=self._quota_project_id,
+ )
+
+ def with_target_audience(self, target_audience):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=target_audience,
+ include_email=self._include_email,
+ quota_project_id=self._quota_project_id,
+ )
+
+ def with_include_email(self, include_email):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=self._target_audience,
+ include_email=include_email,
+ quota_project_id=self._quota_project_id,
+ )
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=self._target_audience,
+ include_email=self._include_email,
+ quota_project_id=quota_project_id,
+ )
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ from google.auth.transport.requests import AuthorizedSession
+
+ iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.replace(
+ credentials.DEFAULT_UNIVERSE_DOMAIN,
+ self._target_credentials.universe_domain,
+ ).format(self._target_credentials.signer_email)
+
+ body = {
+ "audience": self._target_audience,
+ "delegates": self._target_credentials._delegates,
+ "includeEmail": self._include_email,
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ metrics.API_CLIENT_HEADER: metrics.token_request_id_token_impersonate(),
+ }
+
+ authed_session = AuthorizedSession(
+ self._target_credentials._source_credentials, auth_request=request
+ )
+
+ try:
+ response = authed_session.post(
+ url=iam_sign_endpoint,
+ headers=headers,
+ data=json.dumps(body).encode("utf-8"),
+ )
+ finally:
+ authed_session.close()
+
+ if response.status_code != http_client.OK:
+ raise exceptions.RefreshError(
+ "Error getting ID token: {}".format(response.json())
+ )
+
+ id_token = response.json()["token"]
+ self.token = id_token
+ self.expiry = datetime.utcfromtimestamp(
+ jwt.decode(id_token, verify=False)["exp"]
+ )
+
+
+def _sign_jwt_request(request, principal, headers, payload, delegates=[]):
+ """Makes a request to the Google Cloud IAM service to sign a JWT using a
+ service account's system-managed private key.
+ Args:
+ request (Request): The Request object to use.
+ principal (str): The principal to request an access token for.
+ headers (Mapping[str, str]): Map of headers to transmit.
+ payload (Mapping[str, str]): The JWT payload to sign. Must be a
+ serialized JSON object that contains a JWT Claims Set.
+ delegates (Sequence[str]): The chained list of delegates required
+ to grant the final access_token. If set, the sequence of
+ identities must have "Service Account Token Creator" capability
+ granted to the prceeding identity. For example, if set to
+ [serviceAccountB, serviceAccountC], the source_credential
+ must have the Token Creator role on serviceAccountB.
+ serviceAccountB must have the Token Creator on
+ serviceAccountC.
+ Finally, C must have Token Creator on target_principal.
+ If left unset, source_credential must have that role on
+ target_principal.
+
+ Raises:
+ google.auth.exceptions.TransportError: Raised if there is an underlying
+ HTTP connection error
+ google.auth.exceptions.RefreshError: Raised if the impersonated
+ credentials are not available. Common reasons are
+ `iamcredentials.googleapis.com` is not enabled or the
+ `Service Account Token Creator` is not assigned
+ """
+ iam_endpoint = iam._IAM_SIGNJWT_ENDPOINT.format(principal)
+
+ body = {"delegates": delegates, "payload": json.dumps(payload)}
+ body = json.dumps(body).encode("utf-8")
+
+ response = request(url=iam_endpoint, method="POST", headers=headers, body=body)
+
+ # support both string and bytes type response.data
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ if response.status != http_client.OK:
+ raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
+
+ try:
+ jwt_response = json.loads(response_body)
+ signed_jwt = jwt_response["signedJwt"]
+ return signed_jwt
+
+ except (KeyError, ValueError) as caught_exc:
+ new_exc = exceptions.RefreshError(
+ "{}: No signed JWT in response.".format(_REFRESH_ERROR), response_body
+ )
+ raise new_exc from caught_exc