about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py380
1 files changed, 380 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py b/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
new file mode 100644
index 00000000..4d0c3c68
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/external_account_authorized_user.py
@@ -0,0 +1,380 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""External Account Authorized User Credentials.
+This module provides credentials based on OAuth 2.0 access and refresh tokens.
+These credentials usually access resources on behalf of a user (resource
+owner).
+
+Specifically, these are sourced using external identities via Workforce Identity Federation.
+
+Obtaining the initial access and refresh token can be done through the Google Cloud CLI.
+
+Example credential:
+{
+  "type": "external_account_authorized_user",
+  "audience": "//iam.googleapis.com/locations/global/workforcePools/$WORKFORCE_POOL_ID/providers/$PROVIDER_ID",
+  "refresh_token": "refreshToken",
+  "token_url": "https://sts.googleapis.com/v1/oauth/token",
+  "token_info_url": "https://sts.googleapis.com/v1/instrospect",
+  "client_id": "clientId",
+  "client_secret": "clientSecret"
+}
+"""
+
+import datetime
+import io
+import json
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE = "external_account_authorized_user"
+
+
+class Credentials(
+    credentials.CredentialsWithQuotaProject,
+    credentials.ReadOnlyScoped,
+    credentials.CredentialsWithTokenUri,
+):
+    """Credentials for External Account Authorized Users.
+
+    This is used to instantiate Credentials for exchanging refresh tokens from
+    authorized users for Google access token and authorizing requests to Google
+    APIs.
+
+    The credentials are considered immutable. If you want to modify the
+    quota project, use `with_quota_project` and if you want to modify the token
+    uri, use `with_token_uri`.
+    """
+
+    def __init__(
+        self,
+        token=None,
+        expiry=None,
+        refresh_token=None,
+        audience=None,
+        client_id=None,
+        client_secret=None,
+        token_url=None,
+        token_info_url=None,
+        revoke_url=None,
+        scopes=None,
+        quota_project_id=None,
+        universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN,
+    ):
+        """Instantiates a external account authorized user credentials object.
+
+        Args:
+        token (str): The OAuth 2.0 access token. Can be None if refresh information
+            is provided.
+        expiry (datetime.datetime): The optional expiration datetime of the OAuth 2.0 access
+            token.
+        refresh_token (str): The optional OAuth 2.0 refresh token. If specified,
+            credentials can be refreshed.
+        audience (str): The optional STS audience which contains the resource name for the workforce
+            pool and the provider identifier in that pool.
+        client_id (str): The OAuth 2.0 client ID. Must be specified for refresh, can be left as
+            None if the token can not be refreshed.
+        client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
+            left as None if the token can not be refreshed.
+        token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
+            refresh, can be left as None if the token can not be refreshed.
+        token_info_url (str): The optional STS endpoint URL for token introspection.
+        revoke_url (str): The optional STS endpoint URL for revoking tokens.
+        quota_project_id (str): The optional project ID used for quota and billing.
+            This project may be different from the project used to
+            create the credentials.
+        universe_domain (Optional[str]): The universe domain. The default value
+            is googleapis.com.
+
+        Returns:
+            google.auth.external_account_authorized_user.Credentials: The
+                constructed credentials.
+        """
+        super(Credentials, self).__init__()
+
+        self.token = token
+        self.expiry = expiry
+        self._audience = audience
+        self._refresh_token = refresh_token
+        self._token_url = token_url
+        self._token_info_url = token_info_url
+        self._client_id = client_id
+        self._client_secret = client_secret
+        self._revoke_url = revoke_url
+        self._quota_project_id = quota_project_id
+        self._scopes = scopes
+        self._universe_domain = universe_domain or credentials.DEFAULT_UNIVERSE_DOMAIN
+        self._cred_file_path = None
+
+        if not self.valid and not self.can_refresh:
+            raise exceptions.InvalidOperation(
+                "Token should be created with fields to make it valid (`token` and "
+                "`expiry`), or fields to allow it to refresh (`refresh_token`, "
+                "`token_url`, `client_id`, `client_secret`)."
+            )
+
+        self._client_auth = None
+        if self._client_id:
+            self._client_auth = utils.ClientAuthentication(
+                utils.ClientAuthType.basic, self._client_id, self._client_secret
+            )
+        self._sts_client = sts.Client(self._token_url, self._client_auth)
+
+    @property
+    def info(self):
+        """Generates the serializable dictionary representation of the current
+        credentials.
+
+        Returns:
+            Mapping: The dictionary representation of the credentials. This is the
+                reverse of the "from_info" method defined in this class. It is
+                useful for serializing the current credentials so it can deserialized
+                later.
+        """
+        config_info = self.constructor_args()
+        config_info.update(type=_EXTERNAL_ACCOUNT_AUTHORIZED_USER_JSON_TYPE)
+        if config_info["expiry"]:
+            config_info["expiry"] = config_info["expiry"].isoformat() + "Z"
+
+        return {key: value for key, value in config_info.items() if value is not None}
+
+    def constructor_args(self):
+        return {
+            "audience": self._audience,
+            "refresh_token": self._refresh_token,
+            "token_url": self._token_url,
+            "token_info_url": self._token_info_url,
+            "client_id": self._client_id,
+            "client_secret": self._client_secret,
+            "token": self.token,
+            "expiry": self.expiry,
+            "revoke_url": self._revoke_url,
+            "scopes": self._scopes,
+            "quota_project_id": self._quota_project_id,
+            "universe_domain": self._universe_domain,
+        }
+
+    @property
+    def scopes(self):
+        """Optional[str]: The OAuth 2.0 permission scopes."""
+        return self._scopes
+
+    @property
+    def requires_scopes(self):
+        """ False: OAuth 2.0 credentials have their scopes set when
+        the initial token is requested and can not be changed."""
+        return False
+
+    @property
+    def client_id(self):
+        """Optional[str]: The OAuth 2.0 client ID."""
+        return self._client_id
+
+    @property
+    def client_secret(self):
+        """Optional[str]: The OAuth 2.0 client secret."""
+        return self._client_secret
+
+    @property
+    def audience(self):
+        """Optional[str]: The STS audience which contains the resource name for the
+            workforce pool and the provider identifier in that pool."""
+        return self._audience
+
+    @property
+    def refresh_token(self):
+        """Optional[str]: The OAuth 2.0 refresh token."""
+        return self._refresh_token
+
+    @property
+    def token_url(self):
+        """Optional[str]: The STS token exchange endpoint for refresh."""
+        return self._token_url
+
+    @property
+    def token_info_url(self):
+        """Optional[str]: The STS endpoint for token info."""
+        return self._token_info_url
+
+    @property
+    def revoke_url(self):
+        """Optional[str]: The STS endpoint for token revocation."""
+        return self._revoke_url
+
+    @property
+    def is_user(self):
+        """ True: This credential always represents a user."""
+        return True
+
+    @property
+    def can_refresh(self):
+        return all(
+            (self._refresh_token, self._token_url, self._client_id, self._client_secret)
+        )
+
+    def get_project_id(self, request=None):
+        """Retrieves the project ID corresponding to the workload identity or workforce pool.
+        For workforce pool credentials, it returns the project ID corresponding to
+        the workforce_pool_user_project.
+
+        When not determinable, None is returned.
+
+        Args:
+            request (google.auth.transport.requests.Request): Request object.
+                Unused here, but passed from _default.default().
+
+        Return:
+          str: project ID is not determinable for this credential type so it returns None
+        """
+
+        return None
+
+    def to_json(self, strip=None):
+        """Utility function that creates a JSON representation of this
+        credential.
+        Args:
+            strip (Sequence[str]): Optional list of members to exclude from the
+                                   generated JSON.
+        Returns:
+            str: A JSON representation of this instance. When converted into
+            a dictionary, it can be passed to from_info()
+            to create a new instance.
+        """
+        strip = strip if strip else []
+        return json.dumps({k: v for (k, v) in self.info.items() if k not in strip})
+
+    def refresh(self, request):
+        """Refreshes the access token.
+
+        Args:
+            request (google.auth.transport.Request): The object used to make
+                HTTP requests.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If the credentials could
+                not be refreshed.
+        """
+        if not self.can_refresh:
+            raise exceptions.RefreshError(
+                "The credentials do not contain the necessary fields need to "
+                "refresh the access token. You must specify refresh_token, "
+                "token_url, client_id, and client_secret."
+            )
+
+        now = _helpers.utcnow()
+        response_data = self._make_sts_request(request)
+
+        self.token = response_data.get("access_token")
+
+        lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
+        self.expiry = now + lifetime
+
+        if "refresh_token" in response_data:
+            self._refresh_token = response_data["refresh_token"]
+
+    def _make_sts_request(self, request):
+        return self._sts_client.refresh_token(request, self._refresh_token)
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def get_cred_info(self):
+        if self._cred_file_path:
+            return {
+                "credential_source": self._cred_file_path,
+                "credential_type": "external account authorized user credentials",
+            }
+        return None
+
+    def _make_copy(self):
+        kwargs = self.constructor_args()
+        cred = self.__class__(**kwargs)
+        cred._cred_file_path = self._cred_file_path
+        return cred
+
+    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+    def with_quota_project(self, quota_project_id):
+        cred = self._make_copy()
+        cred._quota_project_id = quota_project_id
+        return cred
+
+    @_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
+    def with_token_uri(self, token_uri):
+        cred = self._make_copy()
+        cred._token_url = token_uri
+        return cred
+
+    @_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
+    def with_universe_domain(self, universe_domain):
+        cred = self._make_copy()
+        cred._universe_domain = universe_domain
+        return cred
+
+    @classmethod
+    def from_info(cls, info, **kwargs):
+        """Creates a Credentials instance from parsed external account info.
+
+        Args:
+            info (Mapping[str, str]): The external account info in Google
+                format.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.external_account_authorized_user.Credentials: The
+                constructed credentials.
+
+        Raises:
+            ValueError: For invalid parameters.
+        """
+        expiry = info.get("expiry")
+        if expiry:
+            expiry = datetime.datetime.strptime(
+                expiry.rstrip("Z").split(".")[0], "%Y-%m-%dT%H:%M:%S"
+            )
+        return cls(
+            audience=info.get("audience"),
+            refresh_token=info.get("refresh_token"),
+            token_url=info.get("token_url"),
+            token_info_url=info.get("token_info_url"),
+            client_id=info.get("client_id"),
+            client_secret=info.get("client_secret"),
+            token=info.get("token"),
+            expiry=expiry,
+            revoke_url=info.get("revoke_url"),
+            quota_project_id=info.get("quota_project_id"),
+            scopes=info.get("scopes"),
+            universe_domain=info.get(
+                "universe_domain", credentials.DEFAULT_UNIVERSE_DOMAIN
+            ),
+            **kwargs
+        )
+
+    @classmethod
+    def from_file(cls, filename, **kwargs):
+        """Creates a Credentials instance from an external account json file.
+
+        Args:
+            filename (str): The path to the external account json file.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.external_account_authorized_user.Credentials: The
+                constructed credentials.
+        """
+        with io.open(filename, "r", encoding="utf-8") as json_file:
+            data = json.load(json_file)
+            return cls.from_info(data, **kwargs)