1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
# -*- coding: utf-8 -*-
"""
requests_toolbelt.auth.handler
==============================
This holds all of the implementation details of the Authentication Handler.
"""
from requests.auth import AuthBase, HTTPBasicAuth
from requests.compat import urlparse, urlunparse
class AuthHandler(AuthBase):
    """Dispatch each request to a per-domain authentication strategy.

    The ``AuthHandler`` object takes a dictionary of domains paired with
    authentication strategies and will use this to determine which credentials
    to use when making a request. For example, you could do the following:

    .. code-block:: python

        from requests.auth import HTTPDigestAuth
        from requests_toolbelt.auth.handler import AuthHandler

        import requests

        auth = AuthHandler({
            'https://api.github.com': ('sigmavirus24', 'fakepassword'),
            'https://example.com': HTTPDigestAuth('username', 'password')
        })

        r = requests.get('https://api.github.com/user', auth=auth)
        # => <Response [200]>
        r = requests.get('https://example.com/some/path', auth=auth)
        # => <Response [200]>

        s = requests.Session()
        s.auth = auth
        r = s.get('https://api.github.com/user')
        # => <Response [200]>

    Domains are matched on their lower-cased scheme and network location
    only; any path, query or fragment is ignored. Always include the scheme
    (``'https://example.com'``, not ``'example.com'``): a scheme-less string
    is parsed as a bare path, so it normalizes to an empty key and will never
    match a real request URL.

    .. warning::

        :class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you
        use :class:`AuthHandler` across multiple threads you should
        instantiate a new AuthHandler for each thread with a new
        HTTPDigestAuth instance for each thread.
    """

    def __init__(self, strategies):
        """Store a normalized copy of ``strategies``.

        :param strategies: Mapping (or anything ``dict()`` accepts) of domain
            to authentication strategy. The caller's object is copied, not
            mutated.
        """
        self.strategies = dict(strategies)
        self._make_uniform()

    def __call__(self, request):
        """Apply the strategy registered for ``request.url`` to ``request``.

        :param request: The request being prepared; only its ``url``
            attribute is read here before it is handed to the strategy.
        :returns: Whatever the selected strategy returns for ``request``.
        """
        auth = self.get_strategy_for(request.url)
        return auth(request)

    def __repr__(self):
        """Return a debugging representation listing the strategies."""
        return '<AuthHandler({!r})>'.format(self.strategies)

    def _make_uniform(self):
        """Re-register every strategy so keys and values are normalized."""
        # Snapshot the items first: ``add_strategy`` writes into the fresh
        # dictionary assigned below.
        existing_strategies = list(self.strategies.items())
        self.strategies = {}

        for domain, strategy in existing_strategies:
            self.add_strategy(domain, strategy)

    @staticmethod
    def _key_from_url(url):
        """Reduce ``url`` to its lower-cased ``scheme://netloc`` prefix.

        :param str url: A URL or domain string.
        :returns: The URL rebuilt from only its scheme and network location,
            both lower-cased; path, params, query and fragment are dropped.
        """
        parsed = urlparse(url)
        return urlunparse((parsed.scheme.lower(),
                           parsed.netloc.lower(),
                           '', '', '', ''))

    def add_strategy(self, domain, strategy):
        """Add a new domain and authentication strategy.

        An existing strategy registered under the same normalized domain is
        replaced.

        :param str domain: The domain you wish to match against, including
            its scheme. For example: ``'https://api.github.com'``
        :param strategy: The authentication strategy you wish to use for
            that domain: either a tuple such as ``('username', 'password')``
            (converted to Basic Authentication) or an auth object such as
            ``requests.auth.HTTPDigestAuth('username', 'password')``

        .. code-block:: python

            a = AuthHandler({})
            a.add_strategy('https://api.github.com', ('username', 'password'))

        """
        # Turn tuples into Basic Authentication objects
        if isinstance(strategy, tuple):
            strategy = HTTPBasicAuth(*strategy)

        key = self._key_from_url(domain)
        self.strategies[key] = strategy

    def get_strategy_for(self, url):
        """Retrieve the authentication strategy for a specified URL.

        :param str url: The full URL you will be making a request against. For
            example, ``'https://api.github.com/user'``
        :returns: Callable that adds authentication to a request. When no
            strategy is registered for the URL's domain, a
            :class:`NullAuthStrategy` is returned.

        .. code-block:: python

            import requests
            a = AuthHandler({'http://example.com': ('foo', 'bar')})
            strategy = a.get_strategy_for('http://example.com/example')
            assert isinstance(strategy, requests.auth.HTTPBasicAuth)

        """
        key = self._key_from_url(url)
        return self.strategies.get(key, NullAuthStrategy())

    def remove_strategy(self, domain):
        """Remove the domain and strategy from the collection of strategies.

        Removing a domain that was never registered is a no-op.

        :param str domain: The domain you wish to remove, including its
            scheme. For example, ``'https://api.github.com'``.

        .. code-block:: python

            a = AuthHandler({'https://example.com': ('foo', 'bar')})
            a.remove_strategy('https://example.com')
            assert a.strategies == {}

        """
        key = self._key_from_url(domain)
        # ``pop`` with a default avoids a separate membership test and
        # silently ignores unknown domains.
        self.strategies.pop(key, None)
class NullAuthStrategy(AuthBase):
    """Authentication strategy that leaves the request untouched."""

    def __call__(self, r):
        """Return ``r`` exactly as it was received."""
        return r

    def __repr__(self):
        """Return a fixed debugging representation."""
        return '<NullAuthStrategy>'
|