about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/requests_toolbelt/auth
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/auth')
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py29
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py146
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py142
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py103
5 files changed, 420 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py
new file mode 100644
index 00000000..285a6a76
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py
@@ -0,0 +1,29 @@
+"""Provide a compatibility layer for requests.auth.HTTPDigestAuth."""
+import requests
+
+
+class _ThreadingDescriptor(object):
+    def __init__(self, prop, default):
+        self.prop = prop
+        self.default = default
+
+    def __get__(self, obj, objtype=None):
+        return getattr(obj._thread_local, self.prop, self.default)
+
+    def __set__(self, obj, value):
+        setattr(obj._thread_local, self.prop, value)
+
+
+class _HTTPDigestAuth(requests.auth.HTTPDigestAuth):
+    init = _ThreadingDescriptor('init', True)
+    last_nonce = _ThreadingDescriptor('last_nonce', '')
+    nonce_count = _ThreadingDescriptor('nonce_count', 0)
+    chal = _ThreadingDescriptor('chal', {})
+    pos = _ThreadingDescriptor('pos', None)
+    num_401_calls = _ThreadingDescriptor('num_401_calls', 1)
+
+
+if requests.__build__ < 0x020800:
+    HTTPDigestAuth = requests.auth.HTTPDigestAuth
+else:
+    HTTPDigestAuth = _HTTPDigestAuth
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py
new file mode 100644
index 00000000..ba6de504
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+"""The module containing the code for GuessAuth."""
+from requests import auth
+from requests import cookies
+
+from . import _digest_auth_compat as auth_compat, http_proxy_digest
+
+
+class GuessAuth(auth.AuthBase):
+    """Guesses the auth type by the WWW-Authentication header."""
+    def __init__(self, username, password):
+        self.username = username
+        self.password = password
+        self.auth = None
+        self.pos = None
+
+    def _handle_basic_auth_401(self, r, kwargs):
+        if self.pos is not None:
+            r.request.body.seek(self.pos)
+
+        # Consume content and release the original connection
+        # to allow our new request to reuse the same one.
+        r.content
+        r.raw.release_conn()
+        prep = r.request.copy()
+        if not hasattr(prep, '_cookies'):
+            prep._cookies = cookies.RequestsCookieJar()
+        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
+        prep.prepare_cookies(prep._cookies)
+
+        self.auth = auth.HTTPBasicAuth(self.username, self.password)
+        prep = self.auth(prep)
+        _r = r.connection.send(prep, **kwargs)
+        _r.history.append(r)
+        _r.request = prep
+
+        return _r
+
+    def _handle_digest_auth_401(self, r, kwargs):
+        self.auth = auth_compat.HTTPDigestAuth(self.username, self.password)
+        try:
+            self.auth.init_per_thread_state()
+        except AttributeError:
+            # If we're not on requests 2.8.0+ this method does not exist and
+            # is not relevant.
+            pass
+
+        # Check that the attr exists because much older versions of requests
+        # set this attribute lazily. For example:
+        # https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59
+        if (hasattr(self.auth, 'num_401_calls') and
+                self.auth.num_401_calls is None):
+            self.auth.num_401_calls = 1
+        # Digest auth would resend the request by itself. We can take a
+        # shortcut here.
+        return self.auth.handle_401(r, **kwargs)
+
+    def handle_401(self, r, **kwargs):
+        """Resends a request with auth headers, if needed."""
+
+        www_authenticate = r.headers.get('www-authenticate', '').lower()
+
+        if 'basic' in www_authenticate:
+            return self._handle_basic_auth_401(r, kwargs)
+
+        if 'digest' in www_authenticate:
+            return self._handle_digest_auth_401(r, kwargs)
+
+    def __call__(self, request):
+        if self.auth is not None:
+            return self.auth(request)
+
+        try:
+            self.pos = request.body.tell()
+        except AttributeError:
+            pass
+
+        request.register_hook('response', self.handle_401)
+        return request
+
+
+class GuessProxyAuth(GuessAuth):
+    """
+    Guesses the auth type by WWW-Authentication and Proxy-Authentication
+    headers
+    """
+    def __init__(self, username=None, password=None,
+                 proxy_username=None, proxy_password=None):
+        super(GuessProxyAuth, self).__init__(username, password)
+        self.proxy_username = proxy_username
+        self.proxy_password = proxy_password
+        self.proxy_auth = None
+
+    def _handle_basic_auth_407(self, r, kwargs):
+        if self.pos is not None:
+            r.request.body.seek(self.pos)
+
+        r.content
+        r.raw.release_conn()
+        prep = r.request.copy()
+        if not hasattr(prep, '_cookies'):
+            prep._cookies = cookies.RequestsCookieJar()
+        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
+        prep.prepare_cookies(prep._cookies)
+
+        self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username,
+                                             self.proxy_password)
+        prep = self.proxy_auth(prep)
+        _r = r.connection.send(prep, **kwargs)
+        _r.history.append(r)
+        _r.request = prep
+
+        return _r
+
+    def _handle_digest_auth_407(self, r, kwargs):
+        self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth(
+            username=self.proxy_username,
+            password=self.proxy_password)
+
+        try:
+            self.auth.init_per_thread_state()
+        except AttributeError:
+            pass
+
+        return self.proxy_auth.handle_407(r, **kwargs)
+
+    def handle_407(self, r, **kwargs):
+        proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower()
+
+        if 'basic' in proxy_authenticate:
+            return self._handle_basic_auth_407(r, kwargs)
+
+        if 'digest' in proxy_authenticate:
+            return self._handle_digest_auth_407(r, kwargs)
+
+    def __call__(self, request):
+        if self.proxy_auth is not None:
+            request = self.proxy_auth(request)
+
+        try:
+            self.pos = request.body.tell()
+        except AttributeError:
+            pass
+
+        request.register_hook('response', self.handle_407)
+        return super(GuessProxyAuth, self).__call__(request)
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py
new file mode 100644
index 00000000..0b4051a8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+"""
+
+requests_toolbelt.auth.handler
+==============================
+
+This holds all of the implementation details of the Authentication Handler.
+
+"""
+
+from requests.auth import AuthBase, HTTPBasicAuth
+from requests.compat import urlparse, urlunparse
+
+
+class AuthHandler(AuthBase):
+
+    """
+
+    The ``AuthHandler`` object takes a dictionary of domains paired with
+    authentication strategies and will use this to determine which credentials
+    to use when making a request. For example, you could do the following:
+
+    .. code-block:: python
+
+        from requests import HTTPDigestAuth
+        from requests_toolbelt.auth.handler import AuthHandler
+
+        import requests
+
+        auth = AuthHandler({
+            'https://api.github.com': ('sigmavirus24', 'fakepassword'),
+            'https://example.com': HTTPDigestAuth('username', 'password')
+        })
+
+        r = requests.get('https://api.github.com/user', auth=auth)
+        # => <Response [200]>
+        r = requests.get('https://example.com/some/path', auth=auth)
+        # => <Response [200]>
+
+        s = requests.Session()
+        s.auth = auth
+        r = s.get('https://api.github.com/user')
+        # => <Response [200]>
+
+    .. warning::
+
+        :class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you
+        use :class:`AuthHandler` across multiple threads you should
+        instantiate a new AuthHandler for each thread with a new
+        HTTPDigestAuth instance for each thread.
+
+    """
+
+    def __init__(self, strategies):
+        self.strategies = dict(strategies)
+        self._make_uniform()
+
+    def __call__(self, request):
+        auth = self.get_strategy_for(request.url)
+        return auth(request)
+
+    def __repr__(self):
+        return '<AuthHandler({!r})>'.format(self.strategies)
+
+    def _make_uniform(self):
+        existing_strategies = list(self.strategies.items())
+        self.strategies = {}
+
+        for (k, v) in existing_strategies:
+            self.add_strategy(k, v)
+
+    @staticmethod
+    def _key_from_url(url):
+        parsed = urlparse(url)
+        return urlunparse((parsed.scheme.lower(),
+                           parsed.netloc.lower(),
+                           '', '', '', ''))
+
+    def add_strategy(self, domain, strategy):
+        """Add a new domain and authentication strategy.
+
+        :param str domain: The domain you wish to match against. For example:
+            ``'https://api.github.com'``
+        :param str strategy: The authentication strategy you wish to use for
+            that domain. For example: ``('username', 'password')`` or
+            ``requests.HTTPDigestAuth('username', 'password')``
+
+        .. code-block:: python
+
+            a = AuthHandler({})
+            a.add_strategy('https://api.github.com', ('username', 'password'))
+
+        """
+        # Turn tuples into Basic Authentication objects
+        if isinstance(strategy, tuple):
+            strategy = HTTPBasicAuth(*strategy)
+
+        key = self._key_from_url(domain)
+        self.strategies[key] = strategy
+
+    def get_strategy_for(self, url):
+        """Retrieve the authentication strategy for a specified URL.
+
+        :param str url: The full URL you will be making a request against. For
+            example, ``'https://api.github.com/user'``
+        :returns: Callable that adds authentication to a request.
+
+        .. code-block:: python
+
+            import requests
+            a = AuthHandler({'example.com', ('foo', 'bar')})
+            strategy = a.get_strategy_for('http://example.com/example')
+            assert isinstance(strategy, requests.auth.HTTPBasicAuth)
+
+        """
+        key = self._key_from_url(url)
+        return self.strategies.get(key, NullAuthStrategy())
+
+    def remove_strategy(self, domain):
+        """Remove the domain and strategy from the collection of strategies.
+
+        :param str domain: The domain you wish remove. For example,
+            ``'https://api.github.com'``.
+
+        .. code-block:: python
+
+            a = AuthHandler({'example.com', ('foo', 'bar')})
+            a.remove_strategy('example.com')
+            assert a.strategies == {}
+
+        """
+        key = self._key_from_url(domain)
+        if key in self.strategies:
+            del self.strategies[key]
+
+
+class NullAuthStrategy(AuthBase):
+    def __repr__(self):
+        return '<NullAuthStrategy>'
+
+    def __call__(self, r):
+        return r
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py
new file mode 100644
index 00000000..7e1f69ef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+"""The module containing HTTPProxyDigestAuth."""
+import re
+
+from requests import cookies, utils
+
+from . import _digest_auth_compat as auth
+
+
+class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
+    """HTTP digest authentication between proxy
+
+    :param stale_rejects: The number of rejects indicate that:
+        the client may wish to simply retry the request
+        with a new encrypted response, without reprompting the user for a
+        new username and password. i.e., retry build_digest_header
+    :type stale_rejects: int
+    """
+    _pat = re.compile(r'digest ', flags=re.IGNORECASE)
+
+    def __init__(self, *args, **kwargs):
+        super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
+        self.stale_rejects = 0
+
+        self.init_per_thread_state()
+
+    @property
+    def stale_rejects(self):
+        thread_local = getattr(self, '_thread_local', None)
+        if thread_local is None:
+            return self._stale_rejects
+        return thread_local.stale_rejects
+
+    @stale_rejects.setter
+    def stale_rejects(self, value):
+        thread_local = getattr(self, '_thread_local', None)
+        if thread_local is None:
+            self._stale_rejects = value
+        else:
+            thread_local.stale_rejects = value
+
+    def init_per_thread_state(self):
+        try:
+            super(HTTPProxyDigestAuth, self).init_per_thread_state()
+        except AttributeError:
+            # If we're not on requests 2.8.0+ this method does not exist
+            pass
+
+    def handle_407(self, r, **kwargs):
+        """Handle HTTP 407 only once, otherwise give up
+
+        :param r: current response
+        :returns: responses, along with the new response
+        """
+        if r.status_code == 407 and self.stale_rejects < 2:
+            s_auth = r.headers.get("proxy-authenticate")
+            if s_auth is None:
+                raise IOError(
+                    "proxy server violated RFC 7235:"
+                    "407 response MUST contain header proxy-authenticate")
+            elif not self._pat.match(s_auth):
+                return r
+
+            self.chal = utils.parse_dict_header(
+                self._pat.sub('', s_auth, count=1))
+
+            # if we present the user/passwd and still get rejected
+            # https://tools.ietf.org/html/rfc2617#section-3.2.1
+            if ('Proxy-Authorization' in r.request.headers and
+                    'stale' in self.chal):
+                if self.chal['stale'].lower() == 'true':  # try again
+                    self.stale_rejects += 1
+                # wrong user/passwd
+                elif self.chal['stale'].lower() == 'false':
+                    raise IOError("User or password is invalid")
+
+            # Consume content and release the original connection
+            # to allow our new request to reuse the same one.
+            r.content
+            r.close()
+            prep = r.request.copy()
+            cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
+            prep.prepare_cookies(prep._cookies)
+
+            prep.headers['Proxy-Authorization'] = self.build_digest_header(
+                prep.method, prep.url)
+            _r = r.connection.send(prep, **kwargs)
+            _r.history.append(r)
+            _r.request = prep
+
+            return _r
+        else:  # give up authenticate
+            return r
+
+    def __call__(self, r):
+        self.init_per_thread_state()
+        # if we have nonce, then just use it, otherwise server will tell us
+        if self.last_nonce:
+            r.headers['Proxy-Authorization'] = self.build_digest_header(
+                r.method, r.url
+            )
+        r.register_hook('response', self.handle_407)
+        return r