aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/formdata.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/formdata.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/formdata.py b/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
new file mode 100644
index 00000000..73056f4b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/formdata.py
@@ -0,0 +1,182 @@
+import io
+import warnings
+from typing import Any, Iterable, List, Optional
+from urllib.parse import urlencode
+
+from multidict import MultiDict, MultiDictProxy
+
+from . import hdrs, multipart, payload
+from .helpers import guess_filename
+from .payload import Payload
+
+__all__ = ("FormData",)
+
+
+class FormData:
+ """Helper class for form body generation.
+
+ Supports multipart/form-data and application/x-www-form-urlencoded.
+ """
+
+ def __init__(
+ self,
+ fields: Iterable[Any] = (),
+ quote_fields: bool = True,
+ charset: Optional[str] = None,
+ *,
+ default_to_multipart: bool = False,
+ ) -> None:
+ self._writer = multipart.MultipartWriter("form-data")
+ self._fields: List[Any] = []
+ self._is_multipart = default_to_multipart
+ self._is_processed = False
+ self._quote_fields = quote_fields
+ self._charset = charset
+
+ if isinstance(fields, dict):
+ fields = list(fields.items())
+ elif not isinstance(fields, (list, tuple)):
+ fields = (fields,)
+ self.add_fields(*fields)
+
+ @property
+ def is_multipart(self) -> bool:
+ return self._is_multipart
+
+ def add_field(
+ self,
+ name: str,
+ value: Any,
+ *,
+ content_type: Optional[str] = None,
+ filename: Optional[str] = None,
+ content_transfer_encoding: Optional[str] = None,
+ ) -> None:
+
+ if isinstance(value, io.IOBase):
+ self._is_multipart = True
+ elif isinstance(value, (bytes, bytearray, memoryview)):
+ msg = (
+ "In v4, passing bytes will no longer create a file field. "
+ "Please explicitly use the filename parameter or pass a BytesIO object."
+ )
+ if filename is None and content_transfer_encoding is None:
+ warnings.warn(msg, DeprecationWarning)
+ filename = name
+
+ type_options: MultiDict[str] = MultiDict({"name": name})
+ if filename is not None and not isinstance(filename, str):
+ raise TypeError("filename must be an instance of str. Got: %s" % filename)
+ if filename is None and isinstance(value, io.IOBase):
+ filename = guess_filename(value, name)
+ if filename is not None:
+ type_options["filename"] = filename
+ self._is_multipart = True
+
+ headers = {}
+ if content_type is not None:
+ if not isinstance(content_type, str):
+ raise TypeError(
+ "content_type must be an instance of str. Got: %s" % content_type
+ )
+ headers[hdrs.CONTENT_TYPE] = content_type
+ self._is_multipart = True
+ if content_transfer_encoding is not None:
+ if not isinstance(content_transfer_encoding, str):
+ raise TypeError(
+ "content_transfer_encoding must be an instance"
+ " of str. Got: %s" % content_transfer_encoding
+ )
+ msg = (
+ "content_transfer_encoding is deprecated. "
+ "To maintain compatibility with v4 please pass a BytesPayload."
+ )
+ warnings.warn(msg, DeprecationWarning)
+ self._is_multipart = True
+
+ self._fields.append((type_options, headers, value))
+
+ def add_fields(self, *fields: Any) -> None:
+ to_add = list(fields)
+
+ while to_add:
+ rec = to_add.pop(0)
+
+ if isinstance(rec, io.IOBase):
+ k = guess_filename(rec, "unknown")
+ self.add_field(k, rec) # type: ignore[arg-type]
+
+ elif isinstance(rec, (MultiDictProxy, MultiDict)):
+ to_add.extend(rec.items())
+
+ elif isinstance(rec, (list, tuple)) and len(rec) == 2:
+ k, fp = rec
+ self.add_field(k, fp) # type: ignore[arg-type]
+
+ else:
+ raise TypeError(
+ "Only io.IOBase, multidict and (name, file) "
+ "pairs allowed, use .add_field() for passing "
+ "more complex parameters, got {!r}".format(rec)
+ )
+
+ def _gen_form_urlencoded(self) -> payload.BytesPayload:
+ # form data (x-www-form-urlencoded)
+ data = []
+ for type_options, _, value in self._fields:
+ data.append((type_options["name"], value))
+
+ charset = self._charset if self._charset is not None else "utf-8"
+
+ if charset == "utf-8":
+ content_type = "application/x-www-form-urlencoded"
+ else:
+ content_type = "application/x-www-form-urlencoded; charset=%s" % charset
+
+ return payload.BytesPayload(
+ urlencode(data, doseq=True, encoding=charset).encode(),
+ content_type=content_type,
+ )
+
+ def _gen_form_data(self) -> multipart.MultipartWriter:
+ """Encode a list of fields using the multipart/form-data MIME format"""
+ if self._is_processed:
+ raise RuntimeError("Form data has been processed already")
+ for dispparams, headers, value in self._fields:
+ try:
+ if hdrs.CONTENT_TYPE in headers:
+ part = payload.get_payload(
+ value,
+ content_type=headers[hdrs.CONTENT_TYPE],
+ headers=headers,
+ encoding=self._charset,
+ )
+ else:
+ part = payload.get_payload(
+ value, headers=headers, encoding=self._charset
+ )
+ except Exception as exc:
+ raise TypeError(
+ "Can not serialize value type: %r\n "
+ "headers: %r\n value: %r" % (type(value), headers, value)
+ ) from exc
+
+ if dispparams:
+ part.set_content_disposition(
+ "form-data", quote_fields=self._quote_fields, **dispparams
+ )
+ # FIXME cgi.FieldStorage doesn't likes body parts with
+ # Content-Length which were sent via chunked transfer encoding
+ assert part.headers is not None
+ part.headers.popall(hdrs.CONTENT_LENGTH, None)
+
+ self._writer.append_payload(part)
+
+ self._is_processed = True
+ return self._writer
+
+ def __call__(self) -> Payload:
+ if self._is_multipart:
+ return self._gen_form_data()
+ else:
+ return self._gen_form_urlencoded()