about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/helpers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/helpers.py958
1 files changed, 958 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/helpers.py b/.venv/lib/python3.12/site-packages/aiohttp/helpers.py
new file mode 100644
index 00000000..ace4f0e9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/helpers.py
@@ -0,0 +1,958 @@
+"""Various helper functions"""
+
+import asyncio
+import base64
+import binascii
+import contextlib
+import datetime
+import enum
+import functools
+import inspect
+import netrc
+import os
+import platform
+import re
+import sys
+import time
+import weakref
+from collections import namedtuple
+from contextlib import suppress
+from email.parser import HeaderParser
+from email.utils import parsedate
+from math import ceil
+from pathlib import Path
+from types import MappingProxyType, TracebackType
+from typing import (
+    Any,
+    Callable,
+    ContextManager,
+    Dict,
+    Generator,
+    Generic,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    Optional,
+    Protocol,
+    Tuple,
+    Type,
+    TypeVar,
+    Union,
+    get_args,
+    overload,
+)
+from urllib.parse import quote
+from urllib.request import getproxies, proxy_bypass
+
+import attr
+from multidict import MultiDict, MultiDictProxy, MultiMapping
+from propcache.api import under_cached_property as reify
+from yarl import URL
+
+from . import hdrs
+from .log import client_logger
+
+if sys.version_info >= (3, 11):
+    import asyncio as async_timeout
+else:
+    import async_timeout
+
+__all__ = ("BasicAuth", "ChainMapProxy", "ETag", "reify")
+
+IS_MACOS = platform.system() == "Darwin"
+IS_WINDOWS = platform.system() == "Windows"
+
+PY_310 = sys.version_info >= (3, 10)
+PY_311 = sys.version_info >= (3, 11)
+
+
+_T = TypeVar("_T")
+_S = TypeVar("_S")
+
+_SENTINEL = enum.Enum("_SENTINEL", "sentinel")
+sentinel = _SENTINEL.sentinel
+
+NO_EXTENSIONS = bool(os.environ.get("AIOHTTP_NO_EXTENSIONS"))
+
+# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
+EMPTY_BODY_STATUS_CODES = frozenset((204, 304, *range(100, 200)))
+# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.1
+# https://datatracker.ietf.org/doc/html/rfc9112#section-6.3-2.2
+EMPTY_BODY_METHODS = hdrs.METH_HEAD_ALL
+
+DEBUG = sys.flags.dev_mode or (
+    not sys.flags.ignore_environment and bool(os.environ.get("PYTHONASYNCIODEBUG"))
+)
+
+
# HTTP character classes from RFC 2616 section 2.2, used below to validate
# header tokens (e.g. in content_disposition_header).
CHAR = {chr(i) for i in range(0, 128)}
CTL = {chr(i) for i in range(0, 32)} | {
    chr(127),
}
SEPARATORS = {
    "(",
    ")",
    "<",
    ">",
    "@",
    ",",
    ";",
    ":",
    "\\",
    '"',
    "/",
    "[",
    "]",
    "?",
    "=",
    "{",
    "}",
    " ",
    chr(9),
}
# CTL and SEPARATORS are both subsets of CHAR, so symmetric difference here
# amounts to "CHAR minus control characters minus separators".
TOKEN = CHAR ^ CTL ^ SEPARATORS
+
+
class noop:
    """Awaitable that yields control to the event loop once, then completes."""

    def __await__(self) -> Generator[None, None, None]:
        yield
+
+
+class BasicAuth(namedtuple("BasicAuth", ["login", "password", "encoding"])):
+    """Http basic authentication helper."""
+
+    def __new__(
+        cls, login: str, password: str = "", encoding: str = "latin1"
+    ) -> "BasicAuth":
+        if login is None:
+            raise ValueError("None is not allowed as login value")
+
+        if password is None:
+            raise ValueError("None is not allowed as password value")
+
+        if ":" in login:
+            raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')
+
+        return super().__new__(cls, login, password, encoding)
+
+    @classmethod
+    def decode(cls, auth_header: str, encoding: str = "latin1") -> "BasicAuth":
+        """Create a BasicAuth object from an Authorization HTTP header."""
+        try:
+            auth_type, encoded_credentials = auth_header.split(" ", 1)
+        except ValueError:
+            raise ValueError("Could not parse authorization header.")
+
+        if auth_type.lower() != "basic":
+            raise ValueError("Unknown authorization method %s" % auth_type)
+
+        try:
+            decoded = base64.b64decode(
+                encoded_credentials.encode("ascii"), validate=True
+            ).decode(encoding)
+        except binascii.Error:
+            raise ValueError("Invalid base64 encoding.")
+
+        try:
+            # RFC 2617 HTTP Authentication
+            # https://www.ietf.org/rfc/rfc2617.txt
+            # the colon must be present, but the username and password may be
+            # otherwise blank.
+            username, password = decoded.split(":", 1)
+        except ValueError:
+            raise ValueError("Invalid credentials.")
+
+        return cls(username, password, encoding=encoding)
+
+    @classmethod
+    def from_url(cls, url: URL, *, encoding: str = "latin1") -> Optional["BasicAuth"]:
+        """Create BasicAuth from url."""
+        if not isinstance(url, URL):
+            raise TypeError("url should be yarl.URL instance")
+        # Check raw_user and raw_password first as yarl is likely
+        # to already have these values parsed from the netloc in the cache.
+        if url.raw_user is None and url.raw_password is None:
+            return None
+        return cls(url.user or "", url.password or "", encoding=encoding)
+
+    def encode(self) -> str:
+        """Encode credentials."""
+        creds = (f"{self.login}:{self.password}").encode(self.encoding)
+        return "Basic %s" % base64.b64encode(creds).decode(self.encoding)
+
+
def strip_auth_from_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Remove user and password from URL if present and return BasicAuth object."""
    # raw_user/raw_password are checked first because yarl usually has them
    # cached from parsing the netloc.
    if url.raw_user is None and url.raw_password is None:
        return url, None
    auth = BasicAuth(url.user or "", url.password or "")
    return url.with_user(None), auth
+
+
def netrc_from_env() -> Optional[netrc.netrc]:
    """Load netrc from file.

    Attempt to load it from the path specified by the env-var
    NETRC or in the default location in the user's home directory.

    Returns None if it couldn't be found or fails to parse.
    """
    netrc_env = os.environ.get("NETRC")

    if netrc_env is not None:
        netrc_path = Path(netrc_env)
    else:
        try:
            home_dir = Path.home()
        except RuntimeError as e:  # pragma: no cover
            # if pathlib can't resolve home, it may raise a RuntimeError
            client_logger.debug(
                "Could not resolve home directory when "
                "trying to look for .netrc file: %s",
                e,
            )
            return None

        # Windows conventionally uses "_netrc"; POSIX uses ".netrc".
        netrc_path = home_dir / ("_netrc" if IS_WINDOWS else ".netrc")

    try:
        return netrc.netrc(str(netrc_path))
    except netrc.NetrcParseError as e:
        client_logger.warning("Could not parse .netrc file: %s", e)
    except OSError as e:
        netrc_exists = False
        with contextlib.suppress(OSError):
            netrc_exists = netrc_path.is_file()
        # we couldn't read the file (doesn't exist, permissions, etc.)
        if netrc_env or netrc_exists:
            # only warn if the environment wanted us to load it,
            # or it appears like the default file does actually exist
            client_logger.warning("Could not read .netrc file: %s", e)

    return None
+
+
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ProxyInfo:
    """Immutable pairing of a proxy URL with its optional credentials."""

    proxy: URL
    proxy_auth: Optional[BasicAuth]
+
+
def basicauth_from_netrc(netrc_obj: Optional[netrc.netrc], host: str) -> BasicAuth:
    """
    Return :py:class:`~aiohttp.BasicAuth` credentials for ``host`` from ``netrc_obj``.

    :raises LookupError: if ``netrc_obj`` is :py:data:`None` or if no
            entry is found for the ``host``.
    """
    if netrc_obj is None:
        raise LookupError("No .netrc file found")

    entry = netrc_obj.authenticators(host)
    if entry is None:
        raise LookupError(f"No entry for {host!s} found in the `.netrc` file.")

    login, account, password = entry

    # TODO(PY311): username = login or account
    # Up to python 3.10, account could be None if not specified,
    # and login will be empty string if not specified. From 3.11,
    # login and account will be empty string if not specified.
    if login or account is None:
        username = login
    else:
        username = account

    # TODO(PY311): Remove this, as password will be empty string
    # if not specified
    return BasicAuth(username, password if password is not None else "")
+
+
def proxies_from_env() -> Dict[str, ProxyInfo]:
    """Collect proxy settings from the environment.

    Reads the standard proxy env-vars via ``urllib.request.getproxies`` and
    returns a mapping of scheme -> :class:`ProxyInfo`, filling in credentials
    from ``.netrc`` when the proxy URL itself carries none.
    """
    proxy_urls = {
        k: URL(v)
        for k, v in getproxies().items()
        if k in ("http", "https", "ws", "wss")
    }
    netrc_obj = netrc_from_env()
    stripped = {k: strip_auth_from_url(v) for k, v in proxy_urls.items()}
    ret = {}
    for proto, val in stripped.items():
        proxy, auth = val
        # https/wss proxies are not supported: skip with a warning.
        if proxy.scheme in ("https", "wss"):
            client_logger.warning(
                "%s proxies %s are not supported, ignoring", proxy.scheme.upper(), proxy
            )
            continue
        if netrc_obj and auth is None:
            if proxy.host is not None:
                try:
                    auth = basicauth_from_netrc(netrc_obj, proxy.host)
                except LookupError:
                    # No netrc entry for this host: proceed unauthenticated.
                    auth = None
        ret[proto] = ProxyInfo(proxy, auth)
    return ret
+
+
def get_env_proxy_for_url(url: URL) -> Tuple[URL, Optional[BasicAuth]]:
    """Get a permitted proxy for the given URL from the env."""
    host = url.host
    if host is not None and proxy_bypass(host):
        raise LookupError(f"Proxying is disallowed for `{host!r}`")

    env_proxies = proxies_from_env()
    try:
        info = env_proxies[url.scheme]
    except KeyError:
        raise LookupError(f"No proxies found for `{url!s}` in the env")
    return info.proxy, info.proxy_auth
+
+
@attr.s(auto_attribs=True, frozen=True, slots=True)
class MimeType:
    """Parsed representation of a MIME type string (see parse_mimetype)."""

    type: str  # e.g. "text" in "text/html"
    subtype: str  # e.g. "html" in "text/html"
    suffix: str  # structured-syntax suffix, e.g. "json" in "application/ld+json"
    parameters: "MultiDictProxy[str]"  # lower-cased parameter names -> values
+
+
@functools.lru_cache(maxsize=56)
def parse_mimetype(mimetype: str) -> MimeType:
    """Parses a MIME type into its components.

    mimetype is a MIME type string.

    Returns a MimeType object.

    Example:

    >>> parse_mimetype('text/html; charset=utf-8')
    MimeType(type='text', subtype='html', suffix='',
             parameters={'charset': 'utf-8'})

    """
    if not mimetype:
        empty = MultiDictProxy(MultiDict())
        return MimeType(type="", subtype="", suffix="", parameters=empty)

    head, *raw_params = mimetype.split(";")

    params: MultiDict[str] = MultiDict()
    for raw in raw_params:
        if not raw:
            continue
        name, _, value = raw.partition("=")
        params.add(name.lower().strip(), value.strip(' "'))

    fulltype = head.strip().lower()
    if fulltype == "*":
        fulltype = "*/*"

    mtype, _, remainder = fulltype.partition("/")
    stype, _, suffix = remainder.partition("+")

    return MimeType(
        type=mtype, subtype=stype, suffix=suffix, parameters=MultiDictProxy(params)
    )
+
+
@functools.lru_cache(maxsize=56)
def parse_content_type(raw: str) -> Tuple[str, MappingProxyType[str, str]]:
    """Parse Content-Type header.

    Returns a tuple of the parsed content type and a
    MappingProxyType of parameters.
    """
    # Delegate the RFC 2045 parsing to the stdlib email machinery.
    msg = HeaderParser().parsestr(f"Content-Type: {raw}")
    params = msg.get_params(())
    # get_params() yields the content type itself first; the remaining
    # entries are the actual parameters.
    return msg.get_content_type(), MappingProxyType(dict(params[1:]))
+
+
def guess_filename(obj: Any, default: Optional[str] = None) -> Optional[str]:
    """Return the base filename from *obj*'s ``name`` attribute, or *default*.

    Non-string names, empty names, and angle-bracketed pseudo-names such as
    ``"<stdin>"`` fall back to *default*.
    """
    candidate = getattr(obj, "name", None)
    if not isinstance(candidate, str) or not candidate:
        return default
    if candidate.startswith("<") or candidate.endswith(">"):
        return default
    return Path(candidate).name
+
+
# Matches any character that is NOT valid qtext (RFC 5322); such characters
# are backslash-escaped by quoted_string().
not_qtext_re = re.compile(r"[^\041\043-\133\135-\176]")
# Characters permitted inside quoted-string content: printable US-ASCII plus TAB.
QCONTENT = {chr(i) for i in range(0x20, 0x7F)} | {"\t"}
+
+
def quoted_string(content: str) -> str:
    """Return 7-bit content as quoted-string.

    Format content into a quoted-string as defined in RFC5322 for
    Internet Message Format. Notice that this is not the 8-bit HTTP
    format, but the 7-bit email format. Content must be in usascii or
    a ValueError is raised.
    """
    # ``<`` on sets is a proper-subset test: every character must come from
    # QCONTENT (and content must not use the entire QCONTENT alphabet).
    if not (set(content) < QCONTENT):
        raise ValueError(f"bad content for quoted-string {content!r}")
    return not_qtext_re.sub(lambda match: "\\" + match.group(0), content)
+
+
def content_disposition_header(
    disptype: str, quote_fields: bool = True, _charset: str = "utf-8", **params: str
) -> str:
    """Sets ``Content-Disposition`` header for MIME.

    This is the MIME payload Content-Disposition header from RFC 2183
    and RFC 7579 section 4.2, not the HTTP Content-Disposition from
    RFC 6266.

    disptype is a disposition type: inline, attachment, form-data.
    Should be valid extension token (see RFC 2183)

    quote_fields performs value quoting to 7-bit MIME headers
    according to RFC 7578. Set to quote_fields to False if recipient
    can take 8-bit file names and field values.

    _charset specifies the charset to use when quote_fields is True.

    params is a dict with disposition params.
    """
    if not disptype or not (TOKEN > set(disptype)):
        raise ValueError(f"bad content disposition type {disptype!r}")

    value = disptype
    if params:
        lparams = []
        for key, val in params.items():
            # Parameter names must be valid RFC 2616 tokens.
            if not key or not (TOKEN > set(key)):
                raise ValueError(f"bad content disposition parameter {key!r}={val!r}")
            if quote_fields:
                if key.lower() == "filename":
                    # filename is always percent-encoded and double-quoted.
                    qval = quote(val, "", encoding=_charset)
                    lparams.append((key, '"%s"' % qval))
                else:
                    try:
                        # Prefer a plain RFC 5322 quoted-string when the
                        # value fits in 7-bit US-ASCII...
                        qval = quoted_string(val)
                    except ValueError:
                        # ...otherwise fall back to the extended notation:
                        # key*=charset''percent-encoded-value
                        qval = "".join(
                            (_charset, "''", quote(val, "", encoding=_charset))
                        )
                        lparams.append((key + "*", qval))
                    else:
                        lparams.append((key, '"%s"' % qval))
            else:
                # 8-bit mode: only escape backslashes and double quotes.
                qval = val.replace("\\", "\\\\").replace('"', '\\"')
                lparams.append((key, '"%s"' % qval))
        sparams = "; ".join("=".join(pair) for pair in lparams)
        value = "; ".join((value, sparams))
    return value
+
+
def is_ip_address(host: Optional[str]) -> bool:
    """Check if host looks like an IP Address.

    This check is only meant as a heuristic to ensure that
    a host is not a domain name.
    """
    if not host:
        return False
    # Any colon suggests an IPv6 literal; otherwise an IPv4 address is
    # nothing but digits and dots.
    if ":" in host:
        return True
    return host.replace(".", "").isdigit()
+
+
# One-second cache for rfc822_formatted_time(): the epoch second last
# formatted, and its rendered string.
_cached_current_datetime: Optional[int] = None
_cached_formatted_datetime = ""
+
+
def rfc822_formatted_time() -> str:
    """Return the current UTC time formatted as an RFC 822 date string.

    The result is cached per whole second, so repeated calls within the
    same second reuse the previously formatted string.
    """
    global _cached_current_datetime
    global _cached_formatted_datetime

    now = int(time.time())
    if now != _cached_current_datetime:
        # Weekday and month names for HTTP date/time formatting;
        # always English!
        # Tuples are constants stored in codeobject!
        _weekdayname = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
        _monthname = (
            "",  # Dummy so we can use 1-based month numbers
            "Jan",
            "Feb",
            "Mar",
            "Apr",
            "May",
            "Jun",
            "Jul",
            "Aug",
            "Sep",
            "Oct",
            "Nov",
            "Dec",
        )

        year, month, day, hh, mm, ss, wd, *tail = time.gmtime(now)
        _cached_formatted_datetime = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            _weekdayname[wd],
            day,
            _monthname[month],
            year,
            hh,
            mm,
            ss,
        )
        _cached_current_datetime = now
    return _cached_formatted_datetime
+
+
def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:
    """Invoke the named zero-argument method on a weakly referenced object.

    A no-op when the referent has been garbage collected; any exception
    raised by the call is swallowed.
    """
    target_ref, method_name = info
    target = target_ref()
    if target is None:
        return
    with suppress(Exception):
        getattr(target, method_name)()
+
+
def weakref_handle(
    ob: object,
    name: str,
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule ``getattr(ob, name)()`` to run after *timeout* seconds.

    Only a weak reference to *ob* is kept, so the pending timer does not
    keep the object alive.  Returns the timer handle, or None when
    *timeout* is None or non-positive.
    """
    if timeout is not None and timeout > 0:
        when = loop.time() + timeout
        # Round long deadlines up to a whole clock second.
        if timeout >= timeout_ceil_threshold:
            when = ceil(when)

        return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
    return None
+
+
def call_later(
    cb: Callable[[], Any],
    timeout: Optional[float],
    loop: asyncio.AbstractEventLoop,
    timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
    """Schedule *cb* to run on *loop* after *timeout* seconds.

    Returns the timer handle, or None when *timeout* is None or
    non-positive.
    """
    if timeout is None or timeout <= 0:
        return None
    now = loop.time()
    when = calculate_timeout_when(now, timeout, timeout_ceil_threshold)
    return loop.call_at(when, cb)
+
+
def calculate_timeout_when(
    loop_time: float,
    timeout: float,
    timeout_ceiling_threshold: float,
) -> float:
    """Calculate when to execute a timeout."""
    deadline = loop_time + timeout
    # Long timeouts are rounded up to a whole clock second.
    return ceil(deadline) if timeout > timeout_ceiling_threshold else deadline
+
+
class TimeoutHandle:
    """Timeout handle"""

    __slots__ = ("_timeout", "_loop", "_ceil_threshold", "_callbacks")

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        timeout: Optional[float],
        ceil_threshold: float = 5,
    ) -> None:
        self._timeout = timeout
        self._loop = loop
        self._ceil_threshold = ceil_threshold
        self._callbacks: List[
            Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
        ] = []

    def register(
        self, callback: Callable[..., None], *args: Any, **kwargs: Any
    ) -> None:
        """Add a callback to be invoked when the timeout fires."""
        self._callbacks.append((callback, args, kwargs))

    def close(self) -> None:
        """Drop all registered callbacks without invoking them."""
        self._callbacks.clear()

    def start(self) -> Optional[asyncio.TimerHandle]:
        """Arm the timer; returns None when no positive timeout is configured."""
        timeout = self._timeout
        if timeout is None or timeout <= 0:
            return None
        when = self._loop.time() + timeout
        # Round long deadlines up to a whole clock second.
        if timeout >= self._ceil_threshold:
            when = ceil(when)
        return self._loop.call_at(when, self.__call__)

    def timer(self) -> "BaseTimerContext":
        """Return a timer context bound to this handle (a no-op one when disabled)."""
        if self._timeout is None or self._timeout <= 0:
            return TimerNoop()
        ctx = TimerContext(self._loop)
        self.register(ctx.timeout)
        return ctx

    def __call__(self) -> None:
        """Fire the timeout: run every callback, suppressing their exceptions."""
        for cb, args, kwargs in self._callbacks:
            with suppress(Exception):
                cb(*args, **kwargs)
        self._callbacks.clear()
+
+
class BaseTimerContext(ContextManager["BaseTimerContext"]):
    """Base class for timeout contexts (see TimerContext and TimerNoop)."""

    __slots__ = ()

    def assert_timeout(self) -> None:
        """Raise TimeoutError if timeout has been exceeded."""
+
+
class TimerNoop(BaseTimerContext):
    """Timer context that never times out; used when no timeout is configured."""

    __slots__ = ()

    def __enter__(self) -> BaseTimerContext:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        return
+
+
class TimerContext(BaseTimerContext):
    """Low resolution timeout context manager"""

    __slots__ = ("_loop", "_tasks", "_cancelled", "_cancelling")

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # Tasks currently inside this context; all are cancelled when the
        # timeout fires.
        self._tasks: List[asyncio.Task[Any]] = []
        # Set once timeout() has fired.
        self._cancelled = False
        # Task.cancelling() snapshot taken at __enter__ (used on 3.11+ only).
        self._cancelling = 0

    def assert_timeout(self) -> None:
        """Raise TimeoutError if timer has already been cancelled."""
        if self._cancelled:
            raise asyncio.TimeoutError from None

    def __enter__(self) -> BaseTimerContext:
        """Register the current task so the timer can cancel it on timeout."""
        task = asyncio.current_task(loop=self._loop)
        if task is None:
            raise RuntimeError("Timeout context manager should be used inside a task")

        if sys.version_info >= (3, 11):
            # Remember if the task was already cancelling
            # so when we __exit__ we can decide if we should
            # raise asyncio.TimeoutError or let the cancellation propagate
            self._cancelling = task.cancelling()

        if self._cancelled:
            raise asyncio.TimeoutError from None

        self._tasks.append(task)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Translate a cancellation caused by this timer into TimeoutError."""
        enter_task: Optional[asyncio.Task[Any]] = None
        if self._tasks:
            enter_task = self._tasks.pop()

        if exc_type is asyncio.CancelledError and self._cancelled:
            assert enter_task is not None
            # The timeout was hit, and the task was cancelled
            # so we need to uncancel the last task that entered the context manager
            # since the cancellation should not leak out of the context manager
            if sys.version_info >= (3, 11):
                # If the task was already cancelling don't raise
                # asyncio.TimeoutError and instead return None
                # to allow the cancellation to propagate
                if enter_task.uncancel() > self._cancelling:
                    return None
            raise asyncio.TimeoutError from exc_val
        return None

    def timeout(self) -> None:
        """Cancel all registered tasks and mark this context as timed out."""
        if not self._cancelled:
            for task in set(self._tasks):
                task.cancel()

            self._cancelled = True
+
+
+def ceil_timeout(
+    delay: Optional[float], ceil_threshold: float = 5
+) -> async_timeout.Timeout:
+    if delay is None or delay <= 0:
+        return async_timeout.timeout(None)
+
+    loop = asyncio.get_running_loop()
+    now = loop.time()
+    when = now + delay
+    if delay > ceil_threshold:
+        when = ceil(when)
+    return async_timeout.timeout_at(when)
+
+
class HeadersMixin:
    """Mixin for handling headers."""

    # Names of the cache attributes managed below.
    ATTRS = frozenset(["_content_type", "_content_dict", "_stored_content_type"])

    _headers: MultiMapping[str]
    _content_type: Optional[str] = None  # cached parsed content type
    _content_dict: Optional[Dict[str, str]] = None  # cached Content-Type parameters
    _stored_content_type: Union[str, None, _SENTINEL] = sentinel  # raw header value the cache was built from

    def _parse_content_type(self, raw: Optional[str]) -> None:
        """Parse the raw Content-Type header value and populate the cache."""
        self._stored_content_type = raw
        if raw is None:
            # default value according to RFC 2616
            self._content_type = "application/octet-stream"
            self._content_dict = {}
        else:
            content_type, content_mapping_proxy = parse_content_type(raw)
            self._content_type = content_type
            # _content_dict needs to be mutable so we can update it
            self._content_dict = content_mapping_proxy.copy()

    @property
    def content_type(self) -> str:
        """The value of content part for Content-Type HTTP header."""
        raw = self._headers.get(hdrs.CONTENT_TYPE)
        # Re-parse only when the raw header differs from the cached value.
        if self._stored_content_type != raw:
            self._parse_content_type(raw)
        assert self._content_type is not None
        return self._content_type

    @property
    def charset(self) -> Optional[str]:
        """The value of charset part for Content-Type HTTP header."""
        raw = self._headers.get(hdrs.CONTENT_TYPE)
        if self._stored_content_type != raw:
            self._parse_content_type(raw)
        assert self._content_dict is not None
        return self._content_dict.get("charset")

    @property
    def content_length(self) -> Optional[int]:
        """The value of Content-Length HTTP header."""
        content_length = self._headers.get(hdrs.CONTENT_LENGTH)
        return None if content_length is None else int(content_length)
+
+
def set_result(fut: "asyncio.Future[_T]", result: _T) -> None:
    """Set *result* on *fut*, ignoring the call when the future is already done."""
    if fut.done():
        return
    fut.set_result(result)
+
+
# Sentinel default for ``set_exception``'s ``exc_cause`` parameter, letting
# it distinguish "no cause supplied" from an explicitly passed cause.
_EXC_SENTINEL = BaseException()
+
+
class ErrorableProtocol(Protocol):
    """Structural type for objects exposing a ``set_exception`` method."""

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = ...,
    ) -> None: ...  # pragma: no cover
+
+
def set_exception(
    fut: "asyncio.Future[_T] | ErrorableProtocol",
    exc: BaseException,
    exc_cause: BaseException = _EXC_SENTINEL,
) -> None:
    """Set future exception.

    If the future is marked as complete, this function is a no-op.

    :param exc_cause: An exception that is a direct cause of ``exc``.
                      Only set if provided.
    """
    if asyncio.isfuture(fut) and fut.done():
        return

    has_explicit_cause = exc_cause is not _EXC_SENTINEL
    # Never chain an exception to itself.
    if has_explicit_cause and exc_cause is not exc:
        exc.__cause__ = exc_cause

    fut.set_exception(exc)
+
+
@functools.total_ordering
class AppKey(Generic[_T]):
    """Keys for static typing support in Application."""

    __slots__ = ("_name", "_t", "__orig_class__")

    # This may be set by Python when instantiating with a generic type. We need to
    # support this, in order to support types that are not concrete classes,
    # like Iterable, which can't be passed as the second parameter to __init__.
    __orig_class__: Type[object]

    def __init__(self, name: str, t: Optional[Type[_T]] = None):
        # Prefix the key with the defining module's name to help
        # deduplicate key names across packages.
        frame = inspect.currentframe()
        while frame:
            if frame.f_code.co_name == "<module>":
                module: str = frame.f_globals["__name__"]
                break
            frame = frame.f_back

        self._name = f"{module}.{name}"
        self._t = t

    def __lt__(self, other: object) -> bool:
        # AppKeys sort among themselves by name and before any other type.
        if not isinstance(other, AppKey):
            return True
        return self._name < other._name

    def __repr__(self) -> str:
        t = self._t
        if t is None:
            # Fall back to the subscripted type argument, when one was given.
            with suppress(AttributeError):
                t = get_args(self.__orig_class__)[0]

        if t is None:
            t_repr = "<<Unknown>>"
        elif isinstance(t, type):
            t_repr = (
                t.__qualname__
                if t.__module__ == "builtins"
                else f"{t.__module__}.{t.__qualname__}"
            )
        else:
            t_repr = repr(t)
        return f"<AppKey({self._name}, type={t_repr})>"
+
+
class ChainMapProxy(Mapping[Union[str, AppKey[Any]], Any]):
    """Read-only, ChainMap-style view over a sequence of mappings."""

    __slots__ = ("_maps",)

    def __init__(self, maps: Iterable[Mapping[Union[str, AppKey[Any]], Any]]) -> None:
        self._maps = tuple(maps)

    def __init_subclass__(cls) -> None:
        # This type is deliberately final.
        raise TypeError(
            "Inheritance class {} from ChainMapProxy "
            "is forbidden".format(cls.__name__)
        )

    @overload  # type: ignore[override]
    def __getitem__(self, key: AppKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
        # Return the first hit, searching the maps front to back.
        for mapping in self._maps:
            try:
                return mapping[key]
            except KeyError:
                continue
        raise KeyError(key)

    @overload  # type: ignore[override]
    def get(self, key: AppKey[_T], default: _S) -> Union[_T, _S]: ...

    @overload
    def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...

    @overload
    def get(self, key: str, default: Any = ...) -> Any: ...

    def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
        """Like ``__getitem__`` but returning *default* on a miss."""
        try:
            return self[key]
        except KeyError:
            return default

    def __len__(self) -> int:
        # reuses stored hash values if possible
        return len(set().union(*self._maps))

    def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
        seen: Dict[Union[str, AppKey[Any]], Any] = {}
        for mapping in reversed(self._maps):
            # reuses stored hash values if possible
            seen.update(mapping)
        return iter(seen)

    def __contains__(self, key: object) -> bool:
        for mapping in self._maps:
            if key in mapping:
                return True
        return False

    def __bool__(self) -> bool:
        return any(self._maps)

    def __repr__(self) -> str:
        body = ", ".join(map(repr, self._maps))
        return f"ChainMapProxy({body})"
+
+
# https://tools.ietf.org/html/rfc7232#section-2.3
_ETAGC = r"[!\x23-\x7E\x80-\xff]+"  # etagc: characters valid inside an etag
_ETAGC_RE = re.compile(_ETAGC)
_QUOTED_ETAG = rf'(W/)?"({_ETAGC})"'  # quoted etag with optional weak "W/" prefix
QUOTED_ETAG_RE = re.compile(_QUOTED_ETAG)
LIST_QUOTED_ETAG_RE = re.compile(rf"({_QUOTED_ETAG})(?:\s*,\s*|$)|(.)")

# Wildcard etag value matching any etag.
ETAG_ANY = "*"
+
+
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ETag:
    """Parsed entity tag (RFC 7232 section 2.3)."""

    value: str  # the etag content, without surrounding quotes
    is_weak: bool = False  # True for "W/"-prefixed (weak) etags
+
+
def validate_etag_value(value: str) -> None:
    """Raise ValueError unless *value* is ``*`` or valid unquoted etag content."""
    if value != ETAG_ANY and not _ETAGC_RE.fullmatch(value):
        raise ValueError(
            f"Value {value!r} is not a valid etag. Maybe it contains '\"'?"
        )
+
+
def parse_http_date(date_str: Optional[str]) -> Optional[datetime.datetime]:
    """Process a date string, return a datetime object"""
    if date_str is None:
        return None
    parsed = parsedate(date_str)
    if parsed is None:
        return None
    # Out-of-range components (e.g. month 13) simply yield None.
    with suppress(ValueError):
        return datetime.datetime(*parsed[:6], tzinfo=datetime.timezone.utc)
    return None
+
+
@functools.lru_cache
def must_be_empty_body(method: str, code: int) -> bool:
    """Check if a request must return an empty body."""
    return (
        code in EMPTY_BODY_STATUS_CODES  # 1xx, 204 and 304 never carry a body
        or method in EMPTY_BODY_METHODS  # responses to HEAD have no body
        or (200 <= code < 300 and method in hdrs.METH_CONNECT_ALL)  # 2xx CONNECT
    )
+
+
def should_remove_content_length(method: str, code: int) -> bool:
    """Check if a Content-Length header should be removed.

    This should always be a subset of must_be_empty_body
    """
    # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-8
    # https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.5-4
    # Per the RFCs above: 1xx/204/304 responses and 2xx responses to CONNECT
    # must not include a Content-Length header.
    return code in EMPTY_BODY_STATUS_CODES or (
        200 <= code < 300 and method in hdrs.METH_CONNECT_ALL
    )