aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py380
1 files changed, 380 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py b/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
new file mode 100644
index 00000000..4d0c3c68
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
@@ -0,0 +1,380 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""External Account Authorized User Credentials.
+This module provides credentials based on OAuth 2.0 access and refresh tokens.
+These credentials usually access resources on behalf of a user (resource
+owner).
+
+Specifically, these are sourced using external identities via Workforce Identity Federation.
+
+Obtaining the initial access and refresh token can be done through the Google Cloud CLI.
+
+Example credential:
+{
+ "type": "external_account_authorized_user",
+ "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
+ "refresh_token": "refreshToken",
+ "token_url": "https://sts.googleapis.com/v1/oauth/token",
+ "token_info_url": "https://sts.googleapis.com/v1/instrospect",
+ "client_id": "clientId",
+ "client_secret": "clientSecret"
+}
+"""
+
+import datetime
+import io
+import json
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"
+
+
+class Credentials(
+ credentials.CredentialsWithQuotaProject,
+ credentials.ReadOnlyScoped,
+ credentials.CredentialsWithTokenUri,
+):
+ """Credentials for External Account Authorized Users.
+
+ This is used to instantiate Credentials for exchanging refresh tokens from
+ authorized users for Google access token and authorizing requests to Google
+ APIs.
+
+ The credentials are considered immutable. If you want to modify the
+ quota project, use `with_quota_project` and if you want to modify the token
+ uri, use `with_token_uri`.
+ """
+
+ def __init__(
+ self,
+ token=None,
+ expiry=None,
+ refresh_token=None,
+ audience=None,
+ client_id=None,
+ client_secret=None,
+ token_url=None,
+ token_info_url=None,
+ revoke_url=None,
+ scopes=None,
+ quota_project_id=None,
+ universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+ ):
+ """Instantiates a external account authorized user credentials object.
+
+ Args:
+ token (str): The OAuth 2.0 access token. Can be None if refresh information
+ is provided.
+ expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
+ token.
+ refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
+ credentials can be refreshed.
+ audience (str): The optional STS audience which contains the resource name for the workforce
+ pool and the provider identifier in that pool.
+ client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
+ None if the token can not be refreshed.
+ client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
+ left as None if the token can not be refreshed.
+ token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
+ refresh, can be left as None if the token can not be refreshed.
+ token_info_url (str): The optional STS endpoint URL for token introspection.
+ revoke_url (str): The optional STS endpoint URL for revoking tokens.
+ quota_project_id (str): The optional project ID used for quota and billing.
+ This project may be different from the project used to
+ create the credentials.
+ universe_domain (Optional[str]): The universe domain. The default value
+ is googleapis.com.
+
+ Returns:
+ google.auth.external_account_authorized_user.Credentials: The
+ constructed credentials.
+ """
+ super(Credentials, self).__init__()
+
+ self.token = token
+ self.expiry = expiry
+ self._audience = audience
+ self._refresh_token = refresh_token
+ self._token_url = token_url
+ self._token_info_url = token_info_url
+ self._client_id = client_id
+ self._client_secret = client_secret
+ self._revoke_url = revoke_url
+ self._quota_project_id = quota_project_id
+ self._scopes = scopes
+ self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
+ self._cred_file_path = None
+
+ if not self.valid and not self.can_refresh:
+ raise exceptions.InvalidOperation(
+ "Token should be created with fields to make it valid (`token` and "
+ "`expiry`), or fields to allow it to refresh (`refresh_token`, "
+ "`token_url`, `client_id`, `client_secret`)."
+ )
+
+ self._client_auth = None
+ if self._client_id:
+ self._client_auth = utils.ClientAuthentication(
+ utils.ClientAuthType.basic, self._client_id, self._client_secret
+ )
+ self._sts_client = sts.Client(self._token_url, self._client_auth)
+
+ @property
+ def info(self):
+ """Generates the serializable dictionary representation of the current
+ credentials.
+
+ Returns:
+ Mapping: The dictionary representation of the credentials. This is the
+ reverse of the "from_info" method defined in this class. It is
+ useful for serializing the current credentials so it can deserialized
+ later.
+ """
+ config_info = self.constructor_args()
+ config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
+ if config_info["expiry"]:
+ config_info["expiry"] = config_info["expiry"].isoformat() + "Z"
+
+ return {key: value for key, value in config_info.items() if value is not None}
+
+ def constructor_args(self):
+ return {
+ "audience": self._audience,
+ "refresh_token": self._refresh_token,
+ "token_url": self._token_url,
+ "token_info_url": self._token_info_url,
+ "client_id": self._client_id,
+ "client_secret": self._client_secret,
+ "token": self.token,
+ "expiry": self.expiry,
+ "revoke_url": self._revoke_url,
+ "scopes": self._scopes,
+ "quota_project_id": self._quota_project_id,
+ "universe_domain": self._universe_domain,
+ }
+
+ @property
+ def scopes(self):
+ """Optional[str]: The OAuth 2.0 permission scopes."""
+ return self._scopes
+
+ @property
+ def requires_scopes(self):
+ """ False: OAuth 2.0 credentials have their scopes set when
+ the initial token is requested and can not be changed."""
+ return False
+
+ @property
+ def client_id(self):
+ """Optional[str]: The OAuth 2.0 client ID."""
+ return self._client_id
+
+ @property
+ def client_secret(self):
+ """Optional[str]: The OAuth 2.0 client secret."""
+ return self._client_secret
+
+ @property
+ def audience(self):
+ """Optional[str]: The STS audience which contains the resource name for the
+ workforce pool and the provider identifier in that pool."""
+ return self._audience
+
+ @property
+ def refresh_token(self):
+ """Optional[str]: The OAuth 2.0 refresh token."""
+ return self._refresh_token
+
+ @property
+ def token_url(self):
+ """Optional[str]: The STS token exchange endpoint for refresh."""
+ return self._token_url
+
+ @property
+ def token_info_url(self):
+ """Optional[str]: The STS endpoint for token info."""
+ return self._token_info_url
+
+ @property
+ def revoke_url(self):
+ """Optional[str]: The STS endpoint for token revocation."""
+ return self._revoke_url
+
+ @property
+ def is_user(self):
+ """ True: This credential always represents a user."""
+ return True
+
+ @property
+ def can_refresh(self):
+ return all(
+ (self._refresh_token, self._token_url, self._client_id, self._client_secret)
+ )
+
+ def get_project_id(self, request=None):
+ """Retrieves the project ID corresponding to the workload identity or workforce pool.
+ For workforce pool credentials, it returns the project ID corresponding to
+ the workforce_pool_user_project.
+
+ When not determinable, None is returned.
+
+ Args:
+ request (google.auth.transport.requests.Request): Request object.
+ Unused here, but passed from _default.default().
+
+ Return:
+ str: project ID is not determinable for this credential type so it returns None
+ """
+
+ return None
+
+ def to_json(self, strip=None):
+ """Utility function that creates a JSON representation of this
+ credential.
+ Args:
+ strip (Sequence[str]): Optional list of members to exclude from the
+ generated JSON.
+ Returns:
+ str: A JSON representation of this instance. When converted into
+ a dictionary, it can be passed to from_info()
+ to create a new instance.
+ """
+ strip = strip if strip else []
+ return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})
+
+ def refresh(self, request):
+ """Refreshes the access token.
+
+ Args:
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the credentials could
+ not be refreshed.
+ """
+ if not self.can_refresh:
+ raise exceptions.RefreshError(
+ "The credentials do not contain the necessary fields need to "
+ "refresh the access token. You must specify refresh_token, "
+ "token_url, client_id, and client_secret."
+ )
+
+ now = _helpers.utcnow()
+ response_data = self._make_sts_request(request)
+
+ self.token = response_data.get("access_token")
+
+ lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
+ self.expiry = now + lifetime
+
+ if "refresh_token" in response_data:
+ self._refresh_token = response_data["refresh_token"]
+
+ def _make_sts_request(self, request):
+ return self._sts_client.refresh_token(request, self._refresh_token)
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def get_cred_info(self):
+ if self._cred_file_path:
+ return {
+ "credential_source": self._cred_file_path,
+ "credential_type": "external account authorized user credentials",
+ }
+ return None
+
+ def _make_copy(self):
+ kwargs = self.constructor_args()
+ cred = self.__class__(**kwargs)
+ cred._cred_file_path = self._cred_file_path
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+ def with_quota_project(self, quota_project_id):
+ cred = self._make_copy()
+ cred._quota_project_id = quota_project_id
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+ def with_token_uri(self, token_uri):
+ cred = self._make_copy()
+ cred._token_url = token_uri
+ return cred
+
+ @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+ def with_universe_domain(self, universe_domain):
+ cred = self._make_copy()
+ cred._universe_domain = universe_domain
+ return cred
+
+ @classmethod
+ def from_info(cls, info, **kwargs):
+ """Creates a Credentials instance from parsed external account info.
+
+ Args:
+ info (Mapping[str, str]): The external account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.external_account_authorized_user.Credentials: The
+ constructed credentials.
+
+ Raises:
+ ValueError: For invalid parameters.
+ """
+ expiry = info.get("expiry")
+ if expiry:
+ expiry = datetime.datetime.strptime(
+ expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
+ )
+ return cls(
+ audience=info.get("audience"),
+ refresh_token=info.get("refresh_token"),
+ token_url=info.get("token_url"),
+ token_info_url=info.get("token_info_url"),
+ client_id=info.get("client_id"),
+ client_secret=info.get("client_secret"),
+ token=info.get("token"),
+ expiry=expiry,
+ revoke_url=info.get("revoke_url"),
+ quota_project_id=info.get("quota_project_id"),
+ scopes=info.get("scopes"),
+ universe_domain=info.get(
+ "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
+ ),
+ **kwargs
+ )
+
+ @classmethod
+ def from_file(cls, filename, **kwargs):
+ """Creates a Credentials instance from an external account json file.
+
+ Args:
+ filename (str): The path to the external account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.external_account_authorized_user.Credentials: The
+ constructed credentials.
+ """
+ with io.open(filename, "r", encoding="utf-8") as json_file:
+ data = json.load(json_file)
+ return cls.from_info(data, **kwargs)