Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/http_parser.py')
 .venv/lib/python3.12/site-packages/aiohttp/http_parser.py | 1046 +++++++++++++
 1 file changed, 1046 insertions(+)
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/http_parser.py b/.venv/lib/python3.12/site-packages/aiohttp/http_parser.py
new file mode 100644
index 00000000..1b8b5b4d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/http_parser.py
@@ -0,0 +1,1046 @@
+import abc
+import asyncio
+import re
+import string
+from contextlib import suppress
+from enum import IntEnum
+from typing import (
+ Any,
+ ClassVar,
+ Final,
+ Generic,
+ List,
+ Literal,
+ NamedTuple,
+ Optional,
+ Pattern,
+ Set,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+)
+
+from multidict import CIMultiDict, CIMultiDictProxy, istr
+from yarl import URL
+
+from . import hdrs
+from .base_protocol import BaseProtocol
+from .compression_utils import HAS_BROTLI, BrotliDecompressor, ZLibDecompressor
+from .helpers import (
+ _EXC_SENTINEL,
+ DEBUG,
+ EMPTY_BODY_METHODS,
+ EMPTY_BODY_STATUS_CODES,
+ NO_EXTENSIONS,
+ BaseTimerContext,
+ set_exception,
+)
+from .http_exceptions import (
+ BadHttpMessage,
+ BadHttpMethod,
+ BadStatusLine,
+ ContentEncodingError,
+ ContentLengthError,
+ InvalidHeader,
+ InvalidURLError,
+ LineTooLong,
+ TransferEncodingError,
+)
+from .http_writer import HttpVersion, HttpVersion10
+from .streams import EMPTY_PAYLOAD, StreamReader
+from .typedefs import RawHeaders
+
+__all__ = (
+ "HeadersParser",
+ "HttpParser",
+ "HttpRequestParser",
+ "HttpResponseParser",
+ "RawRequestMessage",
+ "RawResponseMessage",
+)
+
+_SEP = Literal[b"\r\n", b"\n"]
+
+ASCIISET: Final[Set[str]] = set(string.printable)
+
+# See https://www.rfc-editor.org/rfc/rfc9110.html#name-overview
+# and https://www.rfc-editor.org/rfc/rfc9110.html#name-tokens
+#
+# method = token
+# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" / "+" / "-" / "." /
+# "^" / "_" / "`" / "|" / "~" / DIGIT / ALPHA
+# token = 1*tchar
+_TCHAR_SPECIALS: Final[str] = re.escape("!#$%&'*+-.^_`|~")
+TOKENRE: Final[Pattern[str]] = re.compile(f"[0-9A-Za-z{_TCHAR_SPECIALS}]+")
+VERSRE: Final[Pattern[str]] = re.compile(r"HTTP/(\d)\.(\d)", re.ASCII)
+DIGITS: Final[Pattern[str]] = re.compile(r"\d+", re.ASCII)
+HEXDIGITS: Final[Pattern[bytes]] = re.compile(rb"[0-9a-fA-F]+")
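+
+# Illustrative matches (a sketch, not part of aiohttp):
+#   TOKENRE.fullmatch("GET")      -> match  (every character is a tchar)
+#   TOKENRE.fullmatch("GE T")     -> None   (space is not a tchar)
+#   VERSRE.fullmatch("HTTP/1.1")  -> match  with groups ("1", "1")
+#   HEXDIGITS.fullmatch(b"1a2f")  -> match  (chunk sizes are hexadecimal)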
+
+
+class RawRequestMessage(NamedTuple):
+ method: str
+ path: str
+ version: HttpVersion
+ headers: "CIMultiDictProxy[str]"
+ raw_headers: RawHeaders
+ should_close: bool
+ compression: Optional[str]
+ upgrade: bool
+ chunked: bool
+ url: URL
+
+
+class RawResponseMessage(NamedTuple):
+ version: HttpVersion
+ code: int
+ reason: str
+ headers: CIMultiDictProxy[str]
+ raw_headers: RawHeaders
+ should_close: bool
+ compression: Optional[str]
+ upgrade: bool
+ chunked: bool
+
+
+_MsgT = TypeVar("_MsgT", RawRequestMessage, RawResponseMessage)
+
+
+class ParseState(IntEnum):
+
+ PARSE_NONE = 0
+ PARSE_LENGTH = 1
+ PARSE_CHUNKED = 2
+ PARSE_UNTIL_EOF = 3
+
+
+class ChunkState(IntEnum):
+ PARSE_CHUNKED_SIZE = 0
+ PARSE_CHUNKED_CHUNK = 1
+ PARSE_CHUNKED_CHUNK_EOF = 2
+ PARSE_MAYBE_TRAILERS = 3
+ PARSE_TRAILERS = 4
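+
+# The states above walk the chunked wire format (illustrative sketch):
+#
+#   4\r\n        <- PARSE_CHUNKED_SIZE: hex size plus optional ";extension"
+#   Wiki\r\n     <- PARSE_CHUNKED_CHUNK, then PARSE_CHUNKED_CHUNK_EOF eats CRLF
+#   0\r\n        <- a zero size switches to PARSE_MAYBE_TRAILERS
+#   \r\n         <- a bare CRLF ends the body; otherwise PARSE_TRAILERS skips
+#                   trailer lines until the final CRLF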
+
+
+class HeadersParser:
+ def __init__(
+ self,
+ max_line_size: int = 8190,
+ max_headers: int = 32768,
+ max_field_size: int = 8190,
+ lax: bool = False,
+ ) -> None:
+ self.max_line_size = max_line_size
+ self.max_headers = max_headers
+ self.max_field_size = max_field_size
+ self._lax = lax
+
+ def parse_headers(
+ self, lines: List[bytes]
+ ) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
+ headers: CIMultiDict[str] = CIMultiDict()
+ # note: "raw" does not mean inclusion of OWS before/after the field value
+ raw_headers = []
+
+ lines_idx = 1
+ line = lines[1]
+ line_count = len(lines)
+
+ while line:
+ # Parse initial header name : value pair.
+ try:
+ bname, bvalue = line.split(b":", 1)
+ except ValueError:
+ raise InvalidHeader(line) from None
+
+ if len(bname) == 0:
+ raise InvalidHeader(bname)
+
+ # https://www.rfc-editor.org/rfc/rfc9112.html#section-5.1-2
+ if {bname[0], bname[-1]} & {32, 9}: # {" ", "\t"}
+ raise InvalidHeader(line)
+
+ bvalue = bvalue.lstrip(b" \t")
+ if len(bname) > self.max_field_size:
+ raise LineTooLong(
+ "request header name {}".format(
+ bname.decode("utf8", "backslashreplace")
+ ),
+ str(self.max_field_size),
+ str(len(bname)),
+ )
+ name = bname.decode("utf-8", "surrogateescape")
+ if not TOKENRE.fullmatch(name):
+ raise InvalidHeader(bname)
+
+ header_length = len(bvalue)
+
+ # next line
+ lines_idx += 1
+ line = lines[lines_idx]
+
+ # consume continuation lines
+ continuation = self._lax and line and line[0] in (32, 9) # (' ', '\t')
+
+ # Deprecated: https://www.rfc-editor.org/rfc/rfc9112.html#name-obsolete-line-folding
+ if continuation:
+ bvalue_lst = [bvalue]
+ while continuation:
+ header_length += len(line)
+ if header_length > self.max_field_size:
+ raise LineTooLong(
+ "request header field {}".format(
+ bname.decode("utf8", "backslashreplace")
+ ),
+ str(self.max_field_size),
+ str(header_length),
+ )
+ bvalue_lst.append(line)
+
+ # next line
+ lines_idx += 1
+ if lines_idx < line_count:
+ line = lines[lines_idx]
+ if line:
+ continuation = line[0] in (32, 9) # (' ', '\t')
+ else:
+ line = b""
+ break
+ bvalue = b"".join(bvalue_lst)
+ else:
+ if header_length > self.max_field_size:
+ raise LineTooLong(
+ "request header field {}".format(
+ bname.decode("utf8", "backslashreplace")
+ ),
+ str(self.max_field_size),
+ str(header_length),
+ )
+
+ bvalue = bvalue.strip(b" \t")
+ value = bvalue.decode("utf-8", "surrogateescape")
+
+ # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-5
+ if "\n" in value or "\r" in value or "\x00" in value:
+ raise InvalidHeader(bvalue)
+
+ headers.add(name, value)
+ raw_headers.append((bname, bvalue))
+
+ return (CIMultiDictProxy(headers), tuple(raw_headers))
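+
+    # Usage sketch (illustrative): lines[0] is the start line and is skipped;
+    # a trailing empty line terminates the header block.
+    #
+    #   parser = HeadersParser()
+    #   headers, raw = parser.parse_headers(
+    #       [b"GET / HTTP/1.1", b"Host: example.com", b""]
+    #   )
+    #   headers["Host"]  -> "example.com"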
+
+
+def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
+ """Check if the upgrade header is supported."""
+ return headers.get(hdrs.UPGRADE, "").lower() in {"tcp", "websocket"}
+
+
+class HttpParser(abc.ABC, Generic[_MsgT]):
+ lax: ClassVar[bool] = False
+
+ def __init__(
+ self,
+ protocol: Optional[BaseProtocol] = None,
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ limit: int = 2**16,
+ max_line_size: int = 8190,
+ max_headers: int = 32768,
+ max_field_size: int = 8190,
+ timer: Optional[BaseTimerContext] = None,
+ code: Optional[int] = None,
+ method: Optional[str] = None,
+ payload_exception: Optional[Type[BaseException]] = None,
+ response_with_body: bool = True,
+ read_until_eof: bool = False,
+ auto_decompress: bool = True,
+ ) -> None:
+ self.protocol = protocol
+ self.loop = loop
+ self.max_line_size = max_line_size
+ self.max_headers = max_headers
+ self.max_field_size = max_field_size
+ self.timer = timer
+ self.code = code
+ self.method = method
+ self.payload_exception = payload_exception
+ self.response_with_body = response_with_body
+ self.read_until_eof = read_until_eof
+
+ self._lines: List[bytes] = []
+ self._tail = b""
+ self._upgraded = False
+ self._payload = None
+ self._payload_parser: Optional[HttpPayloadParser] = None
+ self._auto_decompress = auto_decompress
+ self._limit = limit
+ self._headers_parser = HeadersParser(
+ max_line_size, max_headers, max_field_size, self.lax
+ )
+
+ @abc.abstractmethod
+ def parse_message(self, lines: List[bytes]) -> _MsgT: ...
+
+ @abc.abstractmethod
+ def _is_chunked_te(self, te: str) -> bool: ...
+
+ def feed_eof(self) -> Optional[_MsgT]:
+ if self._payload_parser is not None:
+ self._payload_parser.feed_eof()
+ self._payload_parser = None
+ else:
+ # try to extract partial message
+ if self._tail:
+ self._lines.append(self._tail)
+
+ if self._lines:
+            if self._lines[-1] != b"\r\n":
+ self._lines.append(b"")
+ with suppress(Exception):
+ return self.parse_message(self._lines)
+ return None
+
+ def feed_data(
+ self,
+ data: bytes,
+ SEP: _SEP = b"\r\n",
+ EMPTY: bytes = b"",
+ CONTENT_LENGTH: istr = hdrs.CONTENT_LENGTH,
+ METH_CONNECT: str = hdrs.METH_CONNECT,
+ SEC_WEBSOCKET_KEY1: istr = hdrs.SEC_WEBSOCKET_KEY1,
+ ) -> Tuple[List[Tuple[_MsgT, StreamReader]], bool, bytes]:
+
+ messages = []
+
+ if self._tail:
+ data, self._tail = self._tail + data, b""
+
+ data_len = len(data)
+ start_pos = 0
+ loop = self.loop
+
+ should_close = False
+ while start_pos < data_len:
+
+ # read HTTP message (request/response line + headers), \r\n\r\n
+ # and split by lines
+ if self._payload_parser is None and not self._upgraded:
+ pos = data.find(SEP, start_pos)
+ # consume \r\n
+ if pos == start_pos and not self._lines:
+ start_pos = pos + len(SEP)
+ continue
+
+ if pos >= start_pos:
+ if should_close:
+ raise BadHttpMessage("Data after `Connection: close`")
+
+ # line found
+ line = data[start_pos:pos]
+ if SEP == b"\n": # For lax response parsing
+ line = line.rstrip(b"\r")
+ self._lines.append(line)
+ start_pos = pos + len(SEP)
+
+ # \r\n\r\n found
+ if self._lines[-1] == EMPTY:
+ try:
+ msg: _MsgT = self.parse_message(self._lines)
+ finally:
+ self._lines.clear()
+
+ def get_content_length() -> Optional[int]:
+ # payload length
+ length_hdr = msg.headers.get(CONTENT_LENGTH)
+ if length_hdr is None:
+ return None
+
+ # Shouldn't allow +/- or other number formats.
+ # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2
+ # msg.headers is already stripped of leading/trailing wsp
+ if not DIGITS.fullmatch(length_hdr):
+ raise InvalidHeader(CONTENT_LENGTH)
+
+ return int(length_hdr)
+
+ length = get_content_length()
+ # do not support old websocket spec
+ if SEC_WEBSOCKET_KEY1 in msg.headers:
+ raise InvalidHeader(SEC_WEBSOCKET_KEY1)
+
+ self._upgraded = msg.upgrade and _is_supported_upgrade(
+ msg.headers
+ )
+
+ method = getattr(msg, "method", self.method)
+ # code is only present on responses
+ code = getattr(msg, "code", 0)
+
+ assert self.protocol is not None
+ # calculate payload
+ empty_body = code in EMPTY_BODY_STATUS_CODES or bool(
+ method and method in EMPTY_BODY_METHODS
+ )
+ if not empty_body and (
+ ((length is not None and length > 0) or msg.chunked)
+ and not self._upgraded
+ ):
+ payload = StreamReader(
+ self.protocol,
+ timer=self.timer,
+ loop=loop,
+ limit=self._limit,
+ )
+ payload_parser = HttpPayloadParser(
+ payload,
+ length=length,
+ chunked=msg.chunked,
+ method=method,
+ compression=msg.compression,
+ code=self.code,
+ response_with_body=self.response_with_body,
+ auto_decompress=self._auto_decompress,
+ lax=self.lax,
+ )
+ if not payload_parser.done:
+ self._payload_parser = payload_parser
+ elif method == METH_CONNECT:
+ assert isinstance(msg, RawRequestMessage)
+ payload = StreamReader(
+ self.protocol,
+ timer=self.timer,
+ loop=loop,
+ limit=self._limit,
+ )
+ self._upgraded = True
+ self._payload_parser = HttpPayloadParser(
+ payload,
+ method=msg.method,
+ compression=msg.compression,
+ auto_decompress=self._auto_decompress,
+ lax=self.lax,
+ )
+ elif not empty_body and length is None and self.read_until_eof:
+ payload = StreamReader(
+ self.protocol,
+ timer=self.timer,
+ loop=loop,
+ limit=self._limit,
+ )
+ payload_parser = HttpPayloadParser(
+ payload,
+ length=length,
+ chunked=msg.chunked,
+ method=method,
+ compression=msg.compression,
+ code=self.code,
+ response_with_body=self.response_with_body,
+ auto_decompress=self._auto_decompress,
+ lax=self.lax,
+ )
+ if not payload_parser.done:
+ self._payload_parser = payload_parser
+ else:
+ payload = EMPTY_PAYLOAD
+
+ messages.append((msg, payload))
+ should_close = msg.should_close
+ else:
+ self._tail = data[start_pos:]
+ data = EMPTY
+ break
+
+ # no parser, just store
+ elif self._payload_parser is None and self._upgraded:
+ assert not self._lines
+ break
+
+ # feed payload
+ elif data and start_pos < data_len:
+ assert not self._lines
+ assert self._payload_parser is not None
+ try:
+ eof, data = self._payload_parser.feed_data(data[start_pos:], SEP)
+ except BaseException as underlying_exc:
+ reraised_exc = underlying_exc
+ if self.payload_exception is not None:
+ reraised_exc = self.payload_exception(str(underlying_exc))
+
+ set_exception(
+ self._payload_parser.payload,
+ reraised_exc,
+ underlying_exc,
+ )
+
+ eof = True
+ data = b""
+
+ if eof:
+ start_pos = 0
+ data_len = len(data)
+ self._payload_parser = None
+ continue
+ else:
+ break
+
+ if data and start_pos < data_len:
+ data = data[start_pos:]
+ else:
+ data = EMPTY
+
+ return messages, self._upgraded, data
+
+ def parse_headers(
+ self, lines: List[bytes]
+ ) -> Tuple[
+ "CIMultiDictProxy[str]", RawHeaders, Optional[bool], Optional[str], bool, bool
+ ]:
+ """Parses RFC 5322 headers from a stream.
+
+ Line continuations are supported. Returns list of header name
+ and value pairs. Header name is in upper case.
+ """
+ headers, raw_headers = self._headers_parser.parse_headers(lines)
+ close_conn = None
+ encoding = None
+ upgrade = False
+ chunked = False
+
+ # https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6
+ # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
+ singletons = (
+ hdrs.CONTENT_LENGTH,
+ hdrs.CONTENT_LOCATION,
+ hdrs.CONTENT_RANGE,
+ hdrs.CONTENT_TYPE,
+ hdrs.ETAG,
+ hdrs.HOST,
+ hdrs.MAX_FORWARDS,
+ hdrs.SERVER,
+ hdrs.TRANSFER_ENCODING,
+ hdrs.USER_AGENT,
+ )
+ bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None)
+ if bad_hdr is not None:
+ raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")
+
+ # keep-alive
+ conn = headers.get(hdrs.CONNECTION)
+ if conn:
+ v = conn.lower()
+ if v == "close":
+ close_conn = True
+ elif v == "keep-alive":
+ close_conn = False
+ # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols
+ elif v == "upgrade" and headers.get(hdrs.UPGRADE):
+ upgrade = True
+
+ # encoding
+ enc = headers.get(hdrs.CONTENT_ENCODING)
+ if enc:
+ enc = enc.lower()
+ if enc in ("gzip", "deflate", "br"):
+ encoding = enc
+
+ # chunking
+ te = headers.get(hdrs.TRANSFER_ENCODING)
+ if te is not None:
+ if self._is_chunked_te(te):
+ chunked = True
+
+ if hdrs.CONTENT_LENGTH in headers:
+ raise BadHttpMessage(
+ "Transfer-Encoding can't be present with Content-Length",
+ )
+
+ return (headers, raw_headers, close_conn, encoding, upgrade, chunked)
+
+ def set_upgraded(self, val: bool) -> None:
+ """Set connection upgraded (to websocket) mode.
+
+ :param bool val: new state.
+ """
+ self._upgraded = val
+
+
+class HttpRequestParser(HttpParser[RawRequestMessage]):
+ """Read request status line.
+
+ Exception .http_exceptions.BadStatusLine
+ could be raised in case of any errors in status line.
+ Returns RawRequestMessage.
+ """
+
+ def parse_message(self, lines: List[bytes]) -> RawRequestMessage:
+ # request line
+ line = lines[0].decode("utf-8", "surrogateescape")
+ try:
+ method, path, version = line.split(" ", maxsplit=2)
+ except ValueError:
+ raise BadHttpMethod(line) from None
+
+ if len(path) > self.max_line_size:
+ raise LineTooLong(
+ "Status line is too long", str(self.max_line_size), str(len(path))
+ )
+
+ # method
+ if not TOKENRE.fullmatch(method):
+ raise BadHttpMethod(method)
+
+ # version
+ match = VERSRE.fullmatch(version)
+ if match is None:
+ raise BadStatusLine(line)
+ version_o = HttpVersion(int(match.group(1)), int(match.group(2)))
+
+ if method == "CONNECT":
+ # authority-form,
+ # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.3
+ url = URL.build(authority=path, encoded=True)
+ elif path.startswith("/"):
+ # origin-form,
+ # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.1
+ path_part, _hash_separator, url_fragment = path.partition("#")
+ path_part, _question_mark_separator, qs_part = path_part.partition("?")
+
+ # NOTE: `yarl.URL.build()` is used to mimic what the Cython-based
+ # NOTE: parser does, otherwise it results into the same
+ # NOTE: HTTP Request-Line input producing different
+ # NOTE: `yarl.URL()` objects
+ url = URL.build(
+ path=path_part,
+ query_string=qs_part,
+ fragment=url_fragment,
+ encoded=True,
+ )
+ elif path == "*" and method == "OPTIONS":
+ # asterisk-form,
+ url = URL(path, encoded=True)
+ else:
+ # absolute-form for proxy maybe,
+ # https://datatracker.ietf.org/doc/html/rfc7230#section-5.3.2
+ url = URL(path, encoded=True)
+ if url.scheme == "":
+ # not absolute-form
+ raise InvalidURLError(
+ path.encode(errors="surrogateescape").decode("latin1")
+ )
+
+ # read headers
+ (
+ headers,
+ raw_headers,
+ close,
+ compression,
+ upgrade,
+ chunked,
+ ) = self.parse_headers(lines)
+
+        if close is None:  # no Connection header was present in the request
+            if version_o <= HttpVersion10:  # HTTP/1.0 defaults to close
+                close = True
+            else:  # HTTP/1.1 defaults to keep-alive
+                close = False
+
+ return RawRequestMessage(
+ method,
+ path,
+ version_o,
+ headers,
+ raw_headers,
+ close,
+ compression,
+ upgrade,
+ chunked,
+ url,
+ )
+
+ def _is_chunked_te(self, te: str) -> bool:
+ if te.rsplit(",", maxsplit=1)[-1].strip(" \t").lower() == "chunked":
+ return True
+ # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.3
+ raise BadHttpMessage("Request has invalid `Transfer-Encoding`")
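+
+    # Usage sketch (illustrative; a real caller supplies a connected
+    # BaseProtocol and the running event loop):
+    #
+    #   parser = HttpRequestParser(protocol, loop, limit=2**16)
+    #   messages, upgraded, tail = parser.feed_data(
+    #       b"GET /path?q=1 HTTP/1.1\r\nHost: example.com\r\n\r\n"
+    #   )
+    #   msg, payload = messages[0]
+    #   msg.method -> "GET"; payload is EMPTY_PAYLOAD for a bodyless request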
+
+
+class HttpResponseParser(HttpParser[RawResponseMessage]):
+ """Read response status line and headers.
+
+    BadStatusLine is raised if the status line is malformed;
+    parse_message returns a RawResponseMessage.
+ """
+
+ # Lax mode should only be enabled on response parser.
+ lax = not DEBUG
+
+ def feed_data(
+ self,
+ data: bytes,
+ SEP: Optional[_SEP] = None,
+ *args: Any,
+ **kwargs: Any,
+ ) -> Tuple[List[Tuple[RawResponseMessage, StreamReader]], bool, bytes]:
+ if SEP is None:
+ SEP = b"\r\n" if DEBUG else b"\n"
+ return super().feed_data(data, SEP, *args, **kwargs)
+
+ def parse_message(self, lines: List[bytes]) -> RawResponseMessage:
+ line = lines[0].decode("utf-8", "surrogateescape")
+ try:
+ version, status = line.split(maxsplit=1)
+ except ValueError:
+ raise BadStatusLine(line) from None
+
+ try:
+ status, reason = status.split(maxsplit=1)
+ except ValueError:
+ status = status.strip()
+ reason = ""
+
+ if len(reason) > self.max_line_size:
+ raise LineTooLong(
+ "Status line is too long", str(self.max_line_size), str(len(reason))
+ )
+
+ # version
+ match = VERSRE.fullmatch(version)
+ if match is None:
+ raise BadStatusLine(line)
+ version_o = HttpVersion(int(match.group(1)), int(match.group(2)))
+
+ # The status code is a three-digit ASCII number, no padding
+ if len(status) != 3 or not DIGITS.fullmatch(status):
+ raise BadStatusLine(line)
+ status_i = int(status)
+
+ # read headers
+ (
+ headers,
+ raw_headers,
+ close,
+ compression,
+ upgrade,
+ chunked,
+ ) = self.parse_headers(lines)
+
+ if close is None:
+ if version_o <= HttpVersion10:
+ close = True
+ # https://www.rfc-editor.org/rfc/rfc9112.html#name-message-body-length
+ elif 100 <= status_i < 200 or status_i in {204, 304}:
+ close = False
+ elif hdrs.CONTENT_LENGTH in headers or hdrs.TRANSFER_ENCODING in headers:
+ close = False
+ else:
+ # https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-2.8
+ close = True
+
+ return RawResponseMessage(
+ version_o,
+ status_i,
+ reason.strip(),
+ headers,
+ raw_headers,
+ close,
+ compression,
+ upgrade,
+ chunked,
+ )
+
+ def _is_chunked_te(self, te: str) -> bool:
+ # https://www.rfc-editor.org/rfc/rfc9112#section-6.3-2.4.2
+ return te.rsplit(",", maxsplit=1)[-1].strip(" \t").lower() == "chunked"
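+
+    # Usage sketch (illustrative): a fixed-length body is routed into the
+    # returned payload StreamReader.
+    #
+    #   parser = HttpResponseParser(protocol, loop, 2**16)
+    #   messages, upgraded, tail = parser.feed_data(
+    #       b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nhi"
+    #   )
+    #   msg, payload = messages[0]
+    #   msg.code -> 200; b"hi" has been fed into payload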
+
+
+class HttpPayloadParser:
+ def __init__(
+ self,
+ payload: StreamReader,
+ length: Optional[int] = None,
+ chunked: bool = False,
+ compression: Optional[str] = None,
+ code: Optional[int] = None,
+ method: Optional[str] = None,
+ response_with_body: bool = True,
+ auto_decompress: bool = True,
+ lax: bool = False,
+ ) -> None:
+ self._length = 0
+ self._type = ParseState.PARSE_UNTIL_EOF
+ self._chunk = ChunkState.PARSE_CHUNKED_SIZE
+ self._chunk_size = 0
+ self._chunk_tail = b""
+ self._auto_decompress = auto_decompress
+ self._lax = lax
+ self.done = False
+
+ # payload decompression wrapper
+ if response_with_body and compression and self._auto_decompress:
+ real_payload: Union[StreamReader, DeflateBuffer] = DeflateBuffer(
+ payload, compression
+ )
+ else:
+ real_payload = payload
+
+ # payload parser
+ if not response_with_body:
+ # don't parse payload if it's not expected to be received
+ self._type = ParseState.PARSE_NONE
+ real_payload.feed_eof()
+ self.done = True
+ elif chunked:
+ self._type = ParseState.PARSE_CHUNKED
+ elif length is not None:
+ self._type = ParseState.PARSE_LENGTH
+ self._length = length
+ if self._length == 0:
+ real_payload.feed_eof()
+ self.done = True
+
+ self.payload = real_payload
+
+ def feed_eof(self) -> None:
+ if self._type == ParseState.PARSE_UNTIL_EOF:
+ self.payload.feed_eof()
+ elif self._type == ParseState.PARSE_LENGTH:
+ raise ContentLengthError(
+ "Not enough data for satisfy content length header."
+ )
+ elif self._type == ParseState.PARSE_CHUNKED:
+ raise TransferEncodingError(
+ "Not enough data for satisfy transfer length header."
+ )
+
+ def feed_data(
+ self, chunk: bytes, SEP: _SEP = b"\r\n", CHUNK_EXT: bytes = b";"
+ ) -> Tuple[bool, bytes]:
+ # Read specified amount of bytes
+ if self._type == ParseState.PARSE_LENGTH:
+ required = self._length
+ chunk_len = len(chunk)
+
+ if required >= chunk_len:
+ self._length = required - chunk_len
+ self.payload.feed_data(chunk, chunk_len)
+ if self._length == 0:
+ self.payload.feed_eof()
+ return True, b""
+ else:
+ self._length = 0
+ self.payload.feed_data(chunk[:required], required)
+ self.payload.feed_eof()
+ return True, chunk[required:]
+
+ # Chunked transfer encoding parser
+ elif self._type == ParseState.PARSE_CHUNKED:
+ if self._chunk_tail:
+ chunk = self._chunk_tail + chunk
+ self._chunk_tail = b""
+
+ while chunk:
+
+ # read next chunk size
+ if self._chunk == ChunkState.PARSE_CHUNKED_SIZE:
+ pos = chunk.find(SEP)
+ if pos >= 0:
+ i = chunk.find(CHUNK_EXT, 0, pos)
+ if i >= 0:
+ size_b = chunk[:i] # strip chunk-extensions
+ # Verify no LF in the chunk-extension
+ if b"\n" in (ext := chunk[i:pos]):
+ exc = BadHttpMessage(
+ f"Unexpected LF in chunk-extension: {ext!r}"
+ )
+ set_exception(self.payload, exc)
+ raise exc
+ else:
+ size_b = chunk[:pos]
+
+ if self._lax: # Allow whitespace in lax mode.
+ size_b = size_b.strip()
+
+ if not re.fullmatch(HEXDIGITS, size_b):
+ exc = TransferEncodingError(
+ chunk[:pos].decode("ascii", "surrogateescape")
+ )
+ set_exception(self.payload, exc)
+ raise exc
+ size = int(bytes(size_b), 16)
+
+ chunk = chunk[pos + len(SEP) :]
+ if size == 0: # eof marker
+ self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
+ if self._lax and chunk.startswith(b"\r"):
+ chunk = chunk[1:]
+ else:
+ self._chunk = ChunkState.PARSE_CHUNKED_CHUNK
+ self._chunk_size = size
+ self.payload.begin_http_chunk_receiving()
+ else:
+ self._chunk_tail = chunk
+ return False, b""
+
+ # read chunk and feed buffer
+ if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK:
+ required = self._chunk_size
+ chunk_len = len(chunk)
+
+ if required > chunk_len:
+ self._chunk_size = required - chunk_len
+ self.payload.feed_data(chunk, chunk_len)
+ return False, b""
+ else:
+ self._chunk_size = 0
+ self.payload.feed_data(chunk[:required], required)
+ chunk = chunk[required:]
+ self._chunk = ChunkState.PARSE_CHUNKED_CHUNK_EOF
+ self.payload.end_http_chunk_receiving()
+
+ # toss the CRLF at the end of the chunk
+ if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF:
+ if self._lax and chunk.startswith(b"\r"):
+ chunk = chunk[1:]
+ if chunk[: len(SEP)] == SEP:
+ chunk = chunk[len(SEP) :]
+ self._chunk = ChunkState.PARSE_CHUNKED_SIZE
+ else:
+ self._chunk_tail = chunk
+ return False, b""
+
+ # if stream does not contain trailer, after 0\r\n
+ # we should get another \r\n otherwise
+ # trailers needs to be skipped until \r\n\r\n
+ if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
+ head = chunk[: len(SEP)]
+ if head == SEP:
+ # end of stream
+ self.payload.feed_eof()
+ return True, chunk[len(SEP) :]
+ # Both CR and LF, or only LF may not be received yet. It is
+ # expected that CRLF or LF will be shown at the very first
+ # byte next time, otherwise trailers should come. The last
+ # CRLF which marks the end of response might not be
+ # contained in the same TCP segment which delivered the
+ # size indicator.
+ if not head:
+ return False, b""
+ if head == SEP[:1]:
+ self._chunk_tail = head
+ return False, b""
+ self._chunk = ChunkState.PARSE_TRAILERS
+
+ # read and discard trailer up to the CRLF terminator
+ if self._chunk == ChunkState.PARSE_TRAILERS:
+ pos = chunk.find(SEP)
+ if pos >= 0:
+ chunk = chunk[pos + len(SEP) :]
+ self._chunk = ChunkState.PARSE_MAYBE_TRAILERS
+ else:
+ self._chunk_tail = chunk
+ return False, b""
+
+ # Read all bytes until eof
+ elif self._type == ParseState.PARSE_UNTIL_EOF:
+ self.payload.feed_data(chunk, len(chunk))
+
+ return False, b""
+
+
+class DeflateBuffer:
+ """DeflateStream decompress stream and feed data into specified stream."""
+
+ decompressor: Any
+
+ def __init__(self, out: StreamReader, encoding: Optional[str]) -> None:
+ self.out = out
+ self.size = 0
+ self.encoding = encoding
+ self._started_decoding = False
+
+ self.decompressor: Union[BrotliDecompressor, ZLibDecompressor]
+ if encoding == "br":
+ if not HAS_BROTLI: # pragma: no cover
+ raise ContentEncodingError(
+ "Can not decode content-encoding: brotli (br). "
+ "Please install `Brotli`"
+ )
+ self.decompressor = BrotliDecompressor()
+ else:
+ self.decompressor = ZLibDecompressor(encoding=encoding)
+
+ def set_exception(
+ self,
+ exc: BaseException,
+ exc_cause: BaseException = _EXC_SENTINEL,
+ ) -> None:
+ set_exception(self.out, exc, exc_cause)
+
+ def feed_data(self, chunk: bytes, size: int) -> None:
+ if not size:
+ return
+
+ self.size += size
+
+ # RFC1950
+ # bits 0..3 = CM = 0b1000 = 8 = "deflate"
+        # bits 4..7 = CINFO = 1..7 = window size.
+ if (
+ not self._started_decoding
+ and self.encoding == "deflate"
+ and chunk[0] & 0xF != 8
+ ):
+            # Fall back to raw deflate for data sent without the RFC 1950
+            # zlib header; ideally non-RFC-compliant data would be warned about.
+ self.decompressor = ZLibDecompressor(
+ encoding=self.encoding, suppress_deflate_header=True
+ )
+
+ try:
+ chunk = self.decompressor.decompress_sync(chunk)
+ except Exception:
+ raise ContentEncodingError(
+ "Can not decode content-encoding: %s" % self.encoding
+ )
+
+ self._started_decoding = True
+
+ if chunk:
+ self.out.feed_data(chunk, len(chunk))
+
+ def feed_eof(self) -> None:
+ chunk = self.decompressor.flush()
+
+ if chunk or self.size > 0:
+ self.out.feed_data(chunk, len(chunk))
+ if self.encoding == "deflate" and not self.decompressor.eof:
+ raise ContentEncodingError("deflate")
+
+ self.out.feed_eof()
+
+ def begin_http_chunk_receiving(self) -> None:
+ self.out.begin_http_chunk_receiving()
+
+ def end_http_chunk_receiving(self) -> None:
+ self.out.end_http_chunk_receiving()
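+
+    # Usage sketch (illustrative): wrap an output StreamReader and feed it
+    # compressed bytes; inflated data is forwarded downstream.
+    #
+    #   buf = DeflateBuffer(out_stream, "gzip")
+    #   buf.feed_data(compressed, len(compressed))  # inflates into out_stream
+    #   buf.feed_eof()  # flushes the decompressor, then EOFs out_stream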
+
+
+HttpRequestParserPy = HttpRequestParser
+HttpResponseParserPy = HttpResponseParser
+RawRequestMessagePy = RawRequestMessage
+RawResponseMessagePy = RawResponseMessage
+
+try:
+ if not NO_EXTENSIONS:
+ from ._http_parser import ( # type: ignore[import-not-found,no-redef]
+ HttpRequestParser,
+ HttpResponseParser,
+ RawRequestMessage,
+ RawResponseMessage,
+ )
+
+ HttpRequestParserC = HttpRequestParser
+ HttpResponseParserC = HttpResponseParser
+ RawRequestMessageC = RawRequestMessage
+ RawResponseMessageC = RawResponseMessage
+except ImportError: # pragma: no cover
+ pass