aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/adapters')
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py15
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py206
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py48
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py43
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py129
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py67
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py66
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py196
8 files changed, 770 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py
new file mode 100644
index 00000000..7195f43e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+"""
+requests-toolbelt.adapters
+==========================
+
+See https://toolbelt.readthedocs.io/ for documentation
+
+:copyright: (c) 2014 by Ian Cordasco and Cory Benfield
+:license: Apache v2.0, see LICENSE for more details
+"""
+
+from .ssl import SSLAdapter
+from .source import SourceAddressAdapter
+
+__all__ = ['SSLAdapter', 'SourceAddressAdapter']
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py
new file mode 100644
index 00000000..25a70a17
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py
@@ -0,0 +1,206 @@
+# -*- coding: utf-8 -*-
+"""The App Engine Transport Adapter for requests.
+
+.. versionadded:: 0.6.0
+
+This requires a version of requests >= 2.10.0 and Python 2.
+
+There are two ways to use this library:
+
+#. If you're using requests directly, you can use code like:
+
+ .. code-block:: python
+
+ >>> import requests
+ >>> import ssl
+ >>> import requests.packages.urllib3.contrib.appengine as ul_appengine
+ >>> from requests_toolbelt.adapters import appengine
+ >>> s = requests.Session()
+ >>> if ul_appengine.is_appengine_sandbox():
+ ... s.mount('http://', appengine.AppEngineAdapter())
+ ... s.mount('https://', appengine.AppEngineAdapter())
+
+#. If you depend on external libraries which use requests, you can use code
+ like:
+
+ .. code-block:: python
+
+ >>> from requests_toolbelt.adapters import appengine
+ >>> appengine.monkeypatch()
+
+which will ensure all requests.Session objects use AppEngineAdapter properly.
+
+You are also able to :ref:`disable certificate validation <insecure_appengine>`
+when monkey-patching.
+"""
+import requests
+import warnings
+from requests import adapters
+from requests import sessions
+
+from .. import exceptions as exc
+from .._compat import gaecontrib
+from .._compat import timeout
+
+
+class AppEngineMROHack(adapters.HTTPAdapter):
+ """Resolves infinite recursion when monkeypatching.
+
+ This works by injecting itself as the base class of both the
+ :class:`AppEngineAdapter` and Requests' default HTTPAdapter, which needs to
+ be done because default HTTPAdapter's MRO is recompiled when we
+ monkeypatch, at which point this class becomes HTTPAdapter's base class.
+ In addition, we use an instantiation flag to avoid infinite recursion.
+ """
+ _initialized = False
+
+ def __init__(self, *args, **kwargs):
+ if not self._initialized:
+ self._initialized = True
+ super(AppEngineMROHack, self).__init__(*args, **kwargs)
+
+
+class AppEngineAdapter(AppEngineMROHack, adapters.HTTPAdapter):
+ """The transport adapter for Requests to use urllib3's GAE support.
+
+ Implements Requests's HTTPAdapter API.
+
+ When deploying to Google's App Engine service, some of Requests'
+ functionality is broken. There is underlying support for GAE in urllib3.
+ This functionality, however, is opt-in and needs to be enabled explicitly
+ for Requests to be able to use it.
+ """
+
+ __attrs__ = adapters.HTTPAdapter.__attrs__ + ['_validate_certificate']
+
+ def __init__(self, validate_certificate=True, *args, **kwargs):
+ _check_version()
+ self._validate_certificate = validate_certificate
+ super(AppEngineAdapter, self).__init__(*args, **kwargs)
+
+ def init_poolmanager(self, connections, maxsize, block=False):
+ self.poolmanager = _AppEnginePoolManager(self._validate_certificate)
+
+
+class InsecureAppEngineAdapter(AppEngineAdapter):
+ """An always-insecure GAE adapter for Requests.
+
+ This is a variant of the the transport adapter for Requests to use
+ urllib3's GAE support that does not validate certificates. Use with
+ caution!
+
+ .. note::
+ The ``validate_certificate`` keyword argument will not be honored here
+ and is not part of the signature because we always force it to
+ ``False``.
+
+ See :class:`AppEngineAdapter` for further details.
+ """
+
+ def __init__(self, *args, **kwargs):
+ if kwargs.pop("validate_certificate", False):
+ warnings.warn("Certificate validation cannot be specified on the "
+ "InsecureAppEngineAdapter, but was present. This "
+ "will be ignored and certificate validation will "
+ "remain off.", exc.IgnoringGAECertificateValidation)
+
+ super(InsecureAppEngineAdapter, self).__init__(
+ validate_certificate=False, *args, **kwargs)
+
+
+class _AppEnginePoolManager(object):
+ """Implements urllib3's PoolManager API expected by requests.
+
+ While a real PoolManager map hostnames to reusable Connections,
+ AppEngine has no concept of a reusable connection to a host.
+ So instead, this class constructs a small Connection per request,
+ that is returned to the Adapter and used to access the URL.
+ """
+
+ def __init__(self, validate_certificate=True):
+ self.appengine_manager = gaecontrib.AppEngineManager(
+ validate_certificate=validate_certificate)
+
+ def connection_from_url(self, url):
+ return _AppEngineConnection(self.appengine_manager, url)
+
+ def clear(self):
+ pass
+
+
+class _AppEngineConnection(object):
+ """Implements urllib3's HTTPConnectionPool API's urlopen().
+
+ This Connection's urlopen() is called with a host-relative path,
+ so in order to properly support opening the URL, we need to store
+ the full URL when this Connection is constructed from the PoolManager.
+
+ This code wraps AppEngineManager.urlopen(), which exposes a different
+ API than in the original urllib3 urlopen(), and thus needs this adapter.
+ """
+
+ def __init__(self, appengine_manager, url):
+ self.appengine_manager = appengine_manager
+ self.url = url
+
+ def urlopen(self, method, url, body=None, headers=None, retries=None,
+ redirect=True, assert_same_host=True,
+ timeout=timeout.Timeout.DEFAULT_TIMEOUT,
+ pool_timeout=None, release_conn=None, **response_kw):
+ # This function's url argument is a host-relative URL,
+ # but the AppEngineManager expects an absolute URL.
+ # So we saved out the self.url when the AppEngineConnection
+ # was constructed, which we then can use down below instead.
+
+ # We once tried to verify our assumptions here, but sometimes the
+ # passed-in URL differs on url fragments, or "http://a.com" vs "/".
+
+ # urllib3's App Engine adapter only uses Timeout.total, not read or
+ # connect.
+ if not timeout.total:
+ timeout.total = timeout._read or timeout._connect
+
+ # Jump through the hoops necessary to call AppEngineManager's API.
+ return self.appengine_manager.urlopen(
+ method,
+ self.url,
+ body=body,
+ headers=headers,
+ retries=retries,
+ redirect=redirect,
+ timeout=timeout,
+ **response_kw)
+
+
+def monkeypatch(validate_certificate=True):
+ """Sets up all Sessions to use AppEngineAdapter by default.
+
+ If you don't want to deal with configuring your own Sessions,
+ or if you use libraries that use requests directly (ie requests.post),
+ then you may prefer to monkeypatch and auto-configure all Sessions.
+
+ .. warning: :
+
+ If ``validate_certificate`` is ``False``, certification validation will
+ effectively be disabled for all requests.
+ """
+ _check_version()
+ # HACK: We should consider modifying urllib3 to support this cleanly,
+ # so that we can set a module-level variable in the sessions module,
+ # instead of overriding an imported HTTPAdapter as is done here.
+ adapter = AppEngineAdapter
+ if not validate_certificate:
+ adapter = InsecureAppEngineAdapter
+
+ sessions.HTTPAdapter = adapter
+ adapters.HTTPAdapter = adapter
+
+
+def _check_version():
+ if gaecontrib is None:
+ raise exc.VersionMismatchError(
+ "The toolbelt requires at least Requests 2.10.0 to be "
+ "installed. Version {} was found instead.".format(
+ requests.__version__
+ )
+ )
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py
new file mode 100644
index 00000000..6645d349
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+"""Submodule containing the implementation for the FingerprintAdapter.
+
+This file contains an implementation of a Transport Adapter that validates
+the fingerprints of SSL certificates presented upon connection.
+"""
+from requests.adapters import HTTPAdapter
+
+from .._compat import poolmanager
+
+
+class FingerprintAdapter(HTTPAdapter):
+ """
+ A HTTPS Adapter for Python Requests that verifies certificate fingerprints,
+ instead of certificate hostnames.
+
+ Example usage:
+
+ .. code-block:: python
+
+ import requests
+ import ssl
+ from requests_toolbelt.adapters.fingerprint import FingerprintAdapter
+
+ twitter_fingerprint = '...'
+ s = requests.Session()
+ s.mount(
+ 'https://twitter.com',
+ FingerprintAdapter(twitter_fingerprint)
+ )
+
+ The fingerprint should be provided as a hexadecimal string, optionally
+ containing colons.
+ """
+
+ __attrs__ = HTTPAdapter.__attrs__ + ['fingerprint']
+
+ def __init__(self, fingerprint, **kwargs):
+ self.fingerprint = fingerprint
+
+ super(FingerprintAdapter, self).__init__(**kwargs)
+
+ def init_poolmanager(self, connections, maxsize, block=False):
+ self.poolmanager = poolmanager.PoolManager(
+ num_pools=connections,
+ maxsize=maxsize,
+ block=block,
+ assert_fingerprint=self.fingerprint)
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py
new file mode 100644
index 00000000..f34ed1aa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+"""
+requests_toolbelt.adapters.host_header_ssl
+==========================================
+
+This file contains an implementation of the HostHeaderSSLAdapter.
+"""
+
+from requests.adapters import HTTPAdapter
+
+
+class HostHeaderSSLAdapter(HTTPAdapter):
+ """
+ A HTTPS Adapter for Python Requests that sets the hostname for certificate
+ verification based on the Host header.
+
+ This allows requesting the IP address directly via HTTPS without getting
+ a "hostname doesn't match" exception.
+
+ Example usage:
+
+ >>> s.mount('https://', HostHeaderSSLAdapter())
+ >>> s.get("https://93.184.216.34", headers={"Host": "example.org"})
+
+ """
+
+ def send(self, request, **kwargs):
+ # HTTP headers are case-insensitive (RFC 7230)
+ host_header = None
+ for header in request.headers:
+ if header.lower() == "host":
+ host_header = request.headers[header]
+ break
+
+ connection_pool_kwargs = self.poolmanager.connection_pool_kw
+
+ if host_header:
+ connection_pool_kwargs["assert_hostname"] = host_header
+ elif "assert_hostname" in connection_pool_kwargs:
+ # an assert_hostname from a previous request may have been left
+ connection_pool_kwargs.pop("assert_hostname", None)
+
+ return super(HostHeaderSSLAdapter, self).send(request, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py
new file mode 100644
index 00000000..86ebe136
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+"""The implementation of the SocketOptionsAdapter."""
+import socket
+import warnings
+import sys
+
+import requests
+from requests import adapters
+
+from .._compat import connection
+from .._compat import poolmanager
+from .. import exceptions as exc
+
+
+class SocketOptionsAdapter(adapters.HTTPAdapter):
+ """An adapter for requests that allows users to specify socket options.
+
+ Since version 2.4.0 of requests, it is possible to specify a custom list
+ of socket options that need to be set before establishing the connection.
+
+ Example usage::
+
+ >>> import socket
+ >>> import requests
+ >>> from requests_toolbelt.adapters import socket_options
+ >>> s = requests.Session()
+ >>> opts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)]
+ >>> adapter = socket_options.SocketOptionsAdapter(socket_options=opts)
+ >>> s.mount('http://', adapter)
+
+ You can also take advantage of the list of default options on this class
+ to keep using the original options in addition to your custom options. In
+ that case, ``opts`` might look like::
+
+ >>> opts = socket_options.SocketOptionsAdapter.default_options + opts
+
+ """
+
+ if connection is not None:
+ default_options = getattr(
+ connection.HTTPConnection,
+ 'default_socket_options',
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+ )
+ else:
+ default_options = []
+ warnings.warn(exc.RequestsVersionTooOld,
+ "This version of Requests is only compatible with a "
+ "version of urllib3 which is too old to support "
+ "setting options on a socket. This adapter is "
+ "functionally useless.")
+
+ def __init__(self, **kwargs):
+ self.socket_options = kwargs.pop('socket_options',
+ self.default_options)
+
+ super(SocketOptionsAdapter, self).__init__(**kwargs)
+
+ def init_poolmanager(self, connections, maxsize, block=False):
+ if requests.__build__ >= 0x020400:
+ # NOTE(Ian): Perhaps we should raise a warning
+ self.poolmanager = poolmanager.PoolManager(
+ num_pools=connections,
+ maxsize=maxsize,
+ block=block,
+ socket_options=self.socket_options
+ )
+ else:
+ super(SocketOptionsAdapter, self).init_poolmanager(
+ connections, maxsize, block
+ )
+
+
+class TCPKeepAliveAdapter(SocketOptionsAdapter):
+ """An adapter for requests that turns on TCP Keep-Alive by default.
+
+ The adapter sets 4 socket options:
+
+ - ``SOL_SOCKET`` ``SO_KEEPALIVE`` - This turns on TCP Keep-Alive
+ - ``IPPROTO_TCP`` ``TCP_KEEPINTVL`` 20 - Sets the keep alive interval
+ - ``IPPROTO_TCP`` ``TCP_KEEPCNT`` 5 - Sets the number of keep alive probes
+ - ``IPPROTO_TCP`` ``TCP_KEEPIDLE`` 60 - Sets the keep alive time if the
+ socket library has the ``TCP_KEEPIDLE`` constant
+
+ The latter three can be overridden by keyword arguments (respectively):
+
+ - ``interval``
+ - ``count``
+ - ``idle``
+
+ You can use this adapter like so::
+
+ >>> from requests_toolbelt.adapters import socket_options
+ >>> tcp = socket_options.TCPKeepAliveAdapter(idle=120, interval=10)
+ >>> s = requests.Session()
+ >>> s.mount('http://', tcp)
+
+ """
+
+ def __init__(self, **kwargs):
+ socket_options = kwargs.pop('socket_options',
+ SocketOptionsAdapter.default_options)
+ idle = kwargs.pop('idle', 60)
+ interval = kwargs.pop('interval', 20)
+ count = kwargs.pop('count', 5)
+ socket_options = socket_options + [
+ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ ]
+
+ # NOTE(Ian): OSX does not have these constants defined, so we
+ # set them conditionally.
+ if getattr(socket, 'TCP_KEEPINTVL', None) is not None:
+ socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
+ interval)]
+ elif sys.platform == 'darwin':
+ # On OSX, TCP_KEEPALIVE from netinet/tcp.h is not exported
+ # by python's socket module
+ TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x10)
+ socket_options += [(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval)]
+
+ if getattr(socket, 'TCP_KEEPCNT', None) is not None:
+ socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)]
+
+ if getattr(socket, 'TCP_KEEPIDLE', None) is not None:
+ socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)]
+
+ super(TCPKeepAliveAdapter, self).__init__(
+ socket_options=socket_options, **kwargs
+ )
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py
new file mode 100644
index 00000000..d3dda797
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+"""
+requests_toolbelt.source_adapter
+================================
+
+This file contains an implementation of the SourceAddressAdapter originally
+demonstrated on the Requests GitHub page.
+"""
+from requests.adapters import HTTPAdapter
+
+from .._compat import poolmanager, basestring
+
+
+class SourceAddressAdapter(HTTPAdapter):
+ """
+ A Source Address Adapter for Python Requests that enables you to choose the
+ local address to bind to. This allows you to send your HTTP requests from a
+ specific interface and IP address.
+
+ Two address formats are accepted. The first is a string: this will set the
+ local IP address to the address given in the string, and will also choose a
+ semi-random high port for the local port number.
+
+ The second is a two-tuple of the form (ip address, port): for example,
+ ``('10.10.10.10', 8999)``. This will set the local IP address to the first
+ element, and the local port to the second element. If ``0`` is used as the
+ port number, a semi-random high port will be selected.
+
+ .. warning:: Setting an explicit local port can have negative interactions
+ with connection-pooling in Requests: in particular, it risks
+ the possibility of getting "Address in use" errors. The
+ string-only argument is generally preferred to the tuple-form.
+
+ Example usage:
+
+ .. code-block:: python
+
+ import requests
+ from requests_toolbelt.adapters.source import SourceAddressAdapter
+
+ s = requests.Session()
+ s.mount('http://', SourceAddressAdapter('10.10.10.10'))
+ s.mount('https://', SourceAddressAdapter(('10.10.10.10', 8999)))
+ """
+ def __init__(self, source_address, **kwargs):
+ if isinstance(source_address, basestring):
+ self.source_address = (source_address, 0)
+ elif isinstance(source_address, tuple):
+ self.source_address = source_address
+ else:
+ raise TypeError(
+ "source_address must be IP address string or (ip, port) tuple"
+ )
+
+ super(SourceAddressAdapter, self).__init__(**kwargs)
+
+ def init_poolmanager(self, connections, maxsize, block=False):
+ self.poolmanager = poolmanager.PoolManager(
+ num_pools=connections,
+ maxsize=maxsize,
+ block=block,
+ source_address=self.source_address)
+
+ def proxy_manager_for(self, *args, **kwargs):
+ kwargs['source_address'] = self.source_address
+ return super(SourceAddressAdapter, self).proxy_manager_for(
+ *args, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py
new file mode 100644
index 00000000..c4a76ae4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+"""
+
+requests_toolbelt.ssl_adapter
+=============================
+
+This file contains an implementation of the SSLAdapter originally demonstrated
+in this blog post:
+https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/
+
+"""
+import requests
+
+from requests.adapters import HTTPAdapter
+
+from .._compat import poolmanager
+
+
+class SSLAdapter(HTTPAdapter):
+ """
+ A HTTPS Adapter for Python Requests that allows the choice of the SSL/TLS
+ version negotiated by Requests. This can be used either to enforce the
+ choice of high-security TLS versions (where supported), or to work around
+ misbehaving servers that fail to correctly negotiate the default TLS
+ version being offered.
+
+ Example usage:
+
+ >>> import requests
+ >>> import ssl
+ >>> from requests_toolbelt import SSLAdapter
+ >>> s = requests.Session()
+ >>> s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1))
+
+ You can replace the chosen protocol with any that are available in the
+ default Python SSL module. All subsequent requests that match the adapter
+ prefix will use the chosen SSL version instead of the default.
+
+ This adapter will also attempt to change the SSL/TLS version negotiated by
+ Requests when using a proxy. However, this may not always be possible:
+ prior to Requests v2.4.0 the adapter did not have access to the proxy setup
+ code. In earlier versions of Requests, this adapter will not function
+ properly when used with proxies.
+ """
+
+ __attrs__ = HTTPAdapter.__attrs__ + ['ssl_version']
+
+ def __init__(self, ssl_version=None, **kwargs):
+ self.ssl_version = ssl_version
+
+ super(SSLAdapter, self).__init__(**kwargs)
+
+ def init_poolmanager(self, connections, maxsize, block=False):
+ self.poolmanager = poolmanager.PoolManager(
+ num_pools=connections,
+ maxsize=maxsize,
+ block=block,
+ ssl_version=self.ssl_version)
+
+ if requests.__build__ >= 0x020400:
+ # Earlier versions of requests either don't have this method or, worse,
+ # don't allow passing arbitrary keyword arguments. As a result, only
+ # conditionally define this method.
+ def proxy_manager_for(self, *args, **kwargs):
+ kwargs['ssl_version'] = self.ssl_version
+ return super(SSLAdapter, self).proxy_manager_for(*args, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py
new file mode 100644
index 00000000..aff37706
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py
@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+"""A X509Adapter for use with the requests library.
+
+This file contains an implementation of the X509Adapter that will
+allow users to authenticate a request using an arbitrary
+X.509 certificate without needing to convert it to a .pem file
+
+"""
+
+from OpenSSL.crypto import PKey, X509
+from cryptography import x509
+from cryptography.hazmat.primitives.serialization import (load_pem_private_key,
+ load_der_private_key)
+from cryptography.hazmat.primitives.serialization import Encoding
+from cryptography.hazmat.backends import default_backend
+
+from datetime import datetime
+from requests.adapters import HTTPAdapter
+import requests
+
+from .. import exceptions as exc
+
+"""
+importing the protocol constants from _ssl instead of ssl because only the
+constants are needed and to handle issues caused by importing from ssl on
+the 2.7.x line.
+"""
+try:
+ from _ssl import PROTOCOL_TLS as PROTOCOL
+except ImportError:
+ from _ssl import PROTOCOL_SSLv23 as PROTOCOL
+
+
+PyOpenSSLContext = None
+
+
+class X509Adapter(HTTPAdapter):
+ r"""Adapter for use with X.509 certificates.
+
+ Provides an interface for Requests sessions to contact HTTPS urls and
+ authenticate with an X.509 cert by implementing the Transport Adapter
+ interface. This class will need to be manually instantiated and mounted
+ to the session
+
+ :param pool_connections: The number of urllib3 connection pools to
+ cache.
+ :param pool_maxsize: The maximum number of connections to save in the
+ pool.
+ :param max_retries: The maximum number of retries each connection
+ should attempt. Note, this applies only to failed DNS lookups,
+ socket connections and connection timeouts, never to requests where
+ data has made it to the server. By default, Requests does not retry
+ failed connections. If you need granular control over the
+ conditions under which we retry a request, import urllib3's
+ ``Retry`` class and pass that instead.
+ :param pool_block: Whether the connection pool should block for
+ connections.
+
+ :param bytes cert_bytes:
+ bytes object containing contents of a cryptography.x509Certificate
+ object using the encoding specified by the ``encoding`` parameter.
+ :param bytes pk_bytes:
+ bytes object containing contents of a object that implements
+ ``cryptography.hazmat.primitives.serialization.PrivateFormat``
+ using the encoding specified by the ``encoding`` parameter.
+ :param password:
+ string or utf8 encoded bytes containing the passphrase used for the
+ private key. None if unencrypted. Defaults to None.
+ :param encoding:
+ Enumeration detailing the encoding method used on the ``cert_bytes``
+ parameter. Can be either PEM or DER. Defaults to PEM.
+ :type encoding:
+ :class: `cryptography.hazmat.primitives.serialization.Encoding`
+
+ Usage::
+
+ >>> import requests
+ >>> from requests_toolbelt.adapters.x509 import X509Adapter
+ >>> s = requests.Session()
+ >>> a = X509Adapter(max_retries=3,
+ cert_bytes=b'...', pk_bytes=b'...', encoding='...'
+ >>> s.mount('https://', a)
+ """
+
+ def __init__(self, *args, **kwargs):
+ self._import_pyopensslcontext()
+ self._check_version()
+ cert_bytes = kwargs.pop('cert_bytes', None)
+ pk_bytes = kwargs.pop('pk_bytes', None)
+ password = kwargs.pop('password', None)
+ encoding = kwargs.pop('encoding', Encoding.PEM)
+
+ password_bytes = None
+
+ if cert_bytes is None or not isinstance(cert_bytes, bytes):
+ raise ValueError('Invalid cert content provided. '
+ 'You must provide an X.509 cert '
+ 'formatted as a byte array.')
+ if pk_bytes is None or not isinstance(pk_bytes, bytes):
+ raise ValueError('Invalid private key content provided. '
+ 'You must provide a private key '
+ 'formatted as a byte array.')
+
+ if isinstance(password, bytes):
+ password_bytes = password
+ elif password:
+ password_bytes = password.encode('utf8')
+
+ self.ssl_context = create_ssl_context(cert_bytes, pk_bytes,
+ password_bytes, encoding)
+
+ super(X509Adapter, self).__init__(*args, **kwargs)
+
+ def init_poolmanager(self, *args, **kwargs):
+ if self.ssl_context:
+ kwargs['ssl_context'] = self.ssl_context
+ return super(X509Adapter, self).init_poolmanager(*args, **kwargs)
+
+ def proxy_manager_for(self, *args, **kwargs):
+ if self.ssl_context:
+ kwargs['ssl_context'] = self.ssl_context
+ return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)
+
+ def _import_pyopensslcontext(self):
+ global PyOpenSSLContext
+
+ if requests.__build__ < 0x021200:
+ PyOpenSSLContext = None
+ else:
+ try:
+ from requests.packages.urllib3.contrib.pyopenssl \
+ import PyOpenSSLContext
+ except ImportError:
+ try:
+ from urllib3.contrib.pyopenssl import PyOpenSSLContext
+ except ImportError:
+ PyOpenSSLContext = None
+
+ def _check_version(self):
+ if PyOpenSSLContext is None:
+ raise exc.VersionMismatchError(
+ "The X509Adapter requires at least Requests 2.12.0 to be "
+ "installed. Version {} was found instead.".format(
+ requests.__version__
+ )
+ )
+
+
+def check_cert_dates(cert):
+ """Verify that the supplied client cert is not invalid."""
+
+ now = datetime.utcnow()
+ if cert.not_valid_after < now or cert.not_valid_before > now:
+ raise ValueError('Client certificate expired: Not After: '
+ '{:%Y-%m-%d %H:%M:%SZ} '
+ 'Not Before: {:%Y-%m-%d %H:%M:%SZ}'
+ .format(cert.not_valid_after, cert.not_valid_before))
+
+
+def create_ssl_context(cert_byes, pk_bytes, password=None,
+ encoding=Encoding.PEM):
+ """Create an SSL Context with the supplied cert/password.
+
+ :param cert_bytes array of bytes containing the cert encoded
+ using the method supplied in the ``encoding`` parameter
+ :param pk_bytes array of bytes containing the private key encoded
+ using the method supplied in the ``encoding`` parameter
+ :param password array of bytes containing the passphrase to be used
+ with the supplied private key. None if unencrypted.
+ Defaults to None.
+ :param encoding ``cryptography.hazmat.primitives.serialization.Encoding``
+ details the encoding method used on the ``cert_bytes`` and
+ ``pk_bytes`` parameters. Can be either PEM or DER.
+ Defaults to PEM.
+ """
+ backend = default_backend()
+
+ cert = None
+ key = None
+ if encoding == Encoding.PEM:
+ cert = x509.load_pem_x509_certificate(cert_byes, backend)
+ key = load_pem_private_key(pk_bytes, password, backend)
+ elif encoding == Encoding.DER:
+ cert = x509.load_der_x509_certificate(cert_byes, backend)
+ key = load_der_private_key(pk_bytes, password, backend)
+ else:
+ raise ValueError('Invalid encoding provided: Must be PEM or DER')
+
+ if not (cert and key):
+ raise ValueError('Cert and key could not be parsed from '
+ 'provided data')
+ check_cert_dates(cert)
+ ssl_context = PyOpenSSLContext(PROTOCOL)
+ ssl_context._ctx.use_certificate(X509.from_cryptography(cert))
+ ssl_context._ctx.use_privatekey(PKey.from_cryptography_key(key))
+ return ssl_context