about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/botocore/credentials.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/credentials.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/credentials.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/credentials.py2391
1 files changed, 2391 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/credentials.py b/.venv/lib/python3.12/site-packages/botocore/credentials.py
new file mode 100644
index 00000000..a425cb79
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/credentials.py
@@ -0,0 +1,2391 @@
+# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
+# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import datetime
+import getpass
+import json
+import logging
+import os
+import subprocess
+import threading
+import time
+from collections import namedtuple
+from copy import deepcopy
+from hashlib import sha1
+
+from dateutil.parser import parse
+from dateutil.tz import tzlocal, tzutc
+
+import botocore.compat
+import botocore.configloader
+from botocore import UNSIGNED
+from botocore.compat import compat_shell_split, total_seconds
+from botocore.config import Config
+from botocore.exceptions import (
+    ConfigNotFound,
+    CredentialRetrievalError,
+    InfiniteLoopConfigError,
+    InvalidConfigError,
+    MetadataRetrievalError,
+    PartialCredentialsError,
+    RefreshWithMFAUnsupportedError,
+    UnauthorizedSSOTokenError,
+    UnknownCredentialError,
+)
+from botocore.tokens import SSOTokenProvider
+from botocore.utils import (
+    ArnParser,
+    ContainerMetadataFetcher,
+    FileWebIdentityTokenLoader,
+    InstanceMetadataFetcher,
+    JSONFileCache,
+    SSOTokenLoader,
+    parse_key_val_file,
+    resolve_imds_endpoint_mode,
+)
+
+logger = logging.getLogger(__name__)
+ReadOnlyCredentials = namedtuple(
+    'ReadOnlyCredentials',
+    ['access_key', 'secret_key', 'token', 'account_id'],
+    defaults=(None,),
+)
+
+_DEFAULT_MANDATORY_REFRESH_TIMEOUT = 10 * 60  # 10 min
+_DEFAULT_ADVISORY_REFRESH_TIMEOUT = 15 * 60  # 15 min
+
+
+def create_credential_resolver(session, cache=None, region_name=None):
+    """Create a default credential resolver.
+
+    This creates a pre-configured credential resolver
+    that includes the default lookup chain for
+    credentials.
+
+    """
+    profile_name = session.get_config_variable('profile') or 'default'
+    metadata_timeout = session.get_config_variable('metadata_service_timeout')
+    num_attempts = session.get_config_variable('metadata_service_num_attempts')
+    disable_env_vars = session.instance_variables().get('profile') is not None
+
+    imds_config = {
+        'ec2_metadata_service_endpoint': session.get_config_variable(
+            'ec2_metadata_service_endpoint'
+        ),
+        'ec2_metadata_service_endpoint_mode': resolve_imds_endpoint_mode(
+            session
+        ),
+        'ec2_credential_refresh_window': _DEFAULT_ADVISORY_REFRESH_TIMEOUT,
+        'ec2_metadata_v1_disabled': session.get_config_variable(
+            'ec2_metadata_v1_disabled'
+        ),
+    }
+
+    if cache is None:
+        cache = {}
+
+    env_provider = EnvProvider()
+    container_provider = ContainerProvider()
+    instance_metadata_provider = InstanceMetadataProvider(
+        iam_role_fetcher=InstanceMetadataFetcher(
+            timeout=metadata_timeout,
+            num_attempts=num_attempts,
+            user_agent=session.user_agent(),
+            config=imds_config,
+        )
+    )
+
+    profile_provider_builder = ProfileProviderBuilder(
+        session, cache=cache, region_name=region_name
+    )
+    assume_role_provider = AssumeRoleProvider(
+        load_config=lambda: session.full_config,
+        client_creator=_get_client_creator(session, region_name),
+        cache=cache,
+        profile_name=profile_name,
+        credential_sourcer=CanonicalNameCredentialSourcer(
+            [env_provider, container_provider, instance_metadata_provider]
+        ),
+        profile_provider_builder=profile_provider_builder,
+    )
+
+    pre_profile = [
+        env_provider,
+        assume_role_provider,
+    ]
+    profile_providers = profile_provider_builder.providers(
+        profile_name=profile_name,
+        disable_env_vars=disable_env_vars,
+    )
+    post_profile = [
+        OriginalEC2Provider(),
+        BotoProvider(),
+        container_provider,
+        instance_metadata_provider,
+    ]
+    providers = pre_profile + profile_providers + post_profile
+
+    if disable_env_vars:
+        # An explicitly provided profile will negate an EnvProvider.
+        # We will defer to providers that understand the "profile"
+        # concept to retrieve credentials.
+        # The one edge case if is all three values are provided via
+        # env vars:
+        # export AWS_ACCESS_KEY_ID=foo
+        # export AWS_SECRET_ACCESS_KEY=bar
+        # export AWS_PROFILE=baz
+        # Then, just like our client() calls, the explicit credentials
+        # will take precedence.
+        #
+        # This precedence is enforced by leaving the EnvProvider in the chain.
+        # This means that the only way a "profile" would win is if the
+        # EnvProvider does not return credentials, which is what we want
+        # in this scenario.
+        providers.remove(env_provider)
+        logger.debug(
+            'Skipping environment variable credential check'
+            ' because profile name was explicitly set.'
+        )
+
+    resolver = CredentialResolver(providers=providers)
+    return resolver
+
+
+class ProfileProviderBuilder:
+    """This class handles the creation of profile based providers.
+
+    NOTE: This class is only intended for internal use.
+
+    This class handles the creation and ordering of the various credential
+    providers that primarly source their configuration from the shared config.
+    This is needed to enable sharing between the default credential chain and
+    the source profile chain created by the assume role provider.
+    """
+
+    def __init__(
+        self, session, cache=None, region_name=None, sso_token_cache=None
+    ):
+        self._session = session
+        self._cache = cache
+        self._region_name = region_name
+        self._sso_token_cache = sso_token_cache
+
+    def providers(self, profile_name, disable_env_vars=False):
+        return [
+            self._create_web_identity_provider(
+                profile_name,
+                disable_env_vars,
+            ),
+            self._create_sso_provider(profile_name),
+            self._create_shared_credential_provider(profile_name),
+            self._create_process_provider(profile_name),
+            self._create_config_provider(profile_name),
+        ]
+
+    def _create_process_provider(self, profile_name):
+        return ProcessProvider(
+            profile_name=profile_name,
+            load_config=lambda: self._session.full_config,
+        )
+
+    def _create_shared_credential_provider(self, profile_name):
+        credential_file = self._session.get_config_variable('credentials_file')
+        return SharedCredentialProvider(
+            profile_name=profile_name,
+            creds_filename=credential_file,
+        )
+
+    def _create_config_provider(self, profile_name):
+        config_file = self._session.get_config_variable('config_file')
+        return ConfigProvider(
+            profile_name=profile_name,
+            config_filename=config_file,
+        )
+
+    def _create_web_identity_provider(self, profile_name, disable_env_vars):
+        return AssumeRoleWithWebIdentityProvider(
+            load_config=lambda: self._session.full_config,
+            client_creator=_get_client_creator(
+                self._session, self._region_name
+            ),
+            cache=self._cache,
+            profile_name=profile_name,
+            disable_env_vars=disable_env_vars,
+        )
+
+    def _create_sso_provider(self, profile_name):
+        return SSOProvider(
+            load_config=lambda: self._session.full_config,
+            client_creator=self._session.create_client,
+            profile_name=profile_name,
+            cache=self._cache,
+            token_cache=self._sso_token_cache,
+            token_provider=SSOTokenProvider(
+                self._session,
+                cache=self._sso_token_cache,
+                profile_name=profile_name,
+            ),
+        )
+
+
+def get_credentials(session):
+    resolver = create_credential_resolver(session)
+    return resolver.load_credentials()
+
+
+def _local_now():
+    return datetime.datetime.now(tzlocal())
+
+
+def _parse_if_needed(value):
+    if isinstance(value, datetime.datetime):
+        return value
+    return parse(value)
+
+
+def _serialize_if_needed(value, iso=False):
+    if isinstance(value, datetime.datetime):
+        if iso:
+            return value.isoformat()
+        return value.strftime('%Y-%m-%dT%H:%M:%S%Z')
+    return value
+
+
+def _get_client_creator(session, region_name):
+    def client_creator(service_name, **kwargs):
+        create_client_kwargs = {'region_name': region_name}
+        create_client_kwargs.update(**kwargs)
+        return session.create_client(service_name, **create_client_kwargs)
+
+    return client_creator
+
+
+def create_assume_role_refresher(client, params):
+    def refresh():
+        response = client.assume_role(**params)
+        credentials = response['Credentials']
+        # We need to normalize the credential names to
+        # the values expected by the refresh creds.
+        return {
+            'access_key': credentials['AccessKeyId'],
+            'secret_key': credentials['SecretAccessKey'],
+            'token': credentials['SessionToken'],
+            'expiry_time': _serialize_if_needed(credentials['Expiration']),
+        }
+
+    return refresh
+
+
+def create_mfa_serial_refresher(actual_refresh):
+    class _Refresher:
+        def __init__(self, refresh):
+            self._refresh = refresh
+            self._has_been_called = False
+
+        def __call__(self):
+            if self._has_been_called:
+                # We can explore an option in the future to support
+                # reprompting for MFA, but for now we just error out
+                # when the temp creds expire.
+                raise RefreshWithMFAUnsupportedError()
+            self._has_been_called = True
+            return self._refresh()
+
+    return _Refresher(actual_refresh)
+
+
+class Credentials:
+    """
+    Holds the credentials needed to authenticate requests.
+
+    :param str access_key: The access key part of the credentials.
+    :param str secret_key: The secret key part of the credentials.
+    :param str token: The security token, valid only for session credentials.
+    :param str method: A string which identifies where the credentials
+        were found.
+    :param str account_id: (optional) An account ID associated with the credentials.
+    """
+
+    def __init__(
+        self, access_key, secret_key, token=None, method=None, account_id=None
+    ):
+        self.access_key = access_key
+        self.secret_key = secret_key
+        self.token = token
+
+        if method is None:
+            method = 'explicit'
+        self.method = method
+        self.account_id = account_id
+
+        self._normalize()
+
+    def _normalize(self):
+        # Keys would sometimes (accidentally) contain non-ascii characters.
+        # It would cause a confusing UnicodeDecodeError in Python 2.
+        # We explicitly convert them into unicode to avoid such error.
+        #
+        # Eventually the service will decide whether to accept the credential.
+        # This also complies with the behavior in Python 3.
+        self.access_key = botocore.compat.ensure_unicode(self.access_key)
+        self.secret_key = botocore.compat.ensure_unicode(self.secret_key)
+
+    def get_frozen_credentials(self):
+        return ReadOnlyCredentials(
+            self.access_key, self.secret_key, self.token, self.account_id
+        )
+
+    def get_deferred_property(self, property_name):
+        def get_property():
+            return getattr(self, property_name, None)
+
+        return get_property
+
+
+class RefreshableCredentials(Credentials):
+    """
+    Holds the credentials needed to authenticate requests. In addition, it
+    knows how to refresh itself.
+
+    :param str access_key: The access key part of the credentials.
+    :param str secret_key: The secret key part of the credentials.
+    :param str token: The security token, valid only for session credentials.
+    :param datetime expiry_time: The expiration time of the credentials.
+    :param function refresh_using: Callback function to refresh the credentials.
+    :param str method: A string which identifies where the credentials
+        were found.
+    :param function time_fetcher: Callback function to retrieve current time.
+    """
+
+    # The time at which we'll attempt to refresh, but not
+    # block if someone else is refreshing.
+    _advisory_refresh_timeout = _DEFAULT_ADVISORY_REFRESH_TIMEOUT
+    # The time at which all threads will block waiting for
+    # refreshed credentials.
+    _mandatory_refresh_timeout = _DEFAULT_MANDATORY_REFRESH_TIMEOUT
+
+    def __init__(
+        self,
+        access_key,
+        secret_key,
+        token,
+        expiry_time,
+        refresh_using,
+        method,
+        time_fetcher=_local_now,
+        advisory_timeout=None,
+        mandatory_timeout=None,
+        account_id=None,
+    ):
+        self._refresh_using = refresh_using
+        self._access_key = access_key
+        self._secret_key = secret_key
+        self._token = token
+        self._account_id = account_id
+        self._expiry_time = expiry_time
+        self._time_fetcher = time_fetcher
+        self._refresh_lock = threading.Lock()
+        self.method = method
+        self._frozen_credentials = ReadOnlyCredentials(
+            access_key, secret_key, token, account_id
+        )
+        self._normalize()
+        if advisory_timeout is not None:
+            self._advisory_refresh_timeout = advisory_timeout
+        if mandatory_timeout is not None:
+            self._mandatory_refresh_timeout = mandatory_timeout
+
+    def _normalize(self):
+        self._access_key = botocore.compat.ensure_unicode(self._access_key)
+        self._secret_key = botocore.compat.ensure_unicode(self._secret_key)
+
+    @classmethod
+    def create_from_metadata(
+        cls,
+        metadata,
+        refresh_using,
+        method,
+        advisory_timeout=None,
+        mandatory_timeout=None,
+    ):
+        kwargs = {}
+        if advisory_timeout is not None:
+            kwargs['advisory_timeout'] = advisory_timeout
+        if mandatory_timeout is not None:
+            kwargs['mandatory_timeout'] = mandatory_timeout
+
+        instance = cls(
+            access_key=metadata['access_key'],
+            secret_key=metadata['secret_key'],
+            token=metadata['token'],
+            expiry_time=cls._expiry_datetime(metadata['expiry_time']),
+            method=method,
+            refresh_using=refresh_using,
+            account_id=metadata.get('account_id'),
+            **kwargs,
+        )
+        return instance
+
+    @property
+    def access_key(self):
+        """Warning: Using this property can lead to race conditions if you
+        access another property subsequently along the refresh boundary.
+        Please use get_frozen_credentials instead.
+        """
+        self._refresh()
+        return self._access_key
+
+    @access_key.setter
+    def access_key(self, value):
+        self._access_key = value
+
+    @property
+    def secret_key(self):
+        """Warning: Using this property can lead to race conditions if you
+        access another property subsequently along the refresh boundary.
+        Please use get_frozen_credentials instead.
+        """
+        self._refresh()
+        return self._secret_key
+
+    @secret_key.setter
+    def secret_key(self, value):
+        self._secret_key = value
+
+    @property
+    def token(self):
+        """Warning: Using this property can lead to race conditions if you
+        access another property subsequently along the refresh boundary.
+        Please use get_frozen_credentials instead.
+        """
+        self._refresh()
+        return self._token
+
+    @token.setter
+    def token(self, value):
+        self._token = value
+
+    @property
+    def account_id(self):
+        """Warning: Using this property can lead to race conditions if you
+        access another property subsequently along the refresh boundary.
+        Please use get_frozen_credentials instead.
+        """
+        self._refresh()
+        return self._account_id
+
+    @account_id.setter
+    def account_id(self, value):
+        self._account_id = value
+
+    def _seconds_remaining(self):
+        delta = self._expiry_time - self._time_fetcher()
+        return total_seconds(delta)
+
+    def refresh_needed(self, refresh_in=None):
+        """Check if a refresh is needed.
+
+        A refresh is needed if the expiry time associated
+        with the temporary credentials is less than the
+        provided ``refresh_in``.  If ``time_delta`` is not
+        provided, ``self.advisory_refresh_needed`` will be used.
+
+        For example, if your temporary credentials expire
+        in 10 minutes and the provided ``refresh_in`` is
+        ``15 * 60``, then this function will return ``True``.
+
+        :type refresh_in: int
+        :param refresh_in: The number of seconds before the
+            credentials expire in which refresh attempts should
+            be made.
+
+        :return: True if refresh needed, False otherwise.
+
+        """
+        if self._expiry_time is None:
+            # No expiration, so assume we don't need to refresh.
+            return False
+
+        if refresh_in is None:
+            refresh_in = self._advisory_refresh_timeout
+        # The credentials should be refreshed if they're going to expire
+        # in less than 5 minutes.
+        if self._seconds_remaining() >= refresh_in:
+            # There's enough time left. Don't refresh.
+            return False
+        logger.debug("Credentials need to be refreshed.")
+        return True
+
+    def _is_expired(self):
+        # Checks if the current credentials are expired.
+        return self.refresh_needed(refresh_in=0)
+
+    def _refresh(self):
+        # In the common case where we don't need a refresh, we
+        # can immediately exit and not require acquiring the
+        # refresh lock.
+        if not self.refresh_needed(self._advisory_refresh_timeout):
+            return
+
+        # acquire() doesn't accept kwargs, but False is indicating
+        # that we should not block if we can't acquire the lock.
+        # If we aren't able to acquire the lock, we'll trigger
+        # the else clause.
+        if self._refresh_lock.acquire(False):
+            try:
+                if not self.refresh_needed(self._advisory_refresh_timeout):
+                    return
+                is_mandatory_refresh = self.refresh_needed(
+                    self._mandatory_refresh_timeout
+                )
+                self._protected_refresh(is_mandatory=is_mandatory_refresh)
+                return
+            finally:
+                self._refresh_lock.release()
+        elif self.refresh_needed(self._mandatory_refresh_timeout):
+            # If we're within the mandatory refresh window,
+            # we must block until we get refreshed credentials.
+            with self._refresh_lock:
+                if not self.refresh_needed(self._mandatory_refresh_timeout):
+                    return
+                self._protected_refresh(is_mandatory=True)
+
+    def _protected_refresh(self, is_mandatory):
+        # precondition: this method should only be called if you've acquired
+        # the self._refresh_lock.
+        try:
+            metadata = self._refresh_using()
+        except Exception:
+            period_name = 'mandatory' if is_mandatory else 'advisory'
+            logger.warning(
+                "Refreshing temporary credentials failed "
+                "during %s refresh period.",
+                period_name,
+                exc_info=True,
+            )
+            if is_mandatory:
+                # If this is a mandatory refresh, then
+                # all errors that occur when we attempt to refresh
+                # credentials are propagated back to the user.
+                raise
+            # Otherwise we'll just return.
+            # The end result will be that we'll use the current
+            # set of temporary credentials we have.
+            return
+        self._set_from_data(metadata)
+        self._frozen_credentials = ReadOnlyCredentials(
+            self._access_key, self._secret_key, self._token, self._account_id
+        )
+        if self._is_expired():
+            # We successfully refreshed credentials but for whatever
+            # reason, our refreshing function returned credentials
+            # that are still expired.  In this scenario, the only
+            # thing we can do is let the user know and raise
+            # an exception.
+            msg = (
+                "Credentials were refreshed, but the "
+                "refreshed credentials are still expired."
+            )
+            logger.warning(msg)
+            raise RuntimeError(msg)
+
+    @staticmethod
+    def _expiry_datetime(time_str):
+        return parse(time_str)
+
+    def _set_from_data(self, data):
+        expected_keys = ['access_key', 'secret_key', 'token', 'expiry_time']
+        if not data:
+            missing_keys = expected_keys
+        else:
+            missing_keys = [k for k in expected_keys if k not in data]
+
+        if missing_keys:
+            message = "Credential refresh failed, response did not contain: %s"
+            raise CredentialRetrievalError(
+                provider=self.method,
+                error_msg=message % ', '.join(missing_keys),
+            )
+
+        self.access_key = data['access_key']
+        self.secret_key = data['secret_key']
+        self.token = data['token']
+        self._expiry_time = parse(data['expiry_time'])
+        self.account_id = data.get('account_id')
+        logger.debug(
+            "Retrieved credentials will expire at: %s", self._expiry_time
+        )
+        self._normalize()
+
+    def get_frozen_credentials(self):
+        """Return immutable credentials.
+
+        The ``access_key``, ``secret_key``, and ``token`` properties
+        on this class will always check and refresh credentials if
+        needed before returning the particular credentials.
+
+        This has an edge case where you can get inconsistent
+        credentials.  Imagine this:
+
+            # Current creds are "t1"
+            tmp.access_key  ---> expired? no, so return t1.access_key
+            # ---- time is now expired, creds need refreshing to "t2" ----
+            tmp.secret_key  ---> expired? yes, refresh and return t2.secret_key
+
+        This means we're using the access key from t1 with the secret key
+        from t2.  To fix this issue, you can request a frozen credential object
+        which is guaranteed not to change.
+
+        The frozen credentials returned from this method should be used
+        immediately and then discarded.  The typical usage pattern would
+        be::
+
+            creds = RefreshableCredentials(...)
+            some_code = SomeSignerObject()
+            # I'm about to sign the request.
+            # The frozen credentials are only used for the
+            # duration of generate_presigned_url and will be
+            # immediately thrown away.
+            request = some_code.sign_some_request(
+                with_credentials=creds.get_frozen_credentials())
+            print("Signed request:", request)
+
+        """
+        self._refresh()
+        return self._frozen_credentials
+
+
+class DeferredRefreshableCredentials(RefreshableCredentials):
+    """Refreshable credentials that don't require initial credentials.
+
+    refresh_using will be called upon first access.
+    """
+
+    def __init__(self, refresh_using, method, time_fetcher=_local_now):
+        self._refresh_using = refresh_using
+        self._access_key = None
+        self._secret_key = None
+        self._token = None
+        self._account_id = None
+        self._expiry_time = None
+        self._time_fetcher = time_fetcher
+        self._refresh_lock = threading.Lock()
+        self.method = method
+        self._frozen_credentials = None
+
+    def refresh_needed(self, refresh_in=None):
+        if self._frozen_credentials is None:
+            return True
+        return super().refresh_needed(refresh_in)
+
+
+class CachedCredentialFetcher:
+    DEFAULT_EXPIRY_WINDOW_SECONDS = 60 * 15
+
+    def __init__(self, cache=None, expiry_window_seconds=None):
+        if cache is None:
+            cache = {}
+        self._cache = cache
+        self._cache_key = self._create_cache_key()
+        if expiry_window_seconds is None:
+            expiry_window_seconds = self.DEFAULT_EXPIRY_WINDOW_SECONDS
+        self._expiry_window_seconds = expiry_window_seconds
+
+    def _create_cache_key(self):
+        raise NotImplementedError('_create_cache_key()')
+
+    def _make_file_safe(self, filename):
+        # Replace :, path sep, and / to make it the string filename safe.
+        filename = filename.replace(':', '_').replace(os.sep, '_')
+        return filename.replace('/', '_')
+
+    def _get_credentials(self):
+        raise NotImplementedError('_get_credentials()')
+
+    def fetch_credentials(self):
+        return self._get_cached_credentials()
+
+    def _get_cached_credentials(self):
+        """Get up-to-date credentials.
+
+        This will check the cache for up-to-date credentials, calling assume
+        role if none are available.
+        """
+        response = self._load_from_cache()
+        if response is None:
+            response = self._get_credentials()
+            self._write_to_cache(response)
+        else:
+            logger.debug("Credentials for role retrieved from cache.")
+
+        creds = response['Credentials']
+        expiration = _serialize_if_needed(creds['Expiration'], iso=True)
+        credentials = {
+            'access_key': creds['AccessKeyId'],
+            'secret_key': creds['SecretAccessKey'],
+            'token': creds['SessionToken'],
+            'expiry_time': expiration,
+            'account_id': creds.get('AccountId'),
+        }
+
+        return credentials
+
+    def _load_from_cache(self):
+        if self._cache_key in self._cache:
+            creds = deepcopy(self._cache[self._cache_key])
+            if not self._is_expired(creds):
+                return creds
+            else:
+                logger.debug(
+                    "Credentials were found in cache, but they are expired."
+                )
+        return None
+
+    def _write_to_cache(self, response):
+        self._cache[self._cache_key] = deepcopy(response)
+
+    def _is_expired(self, credentials):
+        """Check if credentials are expired."""
+        end_time = _parse_if_needed(credentials['Credentials']['Expiration'])
+        seconds = total_seconds(end_time - _local_now())
+        return seconds < self._expiry_window_seconds
+
+
+class BaseAssumeRoleCredentialFetcher(CachedCredentialFetcher):
+    def __init__(
+        self,
+        client_creator,
+        role_arn,
+        extra_args=None,
+        cache=None,
+        expiry_window_seconds=None,
+    ):
+        self._client_creator = client_creator
+        self._role_arn = role_arn
+
+        if extra_args is None:
+            self._assume_kwargs = {}
+        else:
+            self._assume_kwargs = deepcopy(extra_args)
+        self._assume_kwargs['RoleArn'] = self._role_arn
+
+        self._role_session_name = self._assume_kwargs.get('RoleSessionName')
+        self._using_default_session_name = False
+        if not self._role_session_name:
+            self._generate_assume_role_name()
+
+        super().__init__(cache, expiry_window_seconds)
+
+    def _generate_assume_role_name(self):
+        self._role_session_name = f'botocore-session-{int(time.time())}'
+        self._assume_kwargs['RoleSessionName'] = self._role_session_name
+        self._using_default_session_name = True
+
+    def _create_cache_key(self):
+        """Create a predictable cache key for the current configuration.
+
+        The cache key is intended to be compatible with file names.
+        """
+        args = deepcopy(self._assume_kwargs)
+
+        # The role session name gets randomly generated, so we don't want it
+        # in the hash.
+        if self._using_default_session_name:
+            del args['RoleSessionName']
+
+        if 'Policy' in args:
+            # To have a predictable hash, the keys of the policy must be
+            # sorted, so we have to load it here to make sure it gets sorted
+            # later on.
+            args['Policy'] = json.loads(args['Policy'])
+
+        args = json.dumps(args, sort_keys=True)
+        argument_hash = sha1(args.encode('utf-8')).hexdigest()
+        return self._make_file_safe(argument_hash)
+
+    def _add_account_id_to_response(self, response):
+        role_arn = response.get('AssumedRoleUser', {}).get('Arn')
+        if ArnParser.is_arn(role_arn):
+            arn_parser = ArnParser()
+            account_id = arn_parser.parse_arn(role_arn)['account']
+            response['Credentials']['AccountId'] = account_id
+        else:
+            logger.debug(f"Unable to extract account ID from Arn: {role_arn}")
+
+
+class AssumeRoleCredentialFetcher(BaseAssumeRoleCredentialFetcher):
+    def __init__(
+        self,
+        client_creator,
+        source_credentials,
+        role_arn,
+        extra_args=None,
+        mfa_prompter=None,
+        cache=None,
+        expiry_window_seconds=None,
+    ):
+        """
+        :type client_creator: callable
+        :param client_creator: A callable that creates a client taking
+            arguments like ``Session.create_client``.
+
+        :type source_credentials: Credentials
+        :param source_credentials: The credentials to use to create the
+            client for the call to AssumeRole.
+
+        :type role_arn: str
+        :param role_arn: The ARN of the role to be assumed.
+
+        :type extra_args: dict
+        :param extra_args: Any additional arguments to add to the assume
+            role request using the format of the botocore operation.
+            Possible keys include, but may not be limited to,
+            DurationSeconds, Policy, SerialNumber, ExternalId and
+            RoleSessionName.
+
+        :type mfa_prompter: callable
+        :param mfa_prompter: A callable that returns input provided by the
+            user (i.e raw_input, getpass.getpass, etc.).
+
+        :type cache: dict
+        :param cache: An object that supports ``__getitem__``,
+            ``__setitem__``, and ``__contains__``.  An example of this is
+            the ``JSONFileCache`` class in aws-cli.
+
+        :type expiry_window_seconds: int
+        :param expiry_window_seconds: The amount of time, in seconds,
+        """
+        self._source_credentials = source_credentials
+        self._mfa_prompter = mfa_prompter
+        if self._mfa_prompter is None:
+            self._mfa_prompter = getpass.getpass
+
+        super().__init__(
+            client_creator,
+            role_arn,
+            extra_args=extra_args,
+            cache=cache,
+            expiry_window_seconds=expiry_window_seconds,
+        )
+
+    def _get_credentials(self):
+        """Get credentials by calling assume role."""
+        kwargs = self._assume_role_kwargs()
+        client = self._create_client()
+        response = client.assume_role(**kwargs)
+        self._add_account_id_to_response(response)
+        return response
+
+    def _assume_role_kwargs(self):
+        """Get the arguments for assume role based on current configuration."""
+        assume_role_kwargs = deepcopy(self._assume_kwargs)
+
+        mfa_serial = assume_role_kwargs.get('SerialNumber')
+
+        if mfa_serial is not None:
+            prompt = f'Enter MFA code for {mfa_serial}: '
+            token_code = self._mfa_prompter(prompt)
+            assume_role_kwargs['TokenCode'] = token_code
+
+        duration_seconds = assume_role_kwargs.get('DurationSeconds')
+
+        if duration_seconds is not None:
+            assume_role_kwargs['DurationSeconds'] = duration_seconds
+
+        return assume_role_kwargs
+
+    def _create_client(self):
+        """Create an STS client using the source credentials."""
+        frozen_credentials = self._source_credentials.get_frozen_credentials()
+        return self._client_creator(
+            'sts',
+            aws_access_key_id=frozen_credentials.access_key,
+            aws_secret_access_key=frozen_credentials.secret_key,
+            aws_session_token=frozen_credentials.token,
+        )
+
+
+class AssumeRoleWithWebIdentityCredentialFetcher(
+    BaseAssumeRoleCredentialFetcher
+):
+    def __init__(
+        self,
+        client_creator,
+        web_identity_token_loader,
+        role_arn,
+        extra_args=None,
+        cache=None,
+        expiry_window_seconds=None,
+    ):
+        """
+        :type client_creator: callable
+        :param client_creator: A callable that creates a client taking
+            arguments like ``Session.create_client``.
+
+        :type web_identity_token_loader: callable
+        :param web_identity_token_loader: A callable that takes no arguments
+        and returns a web identity token str.
+
+        :type role_arn: str
+        :param role_arn: The ARN of the role to be assumed.
+
+        :type extra_args: dict
+        :param extra_args: Any additional arguments to add to the assume
+            role request using the format of the botocore operation.
+            Possible keys include, but may not be limited to,
+            DurationSeconds, Policy, SerialNumber, ExternalId and
+            RoleSessionName.
+
+        :type cache: dict
+        :param cache: An object that supports ``__getitem__``,
+            ``__setitem__``, and ``__contains__``.  An example of this is
+            the ``JSONFileCache`` class in aws-cli.
+
+        :type expiry_window_seconds: int
+        :param expiry_window_seconds: The amount of time, in seconds,
+        """
+        self._web_identity_token_loader = web_identity_token_loader
+
+        super().__init__(
+            client_creator,
+            role_arn,
+            extra_args=extra_args,
+            cache=cache,
+            expiry_window_seconds=expiry_window_seconds,
+        )
+
+    def _get_credentials(self):
+        """Get credentials by calling assume role."""
+        kwargs = self._assume_role_kwargs()
+        # Assume role with web identity does not require credentials other than
+        # the token, explicitly configure the client to not sign requests.
+        config = Config(signature_version=UNSIGNED)
+        client = self._client_creator('sts', config=config)
+        response = client.assume_role_with_web_identity(**kwargs)
+        self._add_account_id_to_response(response)
+        return response
+
+    def _assume_role_kwargs(self):
+        """Get the arguments for assume role based on current configuration."""
+        assume_role_kwargs = deepcopy(self._assume_kwargs)
+        identity_token = self._web_identity_token_loader()
+        assume_role_kwargs['WebIdentityToken'] = identity_token
+
+        return assume_role_kwargs
+
+
+class CredentialProvider:
+    # A short name to identify the provider within botocore.
+    METHOD = None
+
+    # A name to identify the provider for use in cross-sdk features like
+    # assume role's `credential_source` configuration option. These names
+    # are to be treated in a case-insensitive way. NOTE: any providers not
+    # implemented in botocore MUST prefix their canonical names with
+    # 'custom' or we DO NOT guarantee that it will work with any features
+    # that this provides.
+    CANONICAL_NAME = None
+
+    def __init__(self, session=None):
+        self.session = session
+
+    def load(self):
+        """
+        Loads the credentials from their source & sets them on the object.
+
+        Subclasses should implement this method (by reading from disk, the
+        environment, the network or wherever), returning ``True`` if they were
+        found & loaded.
+
+        If not found, this method should return ``False``, indictating that the
+        ``CredentialResolver`` should fall back to the next available method.
+
+        The default implementation does nothing, assuming the user has set the
+        ``access_key/secret_key/token`` themselves.
+
+        :returns: Whether credentials were found & set
+        :rtype: Credentials
+        """
+        return True
+
+    def _extract_creds_from_mapping(self, mapping, *key_names):
+        found = []
+        for key_name in key_names:
+            try:
+                found.append(mapping[key_name])
+            except KeyError:
+                raise PartialCredentialsError(
+                    provider=self.METHOD, cred_var=key_name
+                )
+        return found
+
+
+class ProcessProvider(CredentialProvider):
+    METHOD = 'custom-process'
+
+    def __init__(self, profile_name, load_config, popen=subprocess.Popen):
+        self._profile_name = profile_name
+        self._load_config = load_config
+        self._loaded_config = None
+        self._popen = popen
+
+    def load(self):
+        credential_process = self._credential_process
+        if credential_process is None:
+            return
+
+        creds_dict = self._retrieve_credentials_using(credential_process)
+        if creds_dict.get('expiry_time') is not None:
+            return RefreshableCredentials.create_from_metadata(
+                creds_dict,
+                lambda: self._retrieve_credentials_using(credential_process),
+                self.METHOD,
+            )
+
+        return Credentials(
+            access_key=creds_dict['access_key'],
+            secret_key=creds_dict['secret_key'],
+            token=creds_dict.get('token'),
+            method=self.METHOD,
+            account_id=creds_dict.get('account_id'),
+        )
+
+    def _retrieve_credentials_using(self, credential_process):
+        # We're not using shell=True, so we need to pass the
+        # command and all arguments as a list.
+        process_list = compat_shell_split(credential_process)
+        p = self._popen(
+            process_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+        )
+        stdout, stderr = p.communicate()
+        if p.returncode != 0:
+            raise CredentialRetrievalError(
+                provider=self.METHOD, error_msg=stderr.decode('utf-8')
+            )
+        parsed = botocore.compat.json.loads(stdout.decode('utf-8'))
+        version = parsed.get('Version', '<Version key not provided>')
+        if version != 1:
+            raise CredentialRetrievalError(
+                provider=self.METHOD,
+                error_msg=(
+                    f"Unsupported version '{version}' for credential process "
+                    f"provider, supported versions: 1"
+                ),
+            )
+        try:
+            return {
+                'access_key': parsed['AccessKeyId'],
+                'secret_key': parsed['SecretAccessKey'],
+                'token': parsed.get('SessionToken'),
+                'expiry_time': parsed.get('Expiration'),
+                'account_id': self._get_account_id(parsed),
+            }
+        except KeyError as e:
+            raise CredentialRetrievalError(
+                provider=self.METHOD,
+                error_msg=f"Missing required key in response: {e}",
+            )
+
+    @property
+    def _credential_process(self):
+        return self.profile_config.get('credential_process')
+
+    @property
+    def profile_config(self):
+        if self._loaded_config is None:
+            self._loaded_config = self._load_config()
+        profile_config = self._loaded_config.get('profiles', {}).get(
+            self._profile_name, {}
+        )
+        return profile_config
+
+    def _get_account_id(self, parsed):
+        account_id = parsed.get('AccountId')
+        return account_id or self.profile_config.get('aws_account_id')
+
+
+class InstanceMetadataProvider(CredentialProvider):
+    METHOD = 'iam-role'
+    CANONICAL_NAME = 'Ec2InstanceMetadata'
+
+    def __init__(self, iam_role_fetcher):
+        self._role_fetcher = iam_role_fetcher
+
+    def load(self):
+        fetcher = self._role_fetcher
+        # We do the first request, to see if we get useful data back.
+        # If not, we'll pass & move on to whatever's next in the credential
+        # chain.
+        metadata = fetcher.retrieve_iam_role_credentials()
+        if not metadata:
+            return None
+        logger.info(
+            'Found credentials from IAM Role: %s', metadata['role_name']
+        )
+        # We manually set the data here, since we already made the request &
+        # have it. When the expiry is hit, the credentials will auto-refresh
+        # themselves.
+        creds = RefreshableCredentials.create_from_metadata(
+            metadata,
+            method=self.METHOD,
+            refresh_using=fetcher.retrieve_iam_role_credentials,
+        )
+        return creds
+
+
+class EnvProvider(CredentialProvider):
+    METHOD = 'env'
+    CANONICAL_NAME = 'Environment'
+    ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
+    SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
+    # The token can come from either of these env var.
+    # AWS_SESSION_TOKEN is what other AWS SDKs have standardized on.
+    TOKENS = ['AWS_SECURITY_TOKEN', 'AWS_SESSION_TOKEN']
+    EXPIRY_TIME = 'AWS_CREDENTIAL_EXPIRATION'
+    ACCOUNT_ID = 'AWS_ACCOUNT_ID'
+
+    def __init__(self, environ=None, mapping=None):
+        """
+
+        :param environ: The environment variables (defaults to
+            ``os.environ`` if no value is provided).
+        :param mapping: An optional mapping of variable names to
+            environment variable names.  Use this if you want to
+            change the mapping of access_key->AWS_ACCESS_KEY_ID, etc.
+            The dict can have up to 5 keys:
+            * ``access_key``
+            * ``secret_key``
+            * ``token``
+            * ``expiry_time``
+            * ``account_id``
+        """
+        if environ is None:
+            environ = os.environ
+        self.environ = environ
+        self._mapping = self._build_mapping(mapping)
+
+    def _build_mapping(self, mapping):
+        # Mapping of variable name to env var name.
+        var_mapping = {}
+        if mapping is None:
+            # Use the class var default.
+            var_mapping['access_key'] = self.ACCESS_KEY
+            var_mapping['secret_key'] = self.SECRET_KEY
+            var_mapping['token'] = self.TOKENS
+            var_mapping['expiry_time'] = self.EXPIRY_TIME
+            var_mapping['account_id'] = self.ACCOUNT_ID
+        else:
+            var_mapping['access_key'] = mapping.get(
+                'access_key', self.ACCESS_KEY
+            )
+            var_mapping['secret_key'] = mapping.get(
+                'secret_key', self.SECRET_KEY
+            )
+            var_mapping['token'] = mapping.get('token', self.TOKENS)
+            if not isinstance(var_mapping['token'], list):
+                var_mapping['token'] = [var_mapping['token']]
+            var_mapping['expiry_time'] = mapping.get(
+                'expiry_time', self.EXPIRY_TIME
+            )
+            var_mapping['account_id'] = mapping.get(
+                'account_id', self.ACCOUNT_ID
+            )
+        return var_mapping
+
+    def load(self):
+        """
+        Search for credentials in explicit environment variables.
+        """
+
+        access_key = self.environ.get(self._mapping['access_key'], '')
+
+        if access_key:
+            logger.info('Found credentials in environment variables.')
+            fetcher = self._create_credentials_fetcher()
+            credentials = fetcher(require_expiry=False)
+
+            expiry_time = credentials['expiry_time']
+            if expiry_time is not None:
+                expiry_time = parse(expiry_time)
+                return RefreshableCredentials(
+                    credentials['access_key'],
+                    credentials['secret_key'],
+                    credentials['token'],
+                    expiry_time,
+                    refresh_using=fetcher,
+                    method=self.METHOD,
+                    account_id=credentials['account_id'],
+                )
+
+            return Credentials(
+                credentials['access_key'],
+                credentials['secret_key'],
+                credentials['token'],
+                method=self.METHOD,
+                account_id=credentials['account_id'],
+            )
+        else:
+            return None
+
+    def _create_credentials_fetcher(self):
+        mapping = self._mapping
+        method = self.METHOD
+        environ = self.environ
+
+        def fetch_credentials(require_expiry=True):
+            credentials = {}
+
+            access_key = environ.get(mapping['access_key'], '')
+            if not access_key:
+                raise PartialCredentialsError(
+                    provider=method, cred_var=mapping['access_key']
+                )
+            credentials['access_key'] = access_key
+
+            secret_key = environ.get(mapping['secret_key'], '')
+            if not secret_key:
+                raise PartialCredentialsError(
+                    provider=method, cred_var=mapping['secret_key']
+                )
+            credentials['secret_key'] = secret_key
+
+            credentials['token'] = None
+            for token_env_var in mapping['token']:
+                token = environ.get(token_env_var, '')
+                if token:
+                    credentials['token'] = token
+                    break
+
+            credentials['expiry_time'] = None
+            expiry_time = environ.get(mapping['expiry_time'], '')
+            if expiry_time:
+                credentials['expiry_time'] = expiry_time
+            if require_expiry and not expiry_time:
+                raise PartialCredentialsError(
+                    provider=method, cred_var=mapping['expiry_time']
+                )
+
+            credentials['account_id'] = None
+            account_id = environ.get(mapping['account_id'], '')
+            if account_id:
+                credentials['account_id'] = account_id
+
+            return credentials
+
+        return fetch_credentials
+
+
+class OriginalEC2Provider(CredentialProvider):
+    METHOD = 'ec2-credentials-file'
+    CANONICAL_NAME = 'Ec2Config'
+
+    CRED_FILE_ENV = 'AWS_CREDENTIAL_FILE'
+    ACCESS_KEY = 'AWSAccessKeyId'
+    SECRET_KEY = 'AWSSecretKey'
+
+    def __init__(self, environ=None, parser=None):
+        if environ is None:
+            environ = os.environ
+        if parser is None:
+            parser = parse_key_val_file
+        self._environ = environ
+        self._parser = parser
+
+    def load(self):
+        """
+        Search for a credential file used by original EC2 CLI tools.
+        """
+        if 'AWS_CREDENTIAL_FILE' in self._environ:
+            full_path = os.path.expanduser(
+                self._environ['AWS_CREDENTIAL_FILE']
+            )
+            creds = self._parser(full_path)
+            if self.ACCESS_KEY in creds:
+                logger.info('Found credentials in AWS_CREDENTIAL_FILE.')
+                access_key = creds[self.ACCESS_KEY]
+                secret_key = creds[self.SECRET_KEY]
+                # EC2 creds file doesn't support session tokens.
+                return Credentials(access_key, secret_key, method=self.METHOD)
+        else:
+            return None
+
+
+class SharedCredentialProvider(CredentialProvider):
+    METHOD = 'shared-credentials-file'
+    CANONICAL_NAME = 'SharedCredentials'
+
+    ACCESS_KEY = 'aws_access_key_id'
+    SECRET_KEY = 'aws_secret_access_key'
+    # Same deal as the EnvProvider above.  Botocore originally supported
+    # aws_security_token, but the SDKs are standardizing on aws_session_token
+    # so we support both.
+    TOKENS = ['aws_security_token', 'aws_session_token']
+    ACCOUNT_ID = 'aws_account_id'
+
+    def __init__(self, creds_filename, profile_name=None, ini_parser=None):
+        self._creds_filename = creds_filename
+        if profile_name is None:
+            profile_name = 'default'
+        self._profile_name = profile_name
+        if ini_parser is None:
+            ini_parser = botocore.configloader.raw_config_parse
+        self._ini_parser = ini_parser
+
+    def load(self):
+        try:
+            available_creds = self._ini_parser(self._creds_filename)
+        except ConfigNotFound:
+            return None
+        if self._profile_name in available_creds:
+            config = available_creds[self._profile_name]
+            if self.ACCESS_KEY in config:
+                logger.info(
+                    "Found credentials in shared credentials file: %s",
+                    self._creds_filename,
+                )
+                access_key, secret_key = self._extract_creds_from_mapping(
+                    config, self.ACCESS_KEY, self.SECRET_KEY
+                )
+                token = self._get_session_token(config)
+                account_id = self._get_account_id(config)
+                return Credentials(
+                    access_key,
+                    secret_key,
+                    token,
+                    method=self.METHOD,
+                    account_id=account_id,
+                )
+
+    def _get_session_token(self, config):
+        for token_envvar in self.TOKENS:
+            if token_envvar in config:
+                return config[token_envvar]
+
+    def _get_account_id(self, config):
+        return config.get(self.ACCOUNT_ID)
+
+
+class ConfigProvider(CredentialProvider):
+    """INI based config provider with profile sections."""
+
+    METHOD = 'config-file'
+    CANONICAL_NAME = 'SharedConfig'
+
+    ACCESS_KEY = 'aws_access_key_id'
+    SECRET_KEY = 'aws_secret_access_key'
+    # Same deal as the EnvProvider above.  Botocore originally supported
+    # aws_security_token, but the SDKs are standardizing on aws_session_token
+    # so we support both.
+    TOKENS = ['aws_security_token', 'aws_session_token']
+    ACCOUNT_ID = 'aws_account_id'
+
+    def __init__(self, config_filename, profile_name, config_parser=None):
+        """
+
+        :param config_filename: The session configuration scoped to the current
+            profile.  This is available via ``session.config``.
+        :param profile_name: The name of the current profile.
+        :param config_parser: A config parser callable.
+
+        """
+        self._config_filename = config_filename
+        self._profile_name = profile_name
+        if config_parser is None:
+            config_parser = botocore.configloader.load_config
+        self._config_parser = config_parser
+
+    def load(self):
+        """
+        If there is are credentials in the configuration associated with
+        the session, use those.
+        """
+        try:
+            full_config = self._config_parser(self._config_filename)
+        except ConfigNotFound:
+            return None
+        if self._profile_name in full_config['profiles']:
+            profile_config = full_config['profiles'][self._profile_name]
+            if self.ACCESS_KEY in profile_config:
+                logger.info(
+                    "Credentials found in config file: %s",
+                    self._config_filename,
+                )
+                access_key, secret_key = self._extract_creds_from_mapping(
+                    profile_config, self.ACCESS_KEY, self.SECRET_KEY
+                )
+                token = self._get_session_token(profile_config)
+                account_id = self._get_account_id(profile_config)
+                return Credentials(
+                    access_key,
+                    secret_key,
+                    token,
+                    method=self.METHOD,
+                    account_id=account_id,
+                )
+        else:
+            return None
+
+    def _get_session_token(self, profile_config):
+        for token_name in self.TOKENS:
+            if token_name in profile_config:
+                return profile_config[token_name]
+
+    def _get_account_id(self, config):
+        return config.get(self.ACCOUNT_ID)
+
+
+class BotoProvider(CredentialProvider):
+    METHOD = 'boto-config'
+    CANONICAL_NAME = 'Boto2Config'
+
+    BOTO_CONFIG_ENV = 'BOTO_CONFIG'
+    DEFAULT_CONFIG_FILENAMES = ['/etc/boto.cfg', '~/.boto']
+    ACCESS_KEY = 'aws_access_key_id'
+    SECRET_KEY = 'aws_secret_access_key'
+
+    def __init__(self, environ=None, ini_parser=None):
+        if environ is None:
+            environ = os.environ
+        if ini_parser is None:
+            ini_parser = botocore.configloader.raw_config_parse
+        self._environ = environ
+        self._ini_parser = ini_parser
+
+    def load(self):
+        """
+        Look for credentials in boto config file.
+        """
+        if self.BOTO_CONFIG_ENV in self._environ:
+            potential_locations = [self._environ[self.BOTO_CONFIG_ENV]]
+        else:
+            potential_locations = self.DEFAULT_CONFIG_FILENAMES
+        for filename in potential_locations:
+            try:
+                config = self._ini_parser(filename)
+            except ConfigNotFound:
+                # Move on to the next potential config file name.
+                continue
+            if 'Credentials' in config:
+                credentials = config['Credentials']
+                if self.ACCESS_KEY in credentials:
+                    logger.info(
+                        "Found credentials in boto config file: %s", filename
+                    )
+                    access_key, secret_key = self._extract_creds_from_mapping(
+                        credentials, self.ACCESS_KEY, self.SECRET_KEY
+                    )
+                    return Credentials(
+                        access_key, secret_key, method=self.METHOD
+                    )
+
+
+class AssumeRoleProvider(CredentialProvider):
+    METHOD = 'assume-role'
+    # The AssumeRole provider is logically part of the SharedConfig and
+    # SharedCredentials providers. Since the purpose of the canonical name
+    # is to provide cross-sdk compatibility, calling code will need to be
+    # aware that either of those providers should be tied to the AssumeRole
+    # provider as much as possible.
+    CANONICAL_NAME = None
+    ROLE_CONFIG_VAR = 'role_arn'
+    WEB_IDENTITY_TOKE_FILE_VAR = 'web_identity_token_file'
+    # Credentials are considered expired (and will be refreshed) once the total
+    # remaining time left until the credentials expires is less than the
+    # EXPIRY_WINDOW.
+    EXPIRY_WINDOW_SECONDS = 60 * 15
+
+    def __init__(
+        self,
+        load_config,
+        client_creator,
+        cache,
+        profile_name,
+        prompter=getpass.getpass,
+        credential_sourcer=None,
+        profile_provider_builder=None,
+    ):
+        """
+        :type load_config: callable
+        :param load_config: A function that accepts no arguments, and
+            when called, will return the full configuration dictionary
+            for the session (``session.full_config``).
+
+        :type client_creator: callable
+        :param client_creator: A factory function that will create
+            a client when called.  Has the same interface as
+            ``botocore.session.Session.create_client``.
+
+        :type cache: dict
+        :param cache: An object that supports ``__getitem__``,
+            ``__setitem__``, and ``__contains__``.  An example
+            of this is the ``JSONFileCache`` class in the CLI.
+
+        :type profile_name: str
+        :param profile_name: The name of the profile.
+
+        :type prompter: callable
+        :param prompter: A callable that returns input provided
+            by the user (i.e raw_input, getpass.getpass, etc.).
+
+        :type credential_sourcer: CanonicalNameCredentialSourcer
+        :param credential_sourcer: A credential provider that takes a
+            configuration, which is used to provide the source credentials
+            for the STS call.
+        """
+        #: The cache used to first check for assumed credentials.
+        #: This is checked before making the AssumeRole API
+        #: calls and can be useful if you have short lived
+        #: scripts and you'd like to avoid calling AssumeRole
+        #: until the credentials are expired.
+        self.cache = cache
+        self._load_config = load_config
+        # client_creator is a callable that creates function.
+        # It's basically session.create_client
+        self._client_creator = client_creator
+        self._profile_name = profile_name
+        self._prompter = prompter
+        # The _loaded_config attribute will be populated from the
+        # load_config() function once the configuration is actually
+        # loaded.  The reason we go through all this instead of just
+        # requiring that the loaded_config be passed to us is to that
+        # we can defer configuration loaded until we actually try
+        # to load credentials (as opposed to when the object is
+        # instantiated).
+        self._loaded_config = {}
+        self._credential_sourcer = credential_sourcer
+        self._profile_provider_builder = profile_provider_builder
+        self._visited_profiles = [self._profile_name]
+
+    def load(self):
+        self._loaded_config = self._load_config()
+        profiles = self._loaded_config.get('profiles', {})
+        profile = profiles.get(self._profile_name, {})
+        if self._has_assume_role_config_vars(profile):
+            return self._load_creds_via_assume_role(self._profile_name)
+
+    def _has_assume_role_config_vars(self, profile):
+        return (
+            self.ROLE_CONFIG_VAR in profile
+            and
+            # We need to ensure this provider doesn't look at a profile when
+            # the profile has configuration for web identity. Simply relying on
+            # the order in the credential chain is insufficient as it doesn't
+            # prevent the case when we're doing an assume role chain.
+            self.WEB_IDENTITY_TOKE_FILE_VAR not in profile
+        )
+
+    def _load_creds_via_assume_role(self, profile_name):
+        role_config = self._get_role_config(profile_name)
+        source_credentials = self._resolve_source_credentials(
+            role_config, profile_name
+        )
+
+        extra_args = {}
+        role_session_name = role_config.get('role_session_name')
+        if role_session_name is not None:
+            extra_args['RoleSessionName'] = role_session_name
+
+        external_id = role_config.get('external_id')
+        if external_id is not None:
+            extra_args['ExternalId'] = external_id
+
+        mfa_serial = role_config.get('mfa_serial')
+        if mfa_serial is not None:
+            extra_args['SerialNumber'] = mfa_serial
+
+        duration_seconds = role_config.get('duration_seconds')
+        if duration_seconds is not None:
+            extra_args['DurationSeconds'] = duration_seconds
+
+        fetcher = AssumeRoleCredentialFetcher(
+            client_creator=self._client_creator,
+            source_credentials=source_credentials,
+            role_arn=role_config['role_arn'],
+            extra_args=extra_args,
+            mfa_prompter=self._prompter,
+            cache=self.cache,
+        )
+        refresher = fetcher.fetch_credentials
+        if mfa_serial is not None:
+            refresher = create_mfa_serial_refresher(refresher)
+
+        # The initial credentials are empty and the expiration time is set
+        # to now so that we can delay the call to assume role until it is
+        # strictly needed.
+        return DeferredRefreshableCredentials(
+            method=self.METHOD,
+            refresh_using=refresher,
+            time_fetcher=_local_now,
+        )
+
+    def _get_role_config(self, profile_name):
+        """Retrieves and validates the role configuration for the profile."""
+        profiles = self._loaded_config.get('profiles', {})
+
+        profile = profiles[profile_name]
+        source_profile = profile.get('source_profile')
+        role_arn = profile['role_arn']
+        credential_source = profile.get('credential_source')
+        mfa_serial = profile.get('mfa_serial')
+        external_id = profile.get('external_id')
+        role_session_name = profile.get('role_session_name')
+        duration_seconds = profile.get('duration_seconds')
+
+        role_config = {
+            'role_arn': role_arn,
+            'external_id': external_id,
+            'mfa_serial': mfa_serial,
+            'role_session_name': role_session_name,
+            'source_profile': source_profile,
+            'credential_source': credential_source,
+        }
+
+        if duration_seconds is not None:
+            try:
+                role_config['duration_seconds'] = int(duration_seconds)
+            except ValueError:
+                pass
+
+        # Either the credential source or the source profile must be
+        # specified, but not both.
+        if credential_source is not None and source_profile is not None:
+            raise InvalidConfigError(
+                error_msg=(
+                    f'The profile "{profile_name}" contains both '
+                    'source_profile and credential_source.'
+                )
+            )
+        elif credential_source is None and source_profile is None:
+            raise PartialCredentialsError(
+                provider=self.METHOD,
+                cred_var='source_profile or credential_source',
+            )
+        elif credential_source is not None:
+            self._validate_credential_source(profile_name, credential_source)
+        else:
+            self._validate_source_profile(profile_name, source_profile)
+
+        return role_config
+
+    def _validate_credential_source(self, parent_profile, credential_source):
+        if self._credential_sourcer is None:
+            raise InvalidConfigError(
+                error_msg=(
+                    f"The credential_source \"{credential_source}\" is specified "
+                    f"in profile \"{parent_profile}\", "
+                    f"but no source provider was configured."
+                )
+            )
+        if not self._credential_sourcer.is_supported(credential_source):
+            raise InvalidConfigError(
+                error_msg=(
+                    f"The credential source \"{credential_source}\" referenced "
+                    f"in profile \"{parent_profile}\" is not valid."
+                )
+            )
+
+    def _source_profile_has_credentials(self, profile):
+        return any(
+            [
+                self._has_static_credentials(profile),
+                self._has_assume_role_config_vars(profile),
+            ]
+        )
+
+    def _validate_source_profile(
+        self, parent_profile_name, source_profile_name
+    ):
+        profiles = self._loaded_config.get('profiles', {})
+        if source_profile_name not in profiles:
+            raise InvalidConfigError(
+                error_msg=(
+                    f"The source_profile \"{source_profile_name}\" referenced in "
+                    f"the profile \"{parent_profile_name}\" does not exist."
+                )
+            )
+
+        source_profile = profiles[source_profile_name]
+
+        # Make sure we aren't going into an infinite loop. If we haven't
+        # visited the profile yet, we're good.
+        if source_profile_name not in self._visited_profiles:
+            return
+
+        # If we have visited the profile and the profile isn't simply
+        # referencing itself, that's an infinite loop.
+        if source_profile_name != parent_profile_name:
+            raise InfiniteLoopConfigError(
+                source_profile=source_profile_name,
+                visited_profiles=self._visited_profiles,
+            )
+
+        # A profile is allowed to reference itself so that it can source
+        # static credentials and have configuration all in the same
+        # profile. This will only ever work for the top level assume
+        # role because the static credentials will otherwise take
+        # precedence.
+        if not self._has_static_credentials(source_profile):
+            raise InfiniteLoopConfigError(
+                source_profile=source_profile_name,
+                visited_profiles=self._visited_profiles,
+            )
+
+    def _has_static_credentials(self, profile):
+        static_keys = ['aws_secret_access_key', 'aws_access_key_id']
+        return any(static_key in profile for static_key in static_keys)
+
+    def _resolve_source_credentials(self, role_config, profile_name):
+        credential_source = role_config.get('credential_source')
+        if credential_source is not None:
+            return self._resolve_credentials_from_source(
+                credential_source, profile_name
+            )
+
+        source_profile = role_config['source_profile']
+        self._visited_profiles.append(source_profile)
+        return self._resolve_credentials_from_profile(source_profile)
+
+    def _resolve_credentials_from_profile(self, profile_name):
+        profiles = self._loaded_config.get('profiles', {})
+        profile = profiles[profile_name]
+
+        if (
+            self._has_static_credentials(profile)
+            and not self._profile_provider_builder
+        ):
+            # This is only here for backwards compatibility. If this provider
+            # isn't given a profile provider builder we still want to be able
+            # to handle the basic static credential case as we would before the
+            # profile provider builder parameter was added.
+            return self._resolve_static_credentials_from_profile(profile)
+        elif self._has_static_credentials(
+            profile
+        ) or not self._has_assume_role_config_vars(profile):
+            profile_providers = self._profile_provider_builder.providers(
+                profile_name=profile_name,
+                disable_env_vars=True,
+            )
+            profile_chain = CredentialResolver(profile_providers)
+            credentials = profile_chain.load_credentials()
+            if credentials is None:
+                error_message = (
+                    'The source profile "%s" must have credentials.'
+                )
+                raise InvalidConfigError(
+                    error_msg=error_message % profile_name,
+                )
+            return credentials
+
+        return self._load_creds_via_assume_role(profile_name)
+
+    def _resolve_static_credentials_from_profile(self, profile):
+        try:
+            return Credentials(
+                access_key=profile['aws_access_key_id'],
+                secret_key=profile['aws_secret_access_key'],
+                token=profile.get('aws_session_token'),
+            )
+        except KeyError as e:
+            raise PartialCredentialsError(
+                provider=self.METHOD, cred_var=str(e)
+            )
+
+    def _resolve_credentials_from_source(
+        self, credential_source, profile_name
+    ):
+        credentials = self._credential_sourcer.source_credentials(
+            credential_source
+        )
+        if credentials is None:
+            raise CredentialRetrievalError(
+                provider=credential_source,
+                error_msg=(
+                    'No credentials found in credential_source referenced '
+                    f'in profile {profile_name}'
+                ),
+            )
+        return credentials
+
+
+class AssumeRoleWithWebIdentityProvider(CredentialProvider):
+    METHOD = 'assume-role-with-web-identity'
+    CANONICAL_NAME = None
+    _CONFIG_TO_ENV_VAR = {
+        'web_identity_token_file': 'AWS_WEB_IDENTITY_TOKEN_FILE',
+        'role_session_name': 'AWS_ROLE_SESSION_NAME',
+        'role_arn': 'AWS_ROLE_ARN',
+    }
+
+    def __init__(
+        self,
+        load_config,
+        client_creator,
+        profile_name,
+        cache=None,
+        disable_env_vars=False,
+        token_loader_cls=None,
+    ):
+        self.cache = cache
+        self._load_config = load_config
+        self._client_creator = client_creator
+        self._profile_name = profile_name
+        self._profile_config = None
+        self._disable_env_vars = disable_env_vars
+        if token_loader_cls is None:
+            token_loader_cls = FileWebIdentityTokenLoader
+        self._token_loader_cls = token_loader_cls
+
+    def load(self):
+        return self._assume_role_with_web_identity()
+
+    def _get_profile_config(self, key):
+        if self._profile_config is None:
+            loaded_config = self._load_config()
+            profiles = loaded_config.get('profiles', {})
+            self._profile_config = profiles.get(self._profile_name, {})
+        return self._profile_config.get(key)
+
+    def _get_env_config(self, key):
+        if self._disable_env_vars:
+            return None
+        env_key = self._CONFIG_TO_ENV_VAR.get(key)
+        if env_key and env_key in os.environ:
+            return os.environ[env_key]
+        return None
+
+    def _get_config(self, key):
+        env_value = self._get_env_config(key)
+        if env_value is not None:
+            return env_value
+        return self._get_profile_config(key)
+
+    def _assume_role_with_web_identity(self):
+        token_path = self._get_config('web_identity_token_file')
+        if not token_path:
+            return None
+        token_loader = self._token_loader_cls(token_path)
+
+        role_arn = self._get_config('role_arn')
+        if not role_arn:
+            error_msg = (
+                'The provided profile or the current environment is '
+                'configured to assume role with web identity but has no '
+                'role ARN configured. Ensure that the profile has the role_arn'
+                'configuration set or the AWS_ROLE_ARN env var is set.'
+            )
+            raise InvalidConfigError(error_msg=error_msg)
+
+        extra_args = {}
+        role_session_name = self._get_config('role_session_name')
+        if role_session_name is not None:
+            extra_args['RoleSessionName'] = role_session_name
+
+        fetcher = AssumeRoleWithWebIdentityCredentialFetcher(
+            client_creator=self._client_creator,
+            web_identity_token_loader=token_loader,
+            role_arn=role_arn,
+            extra_args=extra_args,
+            cache=self.cache,
+        )
+        # The initial credentials are empty and the expiration time is set
+        # to now so that we can delay the call to assume role until it is
+        # strictly needed.
+        return DeferredRefreshableCredentials(
+            method=self.METHOD,
+            refresh_using=fetcher.fetch_credentials,
+        )
+
+
+class CanonicalNameCredentialSourcer:
+    def __init__(self, providers):
+        self._providers = providers
+
+    def is_supported(self, source_name):
+        """Validates a given source name.
+
+        :type source_name: str
+        :param source_name: The value of credential_source in the config
+            file. This is the canonical name of the credential provider.
+
+        :rtype: bool
+        :returns: True if the credential provider is supported,
+            False otherwise.
+        """
+        return source_name in [p.CANONICAL_NAME for p in self._providers]
+
+    def source_credentials(self, source_name):
+        """Loads source credentials based on the provided configuration.
+
+        :type source_name: str
+        :param source_name: The value of credential_source in the config
+            file. This is the canonical name of the credential provider.
+
+        :rtype: Credentials
+        """
+        source = self._get_provider(source_name)
+        if isinstance(source, CredentialResolver):
+            return source.load_credentials()
+        return source.load()
+
+    def _get_provider(self, canonical_name):
+        """Return a credential provider by its canonical name.
+
+        :type canonical_name: str
+        :param canonical_name: The canonical name of the provider.
+
+        :raises UnknownCredentialError: Raised if no
+            credential provider by the provided name
+            is found.
+        """
+        provider = self._get_provider_by_canonical_name(canonical_name)
+
+        # The AssumeRole provider should really be part of the SharedConfig
+        # provider rather than being its own thing, but it is not. It is
+        # effectively part of both the SharedConfig provider and the
+        # SharedCredentials provider now due to the way it behaves.
+        # Therefore if we want either of those providers we should return
+        # the AssumeRole provider with it.
+        if canonical_name.lower() in ['sharedconfig', 'sharedcredentials']:
+            assume_role_provider = self._get_provider_by_method('assume-role')
+            if assume_role_provider is not None:
+                # The SharedConfig or SharedCredentials provider may not be
+                # present if it was removed for some reason, but the
+                # AssumeRole provider could still be present. In that case,
+                # return the assume role provider by itself.
+                if provider is None:
+                    return assume_role_provider
+
+                # If both are present, return them both as a
+                # CredentialResolver so that calling code can treat them as
+                # a single entity.
+                return CredentialResolver([assume_role_provider, provider])
+
+        if provider is None:
+            raise UnknownCredentialError(name=canonical_name)
+
+        return provider
+
+    def _get_provider_by_canonical_name(self, canonical_name):
+        """Return a credential provider by its canonical name.
+
+        This function is strict, it does not attempt to address
+        compatibility issues.
+        """
+        for provider in self._providers:
+            name = provider.CANONICAL_NAME
+            # Canonical names are case-insensitive
+            if name and name.lower() == canonical_name.lower():
+                return provider
+
+    def _get_provider_by_method(self, method):
+        """Return a credential provider by its METHOD name."""
+        for provider in self._providers:
+            if provider.METHOD == method:
+                return provider
+
+
+class ContainerProvider(CredentialProvider):
+    METHOD = 'container-role'
+    CANONICAL_NAME = 'EcsContainer'
+    ENV_VAR = 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI'
+    ENV_VAR_FULL = 'AWS_CONTAINER_CREDENTIALS_FULL_URI'
+    ENV_VAR_AUTH_TOKEN = 'AWS_CONTAINER_AUTHORIZATION_TOKEN'
+    ENV_VAR_AUTH_TOKEN_FILE = 'AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE'
+
+    def __init__(self, environ=None, fetcher=None):
+        if environ is None:
+            environ = os.environ
+        if fetcher is None:
+            fetcher = ContainerMetadataFetcher()
+        self._environ = environ
+        self._fetcher = fetcher
+
+    def load(self):
+        # This cred provider is only triggered if the self.ENV_VAR is set,
+        # which only happens if you opt into this feature.
+        if self.ENV_VAR in self._environ or self.ENV_VAR_FULL in self._environ:
+            return self._retrieve_or_fail()
+
+    def _retrieve_or_fail(self):
+        if self._provided_relative_uri():
+            full_uri = self._fetcher.full_url(self._environ[self.ENV_VAR])
+        else:
+            full_uri = self._environ[self.ENV_VAR_FULL]
+        fetcher = self._create_fetcher(full_uri)
+        creds = fetcher()
+        return RefreshableCredentials(
+            access_key=creds['access_key'],
+            secret_key=creds['secret_key'],
+            token=creds['token'],
+            method=self.METHOD,
+            expiry_time=_parse_if_needed(creds['expiry_time']),
+            refresh_using=fetcher,
+            account_id=creds.get('account_id'),
+        )
+
+    def _build_headers(self):
+        auth_token = None
+        if self.ENV_VAR_AUTH_TOKEN_FILE in self._environ:
+            auth_token_file_path = self._environ[self.ENV_VAR_AUTH_TOKEN_FILE]
+            with open(auth_token_file_path) as token_file:
+                auth_token = token_file.read()
+        elif self.ENV_VAR_AUTH_TOKEN in self._environ:
+            auth_token = self._environ[self.ENV_VAR_AUTH_TOKEN]
+        if auth_token is not None:
+            self._validate_auth_token(auth_token)
+            return {'Authorization': auth_token}
+
+    def _validate_auth_token(self, auth_token):
+        if "\r" in auth_token or "\n" in auth_token:
+            raise ValueError("Auth token value is not a legal header value")
+
+    def _create_fetcher(self, full_uri, *args, **kwargs):
+        def fetch_creds():
+            try:
+                headers = self._build_headers()
+                response = self._fetcher.retrieve_full_uri(
+                    full_uri, headers=headers
+                )
+            except MetadataRetrievalError as e:
+                logger.debug(
+                    "Error retrieving container metadata: %s", e, exc_info=True
+                )
+                raise CredentialRetrievalError(
+                    provider=self.METHOD, error_msg=str(e)
+                )
+            return {
+                'access_key': response['AccessKeyId'],
+                'secret_key': response['SecretAccessKey'],
+                'token': response['Token'],
+                'expiry_time': response['Expiration'],
+                'account_id': response.get('AccountId'),
+            }
+
+        return fetch_creds
+
+    def _provided_relative_uri(self):
+        return self.ENV_VAR in self._environ
+
+
+class CredentialResolver:
+    def __init__(self, providers):
+        """
+
+        :param providers: A list of ``CredentialProvider`` instances.
+
+        """
+        self.providers = providers
+
+    def insert_before(self, name, credential_provider):
+        """
+        Inserts a new instance of ``CredentialProvider`` into the chain that
+        will be tried before an existing one.
+
+        :param name: The short name of the credentials you'd like to insert the
+            new credentials before. (ex. ``env`` or ``config``). Existing names
+            & ordering can be discovered via ``self.available_methods``.
+        :type name: string
+
+        :param cred_instance: An instance of the new ``Credentials`` object
+            you'd like to add to the chain.
+        :type cred_instance: A subclass of ``Credentials``
+        """
+        try:
+            offset = [p.METHOD for p in self.providers].index(name)
+        except ValueError:
+            raise UnknownCredentialError(name=name)
+        self.providers.insert(offset, credential_provider)
+
+    def insert_after(self, name, credential_provider):
+        """
+        Inserts a new type of ``Credentials`` instance into the chain that will
+        be tried after an existing one.
+
+        :param name: The short name of the credentials you'd like to insert the
+            new credentials after. (ex. ``env`` or ``config``). Existing names
+            & ordering can be discovered via ``self.available_methods``.
+        :type name: string
+
+        :param cred_instance: An instance of the new ``Credentials`` object
+            you'd like to add to the chain.
+        :type cred_instance: A subclass of ``Credentials``
+        """
+        offset = self._get_provider_offset(name)
+        self.providers.insert(offset + 1, credential_provider)
+
+    def remove(self, name):
+        """
+        Removes a given ``Credentials`` instance from the chain.
+
+        :param name: The short name of the credentials instance to remove.
+        :type name: string
+        """
+        available_methods = [p.METHOD for p in self.providers]
+        if name not in available_methods:
+            # It's not present. Fail silently.
+            return
+
+        offset = available_methods.index(name)
+        self.providers.pop(offset)
+
+    def get_provider(self, name):
+        """Return a credential provider by name.
+
+        :type name: str
+        :param name: The name of the provider.
+
+        :raises UnknownCredentialError: Raised if no
+            credential provider by the provided name
+            is found.
+        """
+        return self.providers[self._get_provider_offset(name)]
+
+    def _get_provider_offset(self, name):
+        try:
+            return [p.METHOD for p in self.providers].index(name)
+        except ValueError:
+            raise UnknownCredentialError(name=name)
+
+    def load_credentials(self):
+        """
+        Goes through the credentials chain, returning the first ``Credentials``
+        that could be loaded.
+        """
+        # First provider to return a non-None response wins.
+        for provider in self.providers:
+            logger.debug("Looking for credentials via: %s", provider.METHOD)
+            creds = provider.load()
+            if creds is not None:
+                return creds
+
+        # If we got here, no credentials could be found.
+        # This feels like it should be an exception, but historically, ``None``
+        # is returned.
+        #
+        # +1
+        # -js
+        return None
+
+
+class SSOCredentialFetcher(CachedCredentialFetcher):
+    _UTC_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
+
+    def __init__(
+        self,
+        start_url,
+        sso_region,
+        role_name,
+        account_id,
+        client_creator,
+        token_loader=None,
+        cache=None,
+        expiry_window_seconds=None,
+        token_provider=None,
+        sso_session_name=None,
+    ):
+        self._client_creator = client_creator
+        self._sso_region = sso_region
+        self._role_name = role_name
+        self._account_id = account_id
+        self._start_url = start_url
+        self._token_loader = token_loader
+        self._token_provider = token_provider
+        self._sso_session_name = sso_session_name
+        super().__init__(cache, expiry_window_seconds)
+
+    def _create_cache_key(self):
+        """Create a predictable cache key for the current configuration.
+
+        The cache key is intended to be compatible with file names.
+        """
+        args = {
+            'roleName': self._role_name,
+            'accountId': self._account_id,
+        }
+        if self._sso_session_name:
+            args['sessionName'] = self._sso_session_name
+        else:
+            args['startUrl'] = self._start_url
+        # NOTE: It would be good to hoist this cache key construction logic
+        # into the CachedCredentialFetcher class as we should be consistent.
+        # Unfortunately, the current assume role fetchers that sub class don't
+        # pass separators resulting in non-minified JSON. In the long term,
+        # all fetchers should use the below caching scheme.
+        args = json.dumps(args, sort_keys=True, separators=(',', ':'))
+        argument_hash = sha1(args.encode('utf-8')).hexdigest()
+        return self._make_file_safe(argument_hash)
+
+    def _parse_timestamp(self, timestamp_ms):
+        # fromtimestamp expects seconds so: milliseconds / 1000 = seconds
+        timestamp_seconds = timestamp_ms / 1000.0
+        timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc())
+        return timestamp.strftime(self._UTC_DATE_FORMAT)
+
+    def _get_credentials(self):
+        """Get credentials by calling SSO get role credentials."""
+        config = Config(
+            signature_version=UNSIGNED,
+            region_name=self._sso_region,
+        )
+        client = self._client_creator('sso', config=config)
+        if self._token_provider:
+            initial_token_data = self._token_provider.load_token()
+            token = initial_token_data.get_frozen_token().token
+        else:
+            token = self._token_loader(self._start_url)['accessToken']
+
+        kwargs = {
+            'roleName': self._role_name,
+            'accountId': self._account_id,
+            'accessToken': token,
+        }
+        try:
+            response = client.get_role_credentials(**kwargs)
+        except client.exceptions.UnauthorizedException:
+            raise UnauthorizedSSOTokenError()
+        credentials = response['roleCredentials']
+
+        credentials = {
+            'ProviderType': 'sso',
+            'Credentials': {
+                'AccessKeyId': credentials['accessKeyId'],
+                'SecretAccessKey': credentials['secretAccessKey'],
+                'SessionToken': credentials['sessionToken'],
+                'Expiration': self._parse_timestamp(credentials['expiration']),
+                'AccountId': self._account_id,
+            },
+        }
+        return credentials
+
+
+class SSOProvider(CredentialProvider):
+    METHOD = 'sso'
+
+    _SSO_TOKEN_CACHE_DIR = os.path.expanduser(
+        os.path.join('~', '.aws', 'sso', 'cache')
+    )
+    _PROFILE_REQUIRED_CONFIG_VARS = (
+        'sso_role_name',
+        'sso_account_id',
+    )
+    _SSO_REQUIRED_CONFIG_VARS = (
+        'sso_start_url',
+        'sso_region',
+    )
+    _ALL_REQUIRED_CONFIG_VARS = (
+        _PROFILE_REQUIRED_CONFIG_VARS + _SSO_REQUIRED_CONFIG_VARS
+    )
+
+    def __init__(
+        self,
+        load_config,
+        client_creator,
+        profile_name,
+        cache=None,
+        token_cache=None,
+        token_provider=None,
+    ):
+        if token_cache is None:
+            token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR)
+        self._token_cache = token_cache
+        self._token_provider = token_provider
+        if cache is None:
+            cache = {}
+        self.cache = cache
+        self._load_config = load_config
+        self._client_creator = client_creator
+        self._profile_name = profile_name
+
+    def _load_sso_config(self):
+        loaded_config = self._load_config()
+        profiles = loaded_config.get('profiles', {})
+        profile_name = self._profile_name
+        profile_config = profiles.get(self._profile_name, {})
+        sso_sessions = loaded_config.get('sso_sessions', {})
+
+        # Role name & Account ID indicate the cred provider should be used
+        if all(
+            c not in profile_config for c in self._PROFILE_REQUIRED_CONFIG_VARS
+        ):
+            return None
+
+        resolved_config, extra_reqs = self._resolve_sso_session_reference(
+            profile_config, sso_sessions
+        )
+
+        config = {}
+        missing_config_vars = []
+        all_required_configs = self._ALL_REQUIRED_CONFIG_VARS + extra_reqs
+        for config_var in all_required_configs:
+            if config_var in resolved_config:
+                config[config_var] = resolved_config[config_var]
+            else:
+                missing_config_vars.append(config_var)
+
+        if missing_config_vars:
+            missing = ', '.join(missing_config_vars)
+            raise InvalidConfigError(
+                error_msg=(
+                    f'The profile "{profile_name}" is configured to use SSO '
+                    f'but is missing required configuration: {missing}'
+                )
+            )
+        return config
+
+    def _resolve_sso_session_reference(self, profile_config, sso_sessions):
+        sso_session_name = profile_config.get('sso_session')
+        if sso_session_name is None:
+            # No reference to resolve, proceed with legacy flow
+            return profile_config, ()
+
+        if sso_session_name not in sso_sessions:
+            error_msg = f'The specified sso-session does not exist: "{sso_session_name}"'
+            raise InvalidConfigError(error_msg=error_msg)
+
+        config = profile_config.copy()
+        session = sso_sessions[sso_session_name]
+        for config_var, val in session.items():
+            # Validate any keys referenced in both profile and sso_session match
+            if config.get(config_var, val) != val:
+                error_msg = (
+                    f"The value for {config_var} is inconsistent between "
+                    f"profile ({config[config_var]}) and sso-session ({val})."
+                )
+                raise InvalidConfigError(error_msg=error_msg)
+            config[config_var] = val
+        return config, ('sso_session',)
+
+    def load(self):
+        sso_config = self._load_sso_config()
+        if not sso_config:
+            return None
+
+        fetcher_kwargs = {
+            'start_url': sso_config['sso_start_url'],
+            'sso_region': sso_config['sso_region'],
+            'role_name': sso_config['sso_role_name'],
+            'account_id': sso_config['sso_account_id'],
+            'client_creator': self._client_creator,
+            'token_loader': SSOTokenLoader(cache=self._token_cache),
+            'cache': self.cache,
+        }
+        if 'sso_session' in sso_config:
+            fetcher_kwargs['sso_session_name'] = sso_config['sso_session']
+            fetcher_kwargs['token_provider'] = self._token_provider
+
+        sso_fetcher = SSOCredentialFetcher(**fetcher_kwargs)
+
+        return DeferredRefreshableCredentials(
+            method=self.METHOD,
+            refresh_using=sso_fetcher.fetch_credentials,
+        )