aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/google/auth/identity_pool.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/google/auth/identity_pool.py')
-rw-r--r--.venv/lib/python3.12/site-packages/google/auth/identity_pool.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/google/auth/identity_pool.py b/.venv/lib/python3.12/site-packages/google/auth/identity_pool.py
new file mode 100644
index 00000000..47f9a557
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/google/auth/identity_pool.py
@@ -0,0 +1,439 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Identity Pool Credentials.
+
+This module provides credentials to access Google Cloud resources from on-prem
+or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
+tokens) retrieved from local file locations or local servers. This includes
+Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
+Hub with Hub workload identity enabled).
+
+These credentials are recommended over the use of service account credentials
+in on-prem/non-Google Cloud platforms as they do not involve the management of
+long-live service account private keys.
+
+Identity Pool Credentials are initialized using external_account
+arguments which are typically loaded from an external credentials file or
+an external credentials URL.
+
+This module also provides a definition for an abstract subject token supplier.
+This supplier can be implemented to return a valid OIDC or SAML2.0 subject token
+and used to create Identity Pool credentials. The credentials will then call the
+supplier instead of using pre-defined methods such as reading a local file or
+calling a URL.
+"""
+
+try:
+ from collections.abc import Mapping
+# Python 2.7 compatibility
+except ImportError: # pragma: NO COVER
+ from collections import Mapping # type: ignore
+import abc
+import json
+import os
+from typing import NamedTuple
+
+from google.auth import _helpers
+from google.auth import exceptions
+from google.auth import external_account
+from google.auth.transport import _mtls_helper
+
+
+class SubjectTokenSupplier(metaclass=abc.ABCMeta):
+ """Base class for subject token suppliers. This can be implemented with custom logic to retrieve
+ a subject token to exchange for a Google Cloud access token when using Workload or
+ Workforce Identity Federation. The identity pool credential does not cache the subject token,
+ so caching logic should be added in the implementation.
+ """
+
+ @abc.abstractmethod
+ def get_subject_token(self, context, request):
+ """Returns the requested subject token. The subject token must be valid.
+
+ .. warning: This is not cached by the calling Google credential, so caching logic should be implemented in the supplier.
+
+ Args:
+ context (google.auth.externalaccount.SupplierContext): The context object
+ containing information about the requested audience and subject token type.
+ request (google.auth.transport.Request): The object used to make
+ HTTP requests.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ subject token retrieval logic.
+
+ Returns:
+ str: The requested subject token string.
+ """
+ raise NotImplementedError("")
+
+
+class _TokenContent(NamedTuple):
+ """Models the token content response from file and url internal suppliers.
+ Attributes:
+ content (str): The string content of the file or URL response.
+ location (str): The location the content was retrieved from. This will either be a file location or a URL.
+ """
+
+ content: str
+ location: str
+
+
+class _FileSupplier(SubjectTokenSupplier):
+ """ Internal implementation of subject token supplier which supports reading a subject token from a file."""
+
+ def __init__(self, path, format_type, subject_token_field_name):
+ self._path = path
+ self._format_type = format_type
+ self._subject_token_field_name = subject_token_field_name
+
+ @_helpers.copy_docstring(SubjectTokenSupplier)
+ def get_subject_token(self, context, request):
+ if not os.path.exists(self._path):
+ raise exceptions.RefreshError("File '{}' was not found.".format(self._path))
+
+ with open(self._path, "r", encoding="utf-8") as file_obj:
+ token_content = _TokenContent(file_obj.read(), self._path)
+
+ return _parse_token_data(
+ token_content, self._format_type, self._subject_token_field_name
+ )
+
+
+class _UrlSupplier(SubjectTokenSupplier):
+ """ Internal implementation of subject token supplier which supports retrieving a subject token by calling a URL endpoint."""
+
+ def __init__(self, url, format_type, subject_token_field_name, headers):
+ self._url = url
+ self._format_type = format_type
+ self._subject_token_field_name = subject_token_field_name
+ self._headers = headers
+
+ @_helpers.copy_docstring(SubjectTokenSupplier)
+ def get_subject_token(self, context, request):
+ response = request(url=self._url, method="GET", headers=self._headers)
+
+ # support both string and bytes type response.data
+ response_body = (
+ response.data.decode("utf-8")
+ if hasattr(response.data, "decode")
+ else response.data
+ )
+
+ if response.status != 200:
+ raise exceptions.RefreshError(
+ "Unable to retrieve Identity Pool subject token", response_body
+ )
+ token_content = _TokenContent(response_body, self._url)
+ return _parse_token_data(
+ token_content, self._format_type, self._subject_token_field_name
+ )
+
+
+class _X509Supplier(SubjectTokenSupplier):
+ """Internal supplier for X509 workload credentials. This class is used internally and always returns an empty string as the subject token."""
+
+ @_helpers.copy_docstring(SubjectTokenSupplier)
+ def get_subject_token(self, context, request):
+ return ""
+
+
+def _parse_token_data(token_content, format_type="text", subject_token_field_name=None):
+ if format_type == "text":
+ token = token_content.content
+ else:
+ try:
+ # Parse file content as JSON.
+ response_data = json.loads(token_content.content)
+ # Get the subject_token.
+ token = response_data[subject_token_field_name]
+ except (KeyError, ValueError):
+ raise exceptions.RefreshError(
+ "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
+ token_content.location, subject_token_field_name
+ )
+ )
+ if not token:
+ raise exceptions.RefreshError(
+ "Missing subject_token in the credential_source file"
+ )
+ return token
+
+
+class Credentials(external_account.Credentials):
+ """External account credentials sourced from files and URLs."""
+
+ def __init__(
+ self,
+ audience,
+ subject_token_type,
+ token_url=external_account._DEFAULT_TOKEN_URL,
+ credential_source=None,
+ subject_token_supplier=None,
+ *args,
+ **kwargs
+ ):
+ """Instantiates an external account credentials object from a file/URL.
+
+ Args:
+ audience (str): The STS audience field.
+ subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec.
+ Expected values include::
+
+ “urn:ietf:params:oauth:token-type:jwt”
+ “urn:ietf:params:oauth:token-type:id-token”
+ “urn:ietf:params:oauth:token-type:saml2”
+
+ token_url (Optional [str]): The STS endpoint URL. If not provided, will default to "https://sts.googleapis.com/v1/token".
+ credential_source (Optional [Mapping]): The credential source dictionary used to
+ provide instructions on how to retrieve external credential to be
+ exchanged for Google access tokens. Either a credential source or
+ a subject token supplier must be provided.
+
+ Example credential_source for url-sourced credential::
+
+ {
+ "url": "http://www.example.com",
+ "format": {
+ "type": "json",
+ "subject_token_field_name": "access_token",
+ },
+ "headers": {"foo": "bar"},
+ }
+
+ Example credential_source for file-sourced credential::
+
+ {
+ "file": "/path/to/token/file.txt"
+ }
+ subject_token_supplier (Optional [SubjectTokenSupplier]): Optional subject token supplier.
+ This will be called to supply a valid subject token which will then
+ be exchanged for Google access tokens. Either a subject token supplier
+ or a credential source must be provided.
+ args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
+ kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If an error is encountered during
+ access token retrieval logic.
+ ValueError: For invalid parameters.
+
+ .. note:: Typically one of the helper constructors
+ :meth:`from_file` or
+ :meth:`from_info` are used instead of calling the constructor directly.
+ """
+
+ super(Credentials, self).__init__(
+ audience=audience,
+ subject_token_type=subject_token_type,
+ token_url=token_url,
+ credential_source=credential_source,
+ *args,
+ **kwargs
+ )
+ if credential_source is None and subject_token_supplier is None:
+ raise exceptions.InvalidValue(
+ "A valid credential source or a subject token supplier must be provided."
+ )
+ if credential_source is not None and subject_token_supplier is not None:
+ raise exceptions.InvalidValue(
+ "Identity pool credential cannot have both a credential source and a subject token supplier."
+ )
+
+ if subject_token_supplier is not None:
+ self._subject_token_supplier = subject_token_supplier
+ self._credential_source_file = None
+ self._credential_source_url = None
+ self._credential_source_certificate = None
+ else:
+ if not isinstance(credential_source, Mapping):
+ self._credential_source_executable = None
+ raise exceptions.MalformedError(
+ "Invalid credential_source. The credential_source is not a dict."
+ )
+ self._credential_source_file = credential_source.get("file")
+ self._credential_source_url = credential_source.get("url")
+ self._credential_source_certificate = credential_source.get("certificate")
+
+ # environment_id is only supported in AWS or dedicated future external
+ # account credentials.
+ if "environment_id" in credential_source:
+ raise exceptions.MalformedError(
+ "Invalid Identity Pool credential_source field 'environment_id'"
+ )
+
+ # check that only one of file, url, or certificate are provided.
+ self._validate_single_source()
+
+ if self._credential_source_certificate:
+ self._validate_certificate_config()
+ else:
+ self._validate_file_or_url_config(credential_source)
+
+ if self._credential_source_file:
+ self._subject_token_supplier = _FileSupplier(
+ self._credential_source_file,
+ self._credential_source_format_type,
+ self._credential_source_field_name,
+ )
+ elif self._credential_source_url:
+ self._subject_token_supplier = _UrlSupplier(
+ self._credential_source_url,
+ self._credential_source_format_type,
+ self._credential_source_field_name,
+ self._credential_source_headers,
+ )
+ else: # self._credential_source_certificate
+ self._subject_token_supplier = _X509Supplier()
+
+ @_helpers.copy_docstring(external_account.Credentials)
+ def retrieve_subject_token(self, request):
+ return self._subject_token_supplier.get_subject_token(
+ self._supplier_context, request
+ )
+
+ def _get_mtls_cert_and_key_paths(self):
+ if self._credential_source_certificate is None:
+ raise exceptions.RefreshError(
+ 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.'
+ )
+ else:
+ return _mtls_helper._get_workload_cert_and_key_paths(
+ self._certificate_config_location
+ )
+
+ def _mtls_required(self):
+ return self._credential_source_certificate is not None
+
+ def _create_default_metrics_options(self):
+ metrics_options = super(Credentials, self)._create_default_metrics_options()
+ # Check that credential source is a dict before checking for credential type. This check needs to be done
+ # here because the external_account credential constructor needs to pass the metrics options to the
+ # impersonated credential object before the identity_pool credentials are validated.
+ if isinstance(self._credential_source, Mapping):
+ if self._credential_source.get("file"):
+ metrics_options["source"] = "file"
+ elif self._credential_source.get("url"):
+ metrics_options["source"] = "url"
+ else:
+ metrics_options["source"] = "x509"
+ else:
+ metrics_options["source"] = "programmatic"
+ return metrics_options
+
+ def _has_custom_supplier(self):
+ return self._credential_source is None
+
+ def _constructor_args(self):
+ args = super(Credentials, self)._constructor_args()
+ # If a custom supplier was used, append it to the args dict.
+ if self._has_custom_supplier():
+ args.update({"subject_token_supplier": self._subject_token_supplier})
+ return args
+
+ def _validate_certificate_config(self):
+ self._certificate_config_location = self._credential_source_certificate.get(
+ "certificate_config_location"
+ )
+ use_default = self._credential_source_certificate.get(
+ "use_default_certificate_config"
+ )
+ if self._certificate_config_location and use_default:
+ raise exceptions.MalformedError(
+ "Invalid certificate configuration, certificate_config_location cannot be specified when use_default_certificate_config = true."
+ )
+ if not self._certificate_config_location and not use_default:
+ raise exceptions.MalformedError(
+ "Invalid certificate configuration, use_default_certificate_config should be true if no certificate_config_location is provided."
+ )
+
+ def _validate_file_or_url_config(self, credential_source):
+ self._credential_source_headers = credential_source.get("headers")
+ credential_source_format = credential_source.get("format", {})
+ # Get credential_source format type. When not provided, this
+ # defaults to text.
+ self._credential_source_format_type = (
+ credential_source_format.get("type") or "text"
+ )
+ if self._credential_source_format_type not in ["text", "json"]:
+ raise exceptions.MalformedError(
+ "Invalid credential_source format '{}'".format(
+ self._credential_source_format_type
+ )
+ )
+ # For JSON types, get the required subject_token field name.
+ if self._credential_source_format_type == "json":
+ self._credential_source_field_name = credential_source_format.get(
+ "subject_token_field_name"
+ )
+ if self._credential_source_field_name is None:
+ raise exceptions.MalformedError(
+ "Missing subject_token_field_name for JSON credential_source format"
+ )
+ else:
+ self._credential_source_field_name = None
+
+ def _validate_single_source(self):
+ credential_sources = [
+ self._credential_source_file,
+ self._credential_source_url,
+ self._credential_source_certificate,
+ ]
+ valid_credential_sources = list(
+ filter(lambda source: source is not None, credential_sources)
+ )
+
+ if len(valid_credential_sources) > 1:
+ raise exceptions.MalformedError(
+ "Ambiguous credential_source. 'file', 'url', and 'certificate' are mutually exclusive.."
+ )
+ if len(valid_credential_sources) != 1:
+ raise exceptions.MalformedError(
+ "Missing credential_source. A 'file', 'url', or 'certificate' must be provided."
+ )
+
+ @classmethod
+ def from_info(cls, info, **kwargs):
+ """Creates an Identity Pool Credentials instance from parsed external account info.
+
+ Args:
+ info (Mapping[str, str]): The Identity Pool external account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.identity_pool.Credentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: For invalid parameters.
+ """
+ subject_token_supplier = info.get("subject_token_supplier")
+ kwargs.update({"subject_token_supplier": subject_token_supplier})
+ return super(Credentials, cls).from_info(info, **kwargs)
+
+ @classmethod
+ def from_file(cls, filename, **kwargs):
+ """Creates an IdentityPool Credentials instance from an external account json file.
+
+ Args:
+ filename (str): The path to the IdentityPool external account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.identity_pool.Credentials: The constructed
+ credentials.
+ """
+ return super(Credentials, cls).from_file(filename, **kwargs)