diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/auth')
5 files changed, 420 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/__init__.py diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py new file mode 100644 index 00000000..285a6a76 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/_digest_auth_compat.py @@ -0,0 +1,29 @@ +"""Provide a compatibility layer for requests.auth.HTTPDigestAuth.""" +import requests + + +class _ThreadingDescriptor(object): + def __init__(self, prop, default): + self.prop = prop + self.default = default + + def __get__(self, obj, objtype=None): + return getattr(obj._thread_local, self.prop, self.default) + + def __set__(self, obj, value): + setattr(obj._thread_local, self.prop, value) + + +class _HTTPDigestAuth(requests.auth.HTTPDigestAuth): + init = _ThreadingDescriptor('init', True) + last_nonce = _ThreadingDescriptor('last_nonce', '') + nonce_count = _ThreadingDescriptor('nonce_count', 0) + chal = _ThreadingDescriptor('chal', {}) + pos = _ThreadingDescriptor('pos', None) + num_401_calls = _ThreadingDescriptor('num_401_calls', 1) + + +if requests.__build__ < 0x020800: + HTTPDigestAuth = requests.auth.HTTPDigestAuth +else: + HTTPDigestAuth = _HTTPDigestAuth diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py new file mode 100644 index 00000000..ba6de504 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/guess.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +"""The module containing the code for GuessAuth.""" +from requests import auth +from requests import cookies + +from . import _digest_auth_compat as auth_compat, http_proxy_digest + + +class GuessAuth(auth.AuthBase): + """Guesses the auth type by the WWW-Authentication header.""" + def __init__(self, username, password): + self.username = username + self.password = password + self.auth = None + self.pos = None + + def _handle_basic_auth_401(self, r, kwargs): + if self.pos is not None: + r.request.body.seek(self.pos) + + # Consume content and release the original connection + # to allow our new request to reuse the same one. + r.content + r.raw.release_conn() + prep = r.request.copy() + if not hasattr(prep, '_cookies'): + prep._cookies = cookies.RequestsCookieJar() + cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw) + prep.prepare_cookies(prep._cookies) + + self.auth = auth.HTTPBasicAuth(self.username, self.password) + prep = self.auth(prep) + _r = r.connection.send(prep, **kwargs) + _r.history.append(r) + _r.request = prep + + return _r + + def _handle_digest_auth_401(self, r, kwargs): + self.auth = auth_compat.HTTPDigestAuth(self.username, self.password) + try: + self.auth.init_per_thread_state() + except AttributeError: + # If we're not on requests 2.8.0+ this method does not exist and + # is not relevant. + pass + + # Check that the attr exists because much older versions of requests + # set this attribute lazily. For example: + # https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59 + if (hasattr(self.auth, 'num_401_calls') and + self.auth.num_401_calls is None): + self.auth.num_401_calls = 1 + # Digest auth would resend the request by itself. We can take a + # shortcut here. + return self.auth.handle_401(r, **kwargs) + + def handle_401(self, r, **kwargs): + """Resends a request with auth headers, if needed.""" + + www_authenticate = r.headers.get('www-authenticate', '').lower() + + if 'basic' in www_authenticate: + return self._handle_basic_auth_401(r, kwargs) + + if 'digest' in www_authenticate: + return self._handle_digest_auth_401(r, kwargs) + + def __call__(self, request): + if self.auth is not None: + return self.auth(request) + + try: + self.pos = request.body.tell() + except AttributeError: + pass + + request.register_hook('response', self.handle_401) + return request + + +class GuessProxyAuth(GuessAuth): + """ + Guesses the auth type by WWW-Authentication and Proxy-Authentication + headers + """ + def __init__(self, username=None, password=None, + proxy_username=None, proxy_password=None): + super(GuessProxyAuth, self).__init__(username, password) + self.proxy_username = proxy_username + self.proxy_password = proxy_password + self.proxy_auth = None + + def _handle_basic_auth_407(self, r, kwargs): + if self.pos is not None: + r.request.body.seek(self.pos) + + r.content + r.raw.release_conn() + prep = r.request.copy() + if not hasattr(prep, '_cookies'): + prep._cookies = cookies.RequestsCookieJar() + cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw) + prep.prepare_cookies(prep._cookies) + + self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username, + self.proxy_password) + prep = self.proxy_auth(prep) + _r = r.connection.send(prep, **kwargs) + _r.history.append(r) + _r.request = prep + + return _r + + def _handle_digest_auth_407(self, r, kwargs): + self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth( + username=self.proxy_username, + password=self.proxy_password) + + try: + self.auth.init_per_thread_state() + except AttributeError: + pass + + return self.proxy_auth.handle_407(r, **kwargs) + + def handle_407(self, r, **kwargs): + proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower() + + if 'basic' in proxy_authenticate: + return self._handle_basic_auth_407(r, kwargs) + + if 'digest' in proxy_authenticate: + return self._handle_digest_auth_407(r, kwargs) + + def __call__(self, request): + if self.proxy_auth is not None: + request = self.proxy_auth(request) + + try: + self.pos = request.body.tell() + except AttributeError: + pass + + request.register_hook('response', self.handle_407) + return super(GuessProxyAuth, self).__call__(request) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py new file mode 100644 index 00000000..0b4051a8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/handler.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +""" + +requests_toolbelt.auth.handler +============================== + +This holds all of the implementation details of the Authentication Handler. + +""" + +from requests.auth import AuthBase, HTTPBasicAuth +from requests.compat import urlparse, urlunparse + + +class AuthHandler(AuthBase): + + """ + + The ``AuthHandler`` object takes a dictionary of domains paired with + authentication strategies and will use this to determine which credentials + to use when making a request. For example, you could do the following: + + .. code-block:: python + + from requests import HTTPDigestAuth + from requests_toolbelt.auth.handler import AuthHandler + + import requests + + auth = AuthHandler({ + 'https://api.github.com': ('sigmavirus24', 'fakepassword'), + 'https://example.com': HTTPDigestAuth('username', 'password') + }) + + r = requests.get('https://api.github.com/user', auth=auth) + # => <Response [200]> + r = requests.get('https://example.com/some/path', auth=auth) + # => <Response [200]> + + s = requests.Session() + s.auth = auth + r = s.get('https://api.github.com/user') + # => <Response [200]> + + .. warning:: + + :class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you + use :class:`AuthHandler` across multiple threads you should + instantiate a new AuthHandler for each thread with a new + HTTPDigestAuth instance for each thread. + + """ + + def __init__(self, strategies): + self.strategies = dict(strategies) + self._make_uniform() + + def __call__(self, request): + auth = self.get_strategy_for(request.url) + return auth(request) + + def __repr__(self): + return '<AuthHandler({!r})>'.format(self.strategies) + + def _make_uniform(self): + existing_strategies = list(self.strategies.items()) + self.strategies = {} + + for (k, v) in existing_strategies: + self.add_strategy(k, v) + + @staticmethod + def _key_from_url(url): + parsed = urlparse(url) + return urlunparse((parsed.scheme.lower(), + parsed.netloc.lower(), + '', '', '', '')) + + def add_strategy(self, domain, strategy): + """Add a new domain and authentication strategy. + + :param str domain: The domain you wish to match against. For example: + ``'https://api.github.com'`` + :param str strategy: The authentication strategy you wish to use for + that domain. For example: ``('username', 'password')`` or + ``requests.HTTPDigestAuth('username', 'password')`` + + .. code-block:: python + + a = AuthHandler({}) + a.add_strategy('https://api.github.com', ('username', 'password')) + + """ + # Turn tuples into Basic Authentication objects + if isinstance(strategy, tuple): + strategy = HTTPBasicAuth(*strategy) + + key = self._key_from_url(domain) + self.strategies[key] = strategy + + def get_strategy_for(self, url): + """Retrieve the authentication strategy for a specified URL. + + :param str url: The full URL you will be making a request against. For + example, ``'https://api.github.com/user'`` + :returns: Callable that adds authentication to a request. + + .. code-block:: python + + import requests + a = AuthHandler({'example.com', ('foo', 'bar')}) + strategy = a.get_strategy_for('http://example.com/example') + assert isinstance(strategy, requests.auth.HTTPBasicAuth) + + """ + key = self._key_from_url(url) + return self.strategies.get(key, NullAuthStrategy()) + + def remove_strategy(self, domain): + """Remove the domain and strategy from the collection of strategies. + + :param str domain: The domain you wish remove. For example, + ``'https://api.github.com'``. + + .. code-block:: python + + a = AuthHandler({'example.com', ('foo', 'bar')}) + a.remove_strategy('example.com') + assert a.strategies == {} + + """ + key = self._key_from_url(domain) + if key in self.strategies: + del self.strategies[key] + + +class NullAuthStrategy(AuthBase): + def __repr__(self): + return '<NullAuthStrategy>' + + def __call__(self, r): + return r diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py new file mode 100644 index 00000000..7e1f69ef --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/auth/http_proxy_digest.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +"""The module containing HTTPProxyDigestAuth.""" +import re + +from requests import cookies, utils + +from . import _digest_auth_compat as auth + + +class HTTPProxyDigestAuth(auth.HTTPDigestAuth): + """HTTP digest authentication between proxy + + :param stale_rejects: The number of rejects indicate that: + the client may wish to simply retry the request + with a new encrypted response, without reprompting the user for a + new username and password. i.e., retry build_digest_header + :type stale_rejects: int + """ + _pat = re.compile(r'digest ', flags=re.IGNORECASE) + + def __init__(self, *args, **kwargs): + super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs) + self.stale_rejects = 0 + + self.init_per_thread_state() + + @property + def stale_rejects(self): + thread_local = getattr(self, '_thread_local', None) + if thread_local is None: + return self._stale_rejects + return thread_local.stale_rejects + + @stale_rejects.setter + def stale_rejects(self, value): + thread_local = getattr(self, '_thread_local', None) + if thread_local is None: + self._stale_rejects = value + else: + thread_local.stale_rejects = value + + def init_per_thread_state(self): + try: + super(HTTPProxyDigestAuth, self).init_per_thread_state() + except AttributeError: + # If we're not on requests 2.8.0+ this method does not exist + pass + + def handle_407(self, r, **kwargs): + """Handle HTTP 407 only once, otherwise give up + + :param r: current response + :returns: responses, along with the new response + """ + if r.status_code == 407 and self.stale_rejects < 2: + s_auth = r.headers.get("proxy-authenticate") + if s_auth is None: + raise IOError( + "proxy server violated RFC 7235:" + "407 response MUST contain header proxy-authenticate") + elif not self._pat.match(s_auth): + return r + + self.chal = utils.parse_dict_header( + self._pat.sub('', s_auth, count=1)) + + # if we present the user/passwd and still get rejected + # https://tools.ietf.org/html/rfc2617#section-3.2.1 + if ('Proxy-Authorization' in r.request.headers and + 'stale' in self.chal): + if self.chal['stale'].lower() == 'true': # try again + self.stale_rejects += 1 + # wrong user/passwd + elif self.chal['stale'].lower() == 'false': + raise IOError("User or password is invalid") + + # Consume content and release the original connection + # to allow our new request to reuse the same one. + r.content + r.close() + prep = r.request.copy() + cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw) + prep.prepare_cookies(prep._cookies) + + prep.headers['Proxy-Authorization'] = self.build_digest_header( + prep.method, prep.url) + _r = r.connection.send(prep, **kwargs) + _r.history.append(r) + _r.request = prep + + return _r + else: # give up authenticate + return r + + def __call__(self, r): + self.init_per_thread_state() + # if we have nonce, then just use it, otherwise server will tell us + if self.last_nonce: + r.headers['Proxy-Authorization'] = self.build_digest_header( + r.method, r.url + ) + r.register_hook('response', self.handle_407) + return r |