aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/multipart')
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/__init__.py31
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/decoder.py156
-rw-r--r--.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/encoder.py655
3 files changed, 842 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/__init__.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/__init__.py
new file mode 100644
index 00000000..d3bced1c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/__init__.py
@@ -0,0 +1,31 @@
+"""
+requests_toolbelt.multipart
+===========================
+
+See https://toolbelt.readthedocs.io/ for documentation
+
+:copyright: (c) 2014 by Ian Cordasco and Cory Benfield
+:license: Apache v2.0, see LICENSE for more details
+"""
+
+from .encoder import MultipartEncoder, MultipartEncoderMonitor
+from .decoder import MultipartDecoder
+from .decoder import ImproperBodyPartContentException
+from .decoder import NonMultipartContentTypeException
+
# Distribution metadata re-exported at package level.
__title__ = 'requests-toolbelt'
__authors__ = 'Ian Cordasco, Cory Benfield'
__license__ = 'Apache v2.0'
__copyright__ = 'Copyright 2014 Ian Cordasco, Cory Benfield'

# Names exported by ``from requests_toolbelt.multipart import *``.
__all__ = [
    'MultipartEncoder',
    'MultipartEncoderMonitor',
    'MultipartDecoder',
    'ImproperBodyPartContentException',
    'NonMultipartContentTypeException',
    '__title__',
    '__authors__',
    '__license__',
    '__copyright__',
]
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/decoder.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/decoder.py
new file mode 100644
index 00000000..2a0d1c46
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/decoder.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+"""
+
+requests_toolbelt.multipart.decoder
+===================================
+
+This holds all the implementation details of the MultipartDecoder
+
+"""
+
+import sys
+import email.parser
+from .encoder import encode_with
+from requests.structures import CaseInsensitiveDict
+
+
+def _split_on_find(content, bound):
+ point = content.find(bound)
+ return content[:point], content[point + len(bound):]
+
+
class ImproperBodyPartContentException(Exception):
    """Raised when a body part lacks the CR-LF-CR-LF header/body separator."""
+
+
class NonMultipartContentTypeException(Exception):
    """Raised when the Content-Type header is not a ``multipart/*`` type."""
+
+
def _header_parser(string, encoding):
    """Parse a raw header section into ``(name, value)`` byte pairs.

    :param bytes string: raw header section of a body part
    :param str encoding: codec used to decode and re-encode header text
    :returns: generator of ``(header_name, header_value)`` byte tuples
    """
    # This file is vendored for Python 3 only, so the former
    # ``sys.version_info`` guard around the decode was dead code:
    # email.parser always needs text here.
    string = string.decode(encoding)
    headers = email.parser.HeaderParser().parsestr(string).items()
    # Re-encode to bytes: the rest of the decoder works on bytes throughout.
    return (
        (encode_with(k, encoding), encode_with(v, encoding))
        for k, v in headers
    )
+
+
class BodyPart(object):
    """A ``Response``-like wrapper around one subpart of a multipart body.

    Instances are normally created by :class:`MultipartDecoder`. Like a
    ``requests.Response``, each part exposes a ``CaseInsensitiveDict`` of
    ``headers``, raw bytes via ``content``, unicode via ``text``, and the
    codec name via ``encoding``.
    """

    def __init__(self, content, encoding):
        self.encoding = encoding
        # A part must contain the blank line separating headers from body.
        if b'\r\n\r\n' not in content:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        header_block, self.content = _split_on_find(content, b'\r\n\r\n')
        headers = {}
        if header_block != b'':
            headers = _header_parser(header_block.lstrip(), encoding)
        self.headers = CaseInsensitiveDict(headers)

    @property
    def text(self):
        """Content of the ``BodyPart`` decoded with ``self.encoding``."""
        return self.content.decode(self.encoding)
+
+
class MultipartDecoder(object):
    """Parse a multipart payload into a tuple of ``BodyPart`` objects.

    Typical use with a response::

        import requests
        from requests_toolbelt import MultipartDecoder

        response = requests.get(url)
        decoder = MultipartDecoder.from_response(response)
        for part in decoder.parts:
            print(part.headers['content-type'])

    Or with raw bytes and a content type::

        decoder = MultipartDecoder(content, content_type)
        for part in decoder.parts:
            print(part.headers['content-type'])

    Both entry points accept an optional ``encoding`` parameter naming the
    unicode codec to use (default ``'utf-8'``).
    """

    def __init__(self, content, content_type, encoding='utf-8'):
        #: Original Content-Type header
        self.content_type = content_type
        #: Response body encoding
        self.encoding = encoding
        #: Parsed parts of the multipart response body
        self.parts = tuple()
        self._find_boundary()
        self._parse_body(content)

    def _find_boundary(self):
        # Split "multipart/xxx; boundary=...; other=..." into its segments.
        segments = tuple(seg.strip() for seg in self.content_type.split(';'))
        mimetype = segments[0]
        if mimetype.split('/')[0].lower() != 'multipart':
            raise NonMultipartContentTypeException(
                "Unexpected mimetype in content-type: '{}'".format(mimetype)
            )
        for param in segments[1:]:
            attr, value = _split_on_find(param, '=')
            if attr.lower() == 'boundary':
                self.boundary = encode_with(value.strip('"'), self.encoding)

    @staticmethod
    def _fix_first_part(part, boundary_marker):
        # The first split piece still carries the leading boundary marker.
        prefix_len = len(boundary_marker)
        if part[:prefix_len] == boundary_marker:
            return part[prefix_len:]
        return part

    def _parse_body(self, content):
        delimiter = b'--' + self.boundary

        def is_real_part(piece):
            # Discard empty fragments and the closing "--" terminator.
            return (piece not in (b'', b'\r\n', b'--') and
                    piece[:4] != b'--\r\n')

        pieces = content.split(b'\r\n' + delimiter)
        self.parts = tuple(
            BodyPart(self._fix_first_part(piece, delimiter), self.encoding)
            for piece in pieces
            if is_real_part(piece)
        )

    @classmethod
    def from_response(cls, response, encoding='utf-8'):
        """Build a decoder from a ``requests.Response``."""
        return cls(
            response.content,
            response.headers.get('content-type', None),
            encoding,
        )
diff --git a/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/encoder.py b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/encoder.py
new file mode 100644
index 00000000..2d539617
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/requests_toolbelt/multipart/encoder.py
@@ -0,0 +1,655 @@
+# -*- coding: utf-8 -*-
+"""
+
+requests_toolbelt.multipart.encoder
+===================================
+
+This holds all of the implementation details of the MultipartEncoder
+
+"""
+import contextlib
+import io
+import os
+from uuid import uuid4
+
+import requests
+
+from .._compat import fields
+
+
class FileNotSupportedError(Exception):
    """Raised when a remote file cannot be streamed (no usable length)."""
+
+
class MultipartEncoder(object):

    """

    The ``MultipartEncoder`` object is a generic interface to the engine that
    will create a ``multipart/form-data`` body for you.

    The basic usage is:

    .. code-block:: python

        import requests
        from requests_toolbelt import MultipartEncoder

        encoder = MultipartEncoder({'field': 'value',
                                    'other_field': 'other_value'})
        r = requests.post('https://httpbin.org/post', data=encoder,
                          headers={'Content-Type': encoder.content_type})

    If you do not need to take advantage of streaming the post body, you can
    also do:

    .. code-block:: python

        r = requests.post('https://httpbin.org/post',
                          data=encoder.to_string(),
                          headers={'Content-Type': encoder.content_type})

    If you want the encoder to use a specific order, you can use an
    OrderedDict or more simply, a list of tuples:

    .. code-block:: python

        encoder = MultipartEncoder([('field', 'value'),
                                    ('other_field', 'other_value')])

    .. versionchanged:: 0.4.0

    You can also provide tuples as part values as you would provide them to
    requests' ``files`` parameter.

    .. code-block:: python

        encoder = MultipartEncoder({
            'field': ('file_name', b'{"a": "b"}', 'application/json',
                      {'X-My-Header': 'my-value'})
        })

    .. warning::

        This object will end up directly in :mod:`httplib`. Currently,
        :mod:`httplib` has a hard-coded read size of **8192 bytes**. This
        means that it will loop until the file has been read and your upload
        could take a while. This is **not** a bug in requests. A feature is
        being considered for this object to allow you, the user, to specify
        what size should be returned on a read. If you have opinions on this,
        please weigh in on `this issue`_.

    .. _this issue:
        https://github.com/requests/toolbelt/issues/75

    """

    def __init__(self, fields, boundary=None, encoding='utf-8'):
        #: Boundary value either passed in by the user or created
        self.boundary_value = boundary or uuid4().hex

        # Computed boundary
        self.boundary = '--{}'.format(self.boundary_value)

        #: Encoding of the data being passed in
        self.encoding = encoding

        # Pre-encoded boundary
        self._encoded_boundary = b''.join([
            encode_with(self.boundary, self.encoding),
            encode_with('\r\n', self.encoding)
        ])

        #: Fields provided by the user
        self.fields = fields

        #: Whether or not the encoder is finished
        self.finished = False

        #: Pre-computed parts of the upload
        self.parts = []

        # Pre-computed parts iterator
        self._iter_parts = iter([])

        # The part we're currently working with
        self._current_part = None

        # Cached computation of the body's length
        self._len = None

        # Our buffer
        self._buffer = CustomBytesIO(encoding=encoding)

        # Pre-compute each part's headers
        self._prepare_parts()

        # Load boundary into buffer
        self._write_boundary()

    @property
    def len(self):
        """Length of the multipart/form-data body.

        requests will first attempt to get the length of the body by calling
        ``len(body)`` and then by checking for the ``len`` attribute.

        On 32-bit systems, the ``__len__`` method cannot return anything
        larger than an integer (in C) can hold. If the total size of the body
        is even slightly larger than 4GB users will see an OverflowError. This
        manifested itself in `bug #80`_.

        As such, we now calculate the length lazily as a property.

        .. _bug #80:
            https://github.com/requests/toolbelt/issues/80
        """
        # If _len isn't already calculated, calculate, return, and set it
        return self._len or self._calculate_length()

    def __repr__(self):
        return '<MultipartEncoder: {!r}>'.format(self.fields)

    def _calculate_length(self):
        """
        This uses the parts to calculate the length of the body.

        This returns the calculated length so __len__ can be lazy.
        """
        boundary_len = len(self.boundary)  # Length of --{boundary}
        # boundary length + header length + body length + len('\r\n') * 2
        self._len = sum(
            (boundary_len + total_len(p) + 4) for p in self.parts
        ) + boundary_len + 4
        return self._len

    def _calculate_load_amount(self, read_size):
        """This calculates how many bytes need to be added to the buffer.

        When a consumer read's ``x`` from the buffer, there are two cases to
        satisfy:

        1. Enough data in the buffer to return the requested amount
        2. Not enough data

        This function uses the amount of unread bytes in the buffer and
        determines how much the Encoder has to load before it can return the
        requested amount of bytes.

        :param int read_size: the number of bytes the consumer requests
        :returns: int -- the number of bytes that must be loaded into the
            buffer before the read can be satisfied. This will be strictly
            non-negative
        """
        amount = read_size - total_len(self._buffer)
        return amount if amount > 0 else 0

    def _load(self, amount):
        """Load ``amount`` number of bytes into the buffer."""
        self._buffer.smart_truncate()
        part = self._current_part or self._next_part()
        while amount == -1 or amount > 0:
            written = 0
            if part and not part.bytes_left_to_write():
                written += self._write(b'\r\n')
                written += self._write_boundary()
                part = self._next_part()

            if not part:
                written += self._write_closing_boundary()
                self.finished = True
                break

            written += part.write_to(self._buffer, amount)

            if amount != -1:
                amount -= written

    def _next_part(self):
        try:
            p = self._current_part = next(self._iter_parts)
        except StopIteration:
            p = None
        return p

    def _iter_fields(self):
        _fields = self.fields
        if hasattr(self.fields, 'items'):
            _fields = list(self.fields.items())
        for k, v in _fields:
            file_name = None
            file_type = None
            file_headers = None
            if isinstance(v, (list, tuple)):
                if len(v) == 2:
                    file_name, file_pointer = v
                elif len(v) == 3:
                    file_name, file_pointer, file_type = v
                else:
                    file_name, file_pointer, file_type, file_headers = v
            else:
                file_pointer = v

            field = fields.RequestField(name=k, data=file_pointer,
                                        filename=file_name,
                                        headers=file_headers)
            field.make_multipart(content_type=file_type)
            yield field

    def _prepare_parts(self):
        """This uses the fields provided by the user and creates Part objects.

        It populates the `parts` attribute and uses that to create a
        generator for iteration.
        """
        enc = self.encoding
        self.parts = [Part.from_field(f, enc) for f in self._iter_fields()]
        self._iter_parts = iter(self.parts)

    def _write(self, bytes_to_write):
        """Write the bytes to the end of the buffer.

        :param bytes bytes_to_write: byte-string (or bytearray) to append to
            the buffer
        :returns: int -- the number of bytes written
        """
        return self._buffer.append(bytes_to_write)

    def _write_boundary(self):
        """Write the boundary to the end of the buffer."""
        return self._write(self._encoded_boundary)

    def _write_closing_boundary(self):
        """Write the bytes necessary to finish a multipart/form-data body."""
        with reset(self._buffer):
            self._buffer.seek(-2, 2)
            self._buffer.write(b'--\r\n')
        return 2

    def _write_headers(self, headers):
        """Write the current part's headers to the buffer."""
        return self._write(encode_with(headers, self.encoding))

    @property
    def content_type(self):
        return str(
            'multipart/form-data; boundary={}'.format(self.boundary_value)
        )

    def to_string(self):
        """Return the entirety of the data in the encoder.

        .. note::

            This simply reads all of the data it can. If you have started
            streaming or reading data from the encoder, this method will only
            return whatever data is left in the encoder.

        .. note::

            This method affects the internal state of the encoder. Calling
            this method will exhaust the encoder.

        :returns: the multipart message
        :rtype: bytes
        """
        return self.read()

    def read(self, size=-1):
        """Read data from the streaming encoder.

        :param int size: (optional), If provided, ``read`` will return exactly
            that many bytes. If it is not provided, it will return the
            remaining bytes.
        :returns: bytes
        """
        if self.finished:
            return self._buffer.read(size)

        bytes_to_load = size
        if bytes_to_load != -1 and bytes_to_load is not None:
            bytes_to_load = self._calculate_load_amount(int(size))

        self._load(bytes_to_load)
        return self._buffer.read(size)
+
+
def IDENTITY(monitor):
    """Default no-op callback: hand the monitor straight back."""
    return monitor
+
+
class MultipartEncoderMonitor(object):

    """
    An object used to monitor the progress of a :class:`MultipartEncoder`.

    The :class:`MultipartEncoder` should only be responsible for preparing and
    streaming the data. For anyone who wishes to monitor it, they shouldn't be
    using that instance to manage that as well. Using this class, they can
    monitor an encoder and register a callback. The callback receives the
    instance of the monitor.

    To use this monitor, you construct your :class:`MultipartEncoder` as you
    normally would.

    .. code-block:: python

        from requests_toolbelt import (MultipartEncoder,
                                       MultipartEncoderMonitor)
        import requests

        def callback(monitor):
            # Do something with this information
            pass

        m = MultipartEncoder(fields={'field0': 'value0'})
        monitor = MultipartEncoderMonitor(m, callback)
        headers = {'Content-Type': monitor.content_type}
        r = requests.post('https://httpbin.org/post', data=monitor,
                          headers=headers)

    Alternatively, if your use case is very simple, you can use the following
    pattern.

    .. code-block:: python

        from requests_toolbelt import MultipartEncoderMonitor
        import requests

        def callback(monitor):
            # Do something with this information
            pass

        monitor = MultipartEncoderMonitor.from_fields(
            fields={'field0': 'value0'}, callback=callback
        )
        headers = {'Content-Type': monitor.content_type}
        r = requests.post('https://httpbin.org/post', data=monitor,
                          headers=headers)

    """

    def __init__(self, encoder, callback=None):
        #: Instance of the :class:`MultipartEncoder` being monitored
        self.encoder = encoder

        #: Optional function to call after a read
        self.callback = callback or IDENTITY

        #: Number of bytes already read from the :class:`MultipartEncoder`
        #: instance
        self.bytes_read = 0

        #: Avoid the same problem in bug #80
        self.len = self.encoder.len

    @classmethod
    def from_fields(cls, fields, boundary=None, encoding='utf-8',
                    callback=None):
        """Build an encoder from ``fields`` and wrap it in a monitor."""
        encoder = MultipartEncoder(fields, boundary, encoding)
        return cls(encoder, callback)

    @property
    def content_type(self):
        """Content-Type header value of the wrapped encoder."""
        return self.encoder.content_type

    def to_string(self):
        """Read (and exhaust) the remainder of the encoder's body."""
        return self.read()

    def read(self, size=-1):
        """Read from the encoder, update ``bytes_read``, fire the callback."""
        string = self.encoder.read(size)
        self.bytes_read += len(string)
        self.callback(self)
        return string
+
+
def encode_with(string, encoding):
    """Encode *string* with *encoding* unless it is already bytes (or None).

    :param str string: bytes objects pass through untouched; text is encoded
        with the provided codec; ``None`` passes through as ``None``.
    :param str encoding: name of the codec to encode with.
    :returns: encoded bytes object (or the input unchanged)
    """
    if string is None or isinstance(string, bytes):
        return string
    return string.encode(encoding)
+
+
def readable_data(data, encoding):
    """Coerce *data* to an object exposing a ``read`` method.

    Objects that already support ``read`` pass through untouched; anything
    else is wrapped in a :class:`CustomBytesIO`.
    """
    if not hasattr(data, 'read'):
        return CustomBytesIO(data, encoding)
    return data
+
+
def total_len(o):
    """Best-effort total length of *o* in bytes/items.

    Tries, in order: ``len(o)``, an ``o.len`` attribute, the size of the
    underlying file descriptor, and finally ``len(o.getvalue())``. Returns
    ``None`` when none of these apply.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        try:
            descriptor = o.fileno()
        except io.UnsupportedOperation:
            # In-memory streams expose fileno() but cannot provide one.
            pass
        else:
            return os.fstat(descriptor).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        return len(o.getvalue())

    return None
+
+
@contextlib.contextmanager
def reset(buffer):
    """Keep track of the buffer's current position and write to the end.

    This is a context manager meant to be used when adding data to the buffer.
    It eliminates the need for every function to be concerned with the
    position of the cursor in the buffer.

    The original position is restored in a ``finally`` clause so the buffer
    is left consistent even if the managed block raises (the previous
    implementation skipped the restore on exception).
    """
    original_position = buffer.tell()
    buffer.seek(0, 2)
    try:
        yield
    finally:
        buffer.seek(original_position, 0)
+
+
def coerce_data(data, encoding):
    """Normalize *data* so that length reporting behaves uniformly.

    ``CustomBytesIO`` instances pass through; value-holding streams are
    copied into a fresh ``CustomBytesIO``; real files get a ``FileWrapper``;
    non-readable raw data is wrapped in a ``CustomBytesIO``.
    """
    if isinstance(data, CustomBytesIO):
        return data

    if hasattr(data, 'getvalue'):
        return CustomBytesIO(data.getvalue(), encoding)

    if hasattr(data, 'fileno'):
        return FileWrapper(data)

    if not hasattr(data, 'read'):
        return CustomBytesIO(data, encoding)

    return data
+
+
def to_list(fields):
    """Return *fields* as a list of ``(name, value)`` pairs.

    Accepts either a mapping (``items()`` is used) or any iterable of pairs.
    """
    items = getattr(fields, 'items', None)
    if items is not None:
        return list(items())
    return list(fields)
+
+
class Part(object):
    """One part of a multipart body: rendered headers plus a readable body."""

    def __init__(self, headers, body):
        self.headers = headers
        self.body = body
        # Headers are emitted exactly once, on the first write_to call.
        self.headers_unread = True
        self.len = len(self.headers) + total_len(self.body)

    @classmethod
    def from_field(cls, field, encoding):
        """Create a part from a RequestField generated by urllib3."""
        rendered_headers = encode_with(field.render_headers(), encoding)
        return cls(rendered_headers, coerce_data(field.data, encoding))

    def bytes_left_to_write(self):
        """Return ``True`` while any header or body bytes remain unwritten."""
        pending = len(self.headers) if self.headers_unread else 0
        return (pending + total_len(self.body)) > 0

    def write_to(self, buffer, size):
        """Write up to *size* bytes of this part into *buffer*.

        May exceed *size* on the first call because the headers are written
        eagerly, in full.

        :param CustomBytesIO buffer: destination buffer
        :param int size: requested byte count (``-1`` means everything)
        :returns: int -- number of bytes actually written
        """
        written = 0
        if self.headers_unread:
            self.headers_unread = False
            written += buffer.append(self.headers)

        while total_len(self.body) > 0 and (size == -1 or written < size):
            chunk = size if size == -1 else size - written
            written += buffer.append(self.body.read(chunk))

        return written
+
+
class CustomBytesIO(io.BytesIO):
    """``BytesIO`` whose ``len`` reports unread bytes and which appends
    writes at the end without moving the read cursor."""

    def __init__(self, buffer=None, encoding='utf-8'):
        super(CustomBytesIO, self).__init__(encode_with(buffer, encoding))

    def _get_end(self):
        # Measure total stream length without disturbing the cursor.
        saved = self.tell()
        self.seek(0, 2)
        end = self.tell()
        self.seek(saved, 0)
        return end

    @property
    def len(self):
        # Bytes remaining between the cursor and the end of the stream.
        return self._get_end() - self.tell()

    def append(self, bytes):
        # Write at the end; `reset` restores the read cursor afterwards.
        with reset(self):
            written = self.write(bytes)
        return written

    def smart_truncate(self):
        # Drop already-consumed bytes once they make up at least half of
        # the stream, shifting the unread remainder to the front.
        unread = total_len(self)
        consumed = self._get_end() - unread
        if consumed >= unread:
            remainder = self.read()
            self.seek(0, 0)
            self.truncate()
            self.write(remainder)
            self.seek(0, 0)  # We want to be at the beginning
+
+
class FileWrapper(object):
    """Wrap a file object so ``len`` reports the bytes left to read."""

    def __init__(self, file_object):
        self.fd = file_object

    @property
    def len(self):
        # Remaining bytes = total size minus the current offset.
        return total_len(self.fd) - self.fd.tell()

    def read(self, length=-1):
        """Delegate reads straight to the wrapped file."""
        return self.fd.read(length)
+
+
class FileFromURLWrapper(object):
    """File from URL wrapper.

    Streams a remote file in chunks through :class:`MultipartEncoder`,
    giving a stateless way to forward a file from one server to another.
    Works with or without an explicit session:

    .. code-block:: python
        # no session

        import requests
        from requests_toolbelt import MultipartEncoder, FileFromURLWrapper

        url = 'https://httpbin.org/image/png'
        streaming_encoder = MultipartEncoder(
            fields={
                'file': FileFromURLWrapper(url)
            }
        )
        r = requests.post(
            'https://httpbin.org/post', data=streaming_encoder,
            headers={'Content-Type': streaming_encoder.content_type}
        )

    .. code-block:: python
        # using a session

        import requests
        from requests_toolbelt import MultipartEncoder, FileFromURLWrapper

        session = requests.Session()
        url = 'https://httpbin.org/image/png'
        streaming_encoder = MultipartEncoder(
            fields={
                'file': FileFromURLWrapper(url, session=session)
            }
        )
        r = session.post(
            'https://httpbin.org/post', data=streaming_encoder,
            headers={'Content-Type': streaming_encoder.content_type}
        )

    """

    def __init__(self, file_url, session=None):
        self.session = session or requests.Session()
        response = self._request_for_file(file_url)
        # `len` counts down as chunks are consumed (bytes left to read).
        self.len = int(response.headers['content-length'])
        self.raw_data = response.raw

    def _request_for_file(self, file_url):
        """Make call for file under provided URL."""
        response = self.session.get(file_url, stream=True)
        content_length = response.headers.get('content-length', None)
        if content_length is None:
            raise FileNotSupportedError(
                "Data from provided URL {url} is not supported. Lack of "
                "content-length Header in requested file response.".format(
                    url=file_url)
            )
        if not content_length.isdigit():
            raise FileNotSupportedError(
                "Data from provided URL {url} is not supported. content-length"
                " header value is not a digit.".format(url=file_url)
            )
        return response

    def read(self, chunk_size):
        """Read file in chunks; a negative size drains the remainder."""
        if chunk_size < 0:
            chunk_size = self.len
        chunk = self.raw_data.read(chunk_size) or b''
        self.len -= len(chunk)  # left to read
        return chunk