about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiohttp/formdata.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/formdata.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/formdata.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/formdata.py b/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
new file mode 100644
index 00000000..73056f4b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
@@ -0,0 +1,182 @@
+import io
+import warnings
+from typing import Any, Iterable, List, Optional
+from urllib.parse import urlencode
+
+from multidict import MultiDict, MultiDictProxy
+
+from . import hdrs, multipart, payload
+from .helpers import guess_filename
+from .payload import Payload
+
+__all__ = ("FormData",)
+
+
+class FormData:
+    """Helper class for form body generation.
+
+    Supports multipart/form-data and application/x-www-form-urlencoded.
+    """
+
+    def __init__(
+        self,
+        fields: Iterable[Any] = (),
+        quote_fields: bool = True,
+        charset: Optional[str] = None,
+        *,
+        default_to_multipart: bool = False,
+    ) -> None:
+        self._writer = multipart.MultipartWriter("form-data")
+        self._fields: List[Any] = []
+        self._is_multipart = default_to_multipart
+        self._is_processed = False
+        self._quote_fields = quote_fields
+        self._charset = charset
+
+        if isinstance(fields, dict):
+            fields = list(fields.items())
+        elif not isinstance(fields, (list, tuple)):
+            fields = (fields,)
+        self.add_fields(*fields)
+
+    @property
+    def is_multipart(self) -> bool:
+        return self._is_multipart
+
+    def add_field(
+        self,
+        name: str,
+        value: Any,
+        *,
+        content_type: Optional[str] = None,
+        filename: Optional[str] = None,
+        content_transfer_encoding: Optional[str] = None,
+    ) -> None:
+
+        if isinstance(value, io.IOBase):
+            self._is_multipart = True
+        elif isinstance(value, (bytes, bytearray, memoryview)):
+            msg = (
+                "In v4, passing bytes will no longer create a file field. "
+                "Please explicitly use the filename parameter or pass a BytesIO object."
+            )
+            if filename is None and content_transfer_encoding is None:
+                warnings.warn(msg, DeprecationWarning)
+                filename = name
+
+        type_options: MultiDict[str] = MultiDict({"name": name})
+        if filename is not None and not isinstance(filename, str):
+            raise TypeError("filename must be an instance of str. Got: %s" % filename)
+        if filename is None and isinstance(value, io.IOBase):
+            filename = guess_filename(value, name)
+        if filename is not None:
+            type_options["filename"] = filename
+            self._is_multipart = True
+
+        headers = {}
+        if content_type is not None:
+            if not isinstance(content_type, str):
+                raise TypeError(
+                    "content_type must be an instance of str. Got: %s" % content_type
+                )
+            headers[hdrs.CONTENT_TYPE] = content_type
+            self._is_multipart = True
+        if content_transfer_encoding is not None:
+            if not isinstance(content_transfer_encoding, str):
+                raise TypeError(
+                    "content_transfer_encoding must be an instance"
+                    " of str. Got: %s" % content_transfer_encoding
+                )
+            msg = (
+                "content_transfer_encoding is deprecated. "
+                "To maintain compatibility with v4 please pass a BytesPayload."
+            )
+            warnings.warn(msg, DeprecationWarning)
+            self._is_multipart = True
+
+        self._fields.append((type_options, headers, value))
+
+    def add_fields(self, *fields: Any) -> None:
+        to_add = list(fields)
+
+        while to_add:
+            rec = to_add.pop(0)
+
+            if isinstance(rec, io.IOBase):
+                k = guess_filename(rec, "unknown")
+                self.add_field(k, rec)  # type: ignore[arg-type]
+
+            elif isinstance(rec, (MultiDictProxy, MultiDict)):
+                to_add.extend(rec.items())
+
+            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
+                k, fp = rec
+                self.add_field(k, fp)  # type: ignore[arg-type]
+
+            else:
+                raise TypeError(
+                    "Only io.IOBase, multidict and (name, file) "
+                    "pairs allowed, use .add_field() for passing "
+                    "more complex parameters, got {!r}".format(rec)
+                )
+
+    def _gen_form_urlencoded(self) -> payload.BytesPayload:
+        # form data (x-www-form-urlencoded)
+        data = []
+        for type_options, _, value in self._fields:
+            data.append((type_options["name"], value))
+
+        charset = self._charset if self._charset is not None else "utf-8"
+
+        if charset == "utf-8":
+            content_type = "application/x-www-form-urlencoded"
+        else:
+            content_type = "application/x-www-form-urlencoded; charset=%s" % charset
+
+        return payload.BytesPayload(
+            urlencode(data, doseq=True, encoding=charset).encode(),
+            content_type=content_type,
+        )
+
+    def _gen_form_data(self) -> multipart.MultipartWriter:
+        """Encode a list of fields using the multipart/form-data MIME format"""
+        if self._is_processed:
+            raise RuntimeError("Form data has been processed already")
+        for dispparams, headers, value in self._fields:
+            try:
+                if hdrs.CONTENT_TYPE in headers:
+                    part = payload.get_payload(
+                        value,
+                        content_type=headers[hdrs.CONTENT_TYPE],
+                        headers=headers,
+                        encoding=self._charset,
+                    )
+                else:
+                    part = payload.get_payload(
+                        value, headers=headers, encoding=self._charset
+                    )
+            except Exception as exc:
+                raise TypeError(
+                    "Can not serialize value type: %r\n "
+                    "headers: %r\n value: %r" % (type(value), headers, value)
+                ) from exc
+
+            if dispparams:
+                part.set_content_disposition(
+                    "form-data", quote_fields=self._quote_fields, **dispparams
+                )
+                # FIXME cgi.FieldStorage doesn't likes body parts with
+                # Content-Length which were sent via chunked transfer encoding
+                assert part.headers is not None
+                part.headers.popall(hdrs.CONTENT_LENGTH, None)
+
+            self._writer.append_payload(part)
+
+        self._is_processed = True
+        return self._writer
+
+    def __call__(self) -> Payload:
+        if self._is_multipart:
+            return self._gen_form_data()
+        else:
+            return self._gen_form_urlencoded()