diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/adapters')
8 files changed, 770 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py new file mode 100644 index 00000000..7195f43e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +""" +requests-toolbelt.adapters +========================== + +See https://toolbelt.readthedocs.io/ for documentation + +:copyright: (c) 2014 by Ian Cordasco and Cory Benfield +:license: Apache v2.0, see LICENSE for more details +""" + +from .ssl import SSLAdapter +from .source import SourceAddressAdapter + +__all__ = ['SSLAdapter', 'SourceAddressAdapter'] diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py new file mode 100644 index 00000000..25a70a17 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/appengine.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- +"""The App Engine Transport Adapter for requests. + +.. versionadded:: 0.6.0 + +This requires a version of requests >= 2.10.0 and Python 2. + +There are two ways to use this library: + +#. If you're using requests directly, you can use code like: + + .. code-block:: python + + >>> import requests + >>> import ssl + >>> import requests.packages.urllib3.contrib.appengine as ul_appengine + >>> from requests_toolbelt.adapters import appengine + >>> s = requests.Session() + >>> if ul_appengine.is_appengine_sandbox(): + ... s.mount('http://', appengine.AppEngineAdapter()) + ... s.mount('https://', appengine.AppEngineAdapter()) + +#. If you depend on external libraries which use requests, you can use code + like: + + .. code-block:: python + + >>> from requests_toolbelt.adapters import appengine + >>> appengine.monkeypatch() + +which will ensure all requests.Session objects use AppEngineAdapter properly. + +You are also able to :ref:`disable certificate validation <insecure_appengine>` +when monkey-patching. +""" +import requests +import warnings +from requests import adapters +from requests import sessions + +from .. import exceptions as exc +from .._compat import gaecontrib +from .._compat import timeout + + +class AppEngineMROHack(adapters.HTTPAdapter): + """Resolves infinite recursion when monkeypatching. + + This works by injecting itself as the base class of both the + :class:`AppEngineAdapter` and Requests' default HTTPAdapter, which needs to + be done because default HTTPAdapter's MRO is recompiled when we + monkeypatch, at which point this class becomes HTTPAdapter's base class. + In addition, we use an instantiation flag to avoid infinite recursion. + """ + _initialized = False + + def __init__(self, *args, **kwargs): + if not self._initialized: + self._initialized = True + super(AppEngineMROHack, self).__init__(*args, **kwargs) + + +class AppEngineAdapter(AppEngineMROHack, adapters.HTTPAdapter): + """The transport adapter for Requests to use urllib3's GAE support. + + Implements Requests's HTTPAdapter API. + + When deploying to Google's App Engine service, some of Requests' + functionality is broken. There is underlying support for GAE in urllib3. + This functionality, however, is opt-in and needs to be enabled explicitly + for Requests to be able to use it. + """ + + __attrs__ = adapters.HTTPAdapter.__attrs__ + ['_validate_certificate'] + + def __init__(self, validate_certificate=True, *args, **kwargs): + _check_version() + self._validate_certificate = validate_certificate + super(AppEngineAdapter, self).__init__(*args, **kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + self.poolmanager = _AppEnginePoolManager(self._validate_certificate) + + +class InsecureAppEngineAdapter(AppEngineAdapter): + """An always-insecure GAE adapter for Requests. + + This is a variant of the the transport adapter for Requests to use + urllib3's GAE support that does not validate certificates. Use with + caution! + + .. note:: + The ``validate_certificate`` keyword argument will not be honored here + and is not part of the signature because we always force it to + ``False``. + + See :class:`AppEngineAdapter` for further details. + """ + + def __init__(self, *args, **kwargs): + if kwargs.pop("validate_certificate", False): + warnings.warn("Certificate validation cannot be specified on the " + "InsecureAppEngineAdapter, but was present. This " + "will be ignored and certificate validation will " + "remain off.", exc.IgnoringGAECertificateValidation) + + super(InsecureAppEngineAdapter, self).__init__( + validate_certificate=False, *args, **kwargs) + + +class _AppEnginePoolManager(object): + """Implements urllib3's PoolManager API expected by requests. + + While a real PoolManager map hostnames to reusable Connections, + AppEngine has no concept of a reusable connection to a host. + So instead, this class constructs a small Connection per request, + that is returned to the Adapter and used to access the URL. + """ + + def __init__(self, validate_certificate=True): + self.appengine_manager = gaecontrib.AppEngineManager( + validate_certificate=validate_certificate) + + def connection_from_url(self, url): + return _AppEngineConnection(self.appengine_manager, url) + + def clear(self): + pass + + +class _AppEngineConnection(object): + """Implements urllib3's HTTPConnectionPool API's urlopen(). + + This Connection's urlopen() is called with a host-relative path, + so in order to properly support opening the URL, we need to store + the full URL when this Connection is constructed from the PoolManager. + + This code wraps AppEngineManager.urlopen(), which exposes a different + API than in the original urllib3 urlopen(), and thus needs this adapter. + """ + + def __init__(self, appengine_manager, url): + self.appengine_manager = appengine_manager + self.url = url + + def urlopen(self, method, url, body=None, headers=None, retries=None, + redirect=True, assert_same_host=True, + timeout=timeout.Timeout.DEFAULT_TIMEOUT, + pool_timeout=None, release_conn=None, **response_kw): + # This function's url argument is a host-relative URL, + # but the AppEngineManager expects an absolute URL. + # So we saved out the self.url when the AppEngineConnection + # was constructed, which we then can use down below instead. + + # We once tried to verify our assumptions here, but sometimes the + # passed-in URL differs on url fragments, or "http://a.com" vs "/". + + # urllib3's App Engine adapter only uses Timeout.total, not read or + # connect. + if not timeout.total: + timeout.total = timeout._read or timeout._connect + + # Jump through the hoops necessary to call AppEngineManager's API. + return self.appengine_manager.urlopen( + method, + self.url, + body=body, + headers=headers, + retries=retries, + redirect=redirect, + timeout=timeout, + **response_kw) + + +def monkeypatch(validate_certificate=True): + """Sets up all Sessions to use AppEngineAdapter by default. + + If you don't want to deal with configuring your own Sessions, + or if you use libraries that use requests directly (ie requests.post), + then you may prefer to monkeypatch and auto-configure all Sessions. + + .. warning: : + + If ``validate_certificate`` is ``False``, certification validation will + effectively be disabled for all requests. + """ + _check_version() + # HACK: We should consider modifying urllib3 to support this cleanly, + # so that we can set a module-level variable in the sessions module, + # instead of overriding an imported HTTPAdapter as is done here. + adapter = AppEngineAdapter + if not validate_certificate: + adapter = InsecureAppEngineAdapter + + sessions.HTTPAdapter = adapter + adapters.HTTPAdapter = adapter + + +def _check_version(): + if gaecontrib is None: + raise exc.VersionMismatchError( + "The toolbelt requires at least Requests 2.10.0 to be " + "installed. Version {} was found instead.".format( + requests.__version__ + ) + ) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py new file mode 100644 index 00000000..6645d349 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/fingerprint.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +"""Submodule containing the implementation for the FingerprintAdapter. + +This file contains an implementation of a Transport Adapter that validates +the fingerprints of SSL certificates presented upon connection. +""" +from requests.adapters import HTTPAdapter + +from .._compat import poolmanager + + +class FingerprintAdapter(HTTPAdapter): + """ + A HTTPS Adapter for Python Requests that verifies certificate fingerprints, + instead of certificate hostnames. + + Example usage: + + .. code-block:: python + + import requests + import ssl + from requests_toolbelt.adapters.fingerprint import FingerprintAdapter + + twitter_fingerprint = '...' + s = requests.Session() + s.mount( + 'https://twitter.com', + FingerprintAdapter(twitter_fingerprint) + ) + + The fingerprint should be provided as a hexadecimal string, optionally + containing colons. + """ + + __attrs__ = HTTPAdapter.__attrs__ + ['fingerprint'] + + def __init__(self, fingerprint, **kwargs): + self.fingerprint = fingerprint + + super(FingerprintAdapter, self).__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + self.poolmanager = poolmanager.PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + assert_fingerprint=self.fingerprint) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py new file mode 100644 index 00000000..f34ed1aa --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/host_header_ssl.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +""" +requests_toolbelt.adapters.host_header_ssl +========================================== + +This file contains an implementation of the HostHeaderSSLAdapter. +""" + +from requests.adapters import HTTPAdapter + + +class HostHeaderSSLAdapter(HTTPAdapter): + """ + A HTTPS Adapter for Python Requests that sets the hostname for certificate + verification based on the Host header. + + This allows requesting the IP address directly via HTTPS without getting + a "hostname doesn't match" exception. + + Example usage: + + >>> s.mount('https://', HostHeaderSSLAdapter()) + >>> s.get("https://93.184.216.34", headers={"Host": "example.org"}) + + """ + + def send(self, request, **kwargs): + # HTTP headers are case-insensitive (RFC 7230) + host_header = None + for header in request.headers: + if header.lower() == "host": + host_header = request.headers[header] + break + + connection_pool_kwargs = self.poolmanager.connection_pool_kw + + if host_header: + connection_pool_kwargs["assert_hostname"] = host_header + elif "assert_hostname" in connection_pool_kwargs: + # an assert_hostname from a previous request may have been left + connection_pool_kwargs.pop("assert_hostname", None) + + return super(HostHeaderSSLAdapter, self).send(request, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py new file mode 100644 index 00000000..86ebe136 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +"""The implementation of the SocketOptionsAdapter.""" +import socket +import warnings +import sys + +import requests +from requests import adapters + +from .._compat import connection +from .._compat import poolmanager +from .. import exceptions as exc + + +class SocketOptionsAdapter(adapters.HTTPAdapter): + """An adapter for requests that allows users to specify socket options. + + Since version 2.4.0 of requests, it is possible to specify a custom list + of socket options that need to be set before establishing the connection. + + Example usage:: + + >>> import socket + >>> import requests + >>> from requests_toolbelt.adapters import socket_options + >>> s = requests.Session() + >>> opts = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)] + >>> adapter = socket_options.SocketOptionsAdapter(socket_options=opts) + >>> s.mount('http://', adapter) + + You can also take advantage of the list of default options on this class + to keep using the original options in addition to your custom options. In + that case, ``opts`` might look like:: + + >>> opts = socket_options.SocketOptionsAdapter.default_options + opts + + """ + + if connection is not None: + default_options = getattr( + connection.HTTPConnection, + 'default_socket_options', + [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] + ) + else: + default_options = [] + warnings.warn(exc.RequestsVersionTooOld, + "This version of Requests is only compatible with a " + "version of urllib3 which is too old to support " + "setting options on a socket. This adapter is " + "functionally useless.") + + def __init__(self, **kwargs): + self.socket_options = kwargs.pop('socket_options', + self.default_options) + + super(SocketOptionsAdapter, self).__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + if requests.__build__ >= 0x020400: + # NOTE(Ian): Perhaps we should raise a warning + self.poolmanager = poolmanager.PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + socket_options=self.socket_options + ) + else: + super(SocketOptionsAdapter, self).init_poolmanager( + connections, maxsize, block + ) + + +class TCPKeepAliveAdapter(SocketOptionsAdapter): + """An adapter for requests that turns on TCP Keep-Alive by default. + + The adapter sets 4 socket options: + + - ``SOL_SOCKET`` ``SO_KEEPALIVE`` - This turns on TCP Keep-Alive + - ``IPPROTO_TCP`` ``TCP_KEEPINTVL`` 20 - Sets the keep alive interval + - ``IPPROTO_TCP`` ``TCP_KEEPCNT`` 5 - Sets the number of keep alive probes + - ``IPPROTO_TCP`` ``TCP_KEEPIDLE`` 60 - Sets the keep alive time if the + socket library has the ``TCP_KEEPIDLE`` constant + + The latter three can be overridden by keyword arguments (respectively): + + - ``interval`` + - ``count`` + - ``idle`` + + You can use this adapter like so:: + + >>> from requests_toolbelt.adapters import socket_options + >>> tcp = socket_options.TCPKeepAliveAdapter(idle=120, interval=10) + >>> s = requests.Session() + >>> s.mount('http://', tcp) + + """ + + def __init__(self, **kwargs): + socket_options = kwargs.pop('socket_options', + SocketOptionsAdapter.default_options) + idle = kwargs.pop('idle', 60) + interval = kwargs.pop('interval', 20) + count = kwargs.pop('count', 5) + socket_options = socket_options + [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + ] + + # NOTE(Ian): OSX does not have these constants defined, so we + # set them conditionally. + if getattr(socket, 'TCP_KEEPINTVL', None) is not None: + socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, + interval)] + elif sys.platform == 'darwin': + # On OSX, TCP_KEEPALIVE from netinet/tcp.h is not exported + # by python's socket module + TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x10) + socket_options += [(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval)] + + if getattr(socket, 'TCP_KEEPCNT', None) is not None: + socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)] + + if getattr(socket, 'TCP_KEEPIDLE', None) is not None: + socket_options += [(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)] + + super(TCPKeepAliveAdapter, self).__init__( + socket_options=socket_options, **kwargs + ) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py new file mode 100644 index 00000000..d3dda797 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/source.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +""" +requests_toolbelt.source_adapter +================================ + +This file contains an implementation of the SourceAddressAdapter originally +demonstrated on the Requests GitHub page. +""" +from requests.adapters import HTTPAdapter + +from .._compat import poolmanager, basestring + + +class SourceAddressAdapter(HTTPAdapter): + """ + A Source Address Adapter for Python Requests that enables you to choose the + local address to bind to. This allows you to send your HTTP requests from a + specific interface and IP address. + + Two address formats are accepted. The first is a string: this will set the + local IP address to the address given in the string, and will also choose a + semi-random high port for the local port number. + + The second is a two-tuple of the form (ip address, port): for example, + ``('10.10.10.10', 8999)``. This will set the local IP address to the first + element, and the local port to the second element. If ``0`` is used as the + port number, a semi-random high port will be selected. + + .. warning:: Setting an explicit local port can have negative interactions + with connection-pooling in Requests: in particular, it risks + the possibility of getting "Address in use" errors. The + string-only argument is generally preferred to the tuple-form. + + Example usage: + + .. code-block:: python + + import requests + from requests_toolbelt.adapters.source import SourceAddressAdapter + + s = requests.Session() + s.mount('http://', SourceAddressAdapter('10.10.10.10')) + s.mount('https://', SourceAddressAdapter(('10.10.10.10', 8999))) + """ + def __init__(self, source_address, **kwargs): + if isinstance(source_address, basestring): + self.source_address = (source_address, 0) + elif isinstance(source_address, tuple): + self.source_address = source_address + else: + raise TypeError( + "source_address must be IP address string or (ip, port) tuple" + ) + + super(SourceAddressAdapter, self).__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + self.poolmanager = poolmanager.PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + source_address=self.source_address) + + def proxy_manager_for(self, *args, **kwargs): + kwargs['source_address'] = self.source_address + return super(SourceAddressAdapter, self).proxy_manager_for( + *args, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py new file mode 100644 index 00000000..c4a76ae4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/ssl.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +""" + +requests_toolbelt.ssl_adapter +============================= + +This file contains an implementation of the SSLAdapter originally demonstrated +in this blog post: +https://lukasa.co.uk/2013/01/Choosing_SSL_Version_In_Requests/ + +""" +import requests + +from requests.adapters import HTTPAdapter + +from .._compat import poolmanager + + +class SSLAdapter(HTTPAdapter): + """ + A HTTPS Adapter for Python Requests that allows the choice of the SSL/TLS + version negotiated by Requests. This can be used either to enforce the + choice of high-security TLS versions (where supported), or to work around + misbehaving servers that fail to correctly negotiate the default TLS + version being offered. + + Example usage: + + >>> import requests + >>> import ssl + >>> from requests_toolbelt import SSLAdapter + >>> s = requests.Session() + >>> s.mount('https://', SSLAdapter(ssl.PROTOCOL_TLSv1)) + + You can replace the chosen protocol with any that are available in the + default Python SSL module. All subsequent requests that match the adapter + prefix will use the chosen SSL version instead of the default. + + This adapter will also attempt to change the SSL/TLS version negotiated by + Requests when using a proxy. However, this may not always be possible: + prior to Requests v2.4.0 the adapter did not have access to the proxy setup + code. In earlier versions of Requests, this adapter will not function + properly when used with proxies. + """ + + __attrs__ = HTTPAdapter.__attrs__ + ['ssl_version'] + + def __init__(self, ssl_version=None, **kwargs): + self.ssl_version = ssl_version + + super(SSLAdapter, self).__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + self.poolmanager = poolmanager.PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + ssl_version=self.ssl_version) + + if requests.__build__ >= 0x020400: + # Earlier versions of requests either don't have this method or, worse, + # don't allow passing arbitrary keyword arguments. As a result, only + # conditionally define this method. + def proxy_manager_for(self, *args, **kwargs): + kwargs['ssl_version'] = self.ssl_version + return super(SSLAdapter, self).proxy_manager_for(*args, **kwargs) diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py new file mode 100644 index 00000000..aff37706 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/adapters/x509.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- +"""A X509Adapter for use with the requests library. + +This file contains an implementation of the X509Adapter that will +allow users to authenticate a request using an arbitrary +X.509 certificate without needing to convert it to a .pem file + +""" + +from OpenSSL.crypto import PKey, X509 +from cryptography import x509 +from cryptography.hazmat.primitives.serialization import (load_pem_private_key, + load_der_private_key) +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.hazmat.backends import default_backend + +from datetime import datetime +from requests.adapters import HTTPAdapter +import requests + +from .. import exceptions as exc + +""" +importing the protocol constants from _ssl instead of ssl because only the +constants are needed and to handle issues caused by importing from ssl on +the 2.7.x line. +""" +try: + from _ssl import PROTOCOL_TLS as PROTOCOL +except ImportError: + from _ssl import PROTOCOL_SSLv23 as PROTOCOL + + +PyOpenSSLContext = None + + +class X509Adapter(HTTPAdapter): + r"""Adapter for use with X.509 certificates. + + Provides an interface for Requests sessions to contact HTTPS urls and + authenticate with an X.509 cert by implementing the Transport Adapter + interface. This class will need to be manually instantiated and mounted + to the session + + :param pool_connections: The number of urllib3 connection pools to + cache. + :param pool_maxsize: The maximum number of connections to save in the + pool. + :param max_retries: The maximum number of retries each connection + should attempt. Note, this applies only to failed DNS lookups, + socket connections and connection timeouts, never to requests where + data has made it to the server. By default, Requests does not retry + failed connections. If you need granular control over the + conditions under which we retry a request, import urllib3's + ``Retry`` class and pass that instead. + :param pool_block: Whether the connection pool should block for + connections. + + :param bytes cert_bytes: + bytes object containing contents of a cryptography.x509Certificate + object using the encoding specified by the ``encoding`` parameter. + :param bytes pk_bytes: + bytes object containing contents of a object that implements + ``cryptography.hazmat.primitives.serialization.PrivateFormat`` + using the encoding specified by the ``encoding`` parameter. + :param password: + string or utf8 encoded bytes containing the passphrase used for the + private key. None if unencrypted. Defaults to None. + :param encoding: + Enumeration detailing the encoding method used on the ``cert_bytes`` + parameter. Can be either PEM or DER. Defaults to PEM. + :type encoding: + :class: `cryptography.hazmat.primitives.serialization.Encoding` + + Usage:: + + >>> import requests + >>> from requests_toolbelt.adapters.x509 import X509Adapter + >>> s = requests.Session() + >>> a = X509Adapter(max_retries=3, + cert_bytes=b'...', pk_bytes=b'...', encoding='...' + >>> s.mount('https://', a) + """ + + def __init__(self, *args, **kwargs): + self._import_pyopensslcontext() + self._check_version() + cert_bytes = kwargs.pop('cert_bytes', None) + pk_bytes = kwargs.pop('pk_bytes', None) + password = kwargs.pop('password', None) + encoding = kwargs.pop('encoding', Encoding.PEM) + + password_bytes = None + + if cert_bytes is None or not isinstance(cert_bytes, bytes): + raise ValueError('Invalid cert content provided. ' + 'You must provide an X.509 cert ' + 'formatted as a byte array.') + if pk_bytes is None or not isinstance(pk_bytes, bytes): + raise ValueError('Invalid private key content provided. ' + 'You must provide a private key ' + 'formatted as a byte array.') + + if isinstance(password, bytes): + password_bytes = password + elif password: + password_bytes = password.encode('utf8') + + self.ssl_context = create_ssl_context(cert_bytes, pk_bytes, + password_bytes, encoding) + + super(X509Adapter, self).__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + if self.ssl_context: + kwargs['ssl_context'] = self.ssl_context + return super(X509Adapter, self).init_poolmanager(*args, **kwargs) + + def proxy_manager_for(self, *args, **kwargs): + if self.ssl_context: + kwargs['ssl_context'] = self.ssl_context + return super(X509Adapter, self).proxy_manager_for(*args, **kwargs) + + def _import_pyopensslcontext(self): + global PyOpenSSLContext + + if requests.__build__ < 0x021200: + PyOpenSSLContext = None + else: + try: + from requests.packages.urllib3.contrib.pyopenssl \ + import PyOpenSSLContext + except ImportError: + try: + from urllib3.contrib.pyopenssl import PyOpenSSLContext + except ImportError: + PyOpenSSLContext = None + + def _check_version(self): + if PyOpenSSLContext is None: + raise exc.VersionMismatchError( + "The X509Adapter requires at least Requests 2.12.0 to be " + "installed. Version {} was found instead.".format( + requests.__version__ + ) + ) + + +def check_cert_dates(cert): + """Verify that the supplied client cert is not invalid.""" + + now = datetime.utcnow() + if cert.not_valid_after < now or cert.not_valid_before > now: + raise ValueError('Client certificate expired: Not After: ' + '{:%Y-%m-%d %H:%M:%SZ} ' + 'Not Before: {:%Y-%m-%d %H:%M:%SZ}' + .format(cert.not_valid_after, cert.not_valid_before)) + + +def create_ssl_context(cert_byes, pk_bytes, password=None, + encoding=Encoding.PEM): + """Create an SSL Context with the supplied cert/password. + + :param cert_bytes array of bytes containing the cert encoded + using the method supplied in the ``encoding`` parameter + :param pk_bytes array of bytes containing the private key encoded + using the method supplied in the ``encoding`` parameter + :param password array of bytes containing the passphrase to be used + with the supplied private key. None if unencrypted. + Defaults to None. + :param encoding ``cryptography.hazmat.primitives.serialization.Encoding`` + details the encoding method used on the ``cert_bytes`` and + ``pk_bytes`` parameters. Can be either PEM or DER. + Defaults to PEM. + """ + backend = default_backend() + + cert = None + key = None + if encoding == Encoding.PEM: + cert = x509.load_pem_x509_certificate(cert_byes, backend) + key = load_pem_private_key(pk_bytes, password, backend) + elif encoding == Encoding.DER: + cert = x509.load_der_x509_certificate(cert_byes, backend) + key = load_der_private_key(pk_bytes, password, backend) + else: + raise ValueError('Invalid encoding provided: Must be PEM or DER') + + if not (cert and key): + raise ValueError('Cert and key could not be parsed from ' + 'provided data') + check_cert_dates(cert) + ssl_context = PyOpenSSLContext(PROTOCOL) + ssl_context._ctx.use_certificate(X509.from_cryptography(cert)) + ssl_context._ctx.use_privatekey(PKey.from_cryptography_key(key)) + return ssl_context |