about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/h2/utilities.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/h2/utilities.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/h2/utilities.py')
-rw-r--r--.venv/lib/python3.12/site-packages/h2/utilities.py696
1 files changed, 696 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/h2/utilities.py b/.venv/lib/python3.12/site-packages/h2/utilities.py
new file mode 100644
index 00000000..8cafdbd5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/h2/utilities.py
@@ -0,0 +1,696 @@
+"""
+h2/utilities
+~~~~~~~~~~~~
+
+Utility functions that do not belong in a separate module.
+"""
+from __future__ import annotations
+
+import collections
+import re
+from string import whitespace
+from typing import TYPE_CHECKING, Any, NamedTuple
+
+from hpack.struct import HeaderTuple, NeverIndexedHeaderTuple
+
+from .exceptions import FlowControlError, ProtocolError
+
+if TYPE_CHECKING:  # pragma: no cover
+    from collections.abc import Generator, Iterable
+
+    from hpack.struct import Header, HeaderWeaklyTyped
+
+UPPER_RE = re.compile(b"[A-Z]")
+SIGIL = ord(b":")
+INFORMATIONAL_START = ord(b"1")
+
+
+# A set of headers that are hop-by-hop or connection-specific and thus
+# forbidden in HTTP/2. This list comes from RFC 7540 § 8.1.2.2.
+CONNECTION_HEADERS = frozenset([
+    b"connection",
+    b"proxy-connection",
+    b"keep-alive",
+    b"transfer-encoding",
+    b"upgrade",
+])
+
+
+_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([
+    b":method",
+    b":scheme",
+    b":authority",
+    b":path",
+    b":status",
+    b":protocol",
+])
+
+
+_SECURE_HEADERS = frozenset([
+    # May have basic credentials which are vulnerable to dictionary attacks.
+    b"authorization",
+    b"proxy-authorization",
+])
+
+
+_REQUEST_ONLY_HEADERS = frozenset([
+    b":scheme",
+    b":path",
+    b":authority",
+    b":method",
+    b":protocol",
+])
+
+
+_RESPONSE_ONLY_HEADERS = frozenset([b":status"])
+
+
+# A Set of pseudo headers that are only valid if the method is
+# CONNECT, see RFC 8441 § 5
+_CONNECT_REQUEST_ONLY_HEADERS = frozenset([b":protocol"])
+
+
+_WHITESPACE = frozenset(map(ord, whitespace))
+
+
+def _secure_headers(headers: Iterable[Header],
+                    hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]:
+    """
+    Certain headers are at risk of being attacked during the header compression
+    phase, and so need to be kept out of header compression contexts. This
+    function automatically transforms certain specific headers into HPACK
+    never-indexed fields to ensure they don't get added to header compression
+    contexts.
+
+    This function currently implements two rules:
+
+    - 'authorization' and 'proxy-authorization' fields are automatically made
+      never-indexed.
+    - Any 'cookie' header field shorter than 20 bytes long is made
+      never-indexed.
+
+    These fields are the most at-risk. These rules are inspired by Firefox
+    and nghttp2.
+    """
+    for header in headers:
+        assert isinstance(header[0], bytes)
+        if header[0] in _SECURE_HEADERS or (header[0] in b"cookie" and len(header[1]) < 20):
+            yield NeverIndexedHeaderTuple(header[0], header[1])
+        else:
+            yield header
+
+
+def extract_method_header(headers: Iterable[Header]) -> bytes | None:
+    """
+    Extracts the request method from the headers list.
+    """
+    for k, v in headers:
+        if isinstance(v, bytes) and k == b":method":
+            return v
+        if isinstance(v, str) and k == ":method":
+            return v.encode("utf-8")  # pragma: no cover
+    return None
+
+
+def is_informational_response(headers: Iterable[Header]) -> bool:
+    """
+    Searches headers list for a :status header to confirm that a given
+    collection of headers are an informational response. Assumes the header
+    are well formed and encoded as bytes: that is, that the HTTP/2 special
+    headers are first in the block, and so that it can stop looking when it
+    finds the first header field whose name does not begin with a colon.
+
+    :param headers: The HTTP/2 headers.
+    :returns: A boolean indicating if this is an informational response.
+    """
+    for n, v in headers:
+        if not n.startswith(b":"):
+            return False
+        if n != b":status":
+            # If we find a non-special header, we're done here: stop looping.
+            continue
+        # If the first digit is a 1, we've got informational headers.
+        return v.startswith(b"1")
+    return False
+
+
+def guard_increment_window(current: int, increment: int) -> int:
+    """
+    Increments a flow control window, guarding against that window becoming too
+    large.
+
+    :param current: The current value of the flow control window.
+    :param increment: The increment to apply to that window.
+    :returns: The new value of the window.
+    :raises: ``FlowControlError``
+    """
+    # The largest value the flow control window may take.
+    LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1  # noqa: N806
+
+    new_size = current + increment
+
+    if new_size > LARGEST_FLOW_CONTROL_WINDOW:
+        msg = f"May not increment flow control window past {LARGEST_FLOW_CONTROL_WINDOW}"
+        raise FlowControlError(msg)
+
+    return new_size
+
+
+def authority_from_headers(headers: Iterable[Header]) -> bytes | None:
+    """
+    Given a header set, searches for the authority header and returns the
+    value.
+
+    Note that this doesn't use indexing, so should only be called if the
+    headers are for a client request. Otherwise, will loop over the entire
+    header set, which is potentially unwise.
+
+    :param headers: The HTTP header set.
+    :returns: The value of the authority header, or ``None``.
+    :rtype: ``bytes`` or ``None``.
+    """
+    for n, v in headers:
+        if n == b":authority":
+            return v
+
+    return None
+
+
+# Flags used by the validate_headers pipeline to determine which checks
+# should be applied to a given set of headers.
+class HeaderValidationFlags(NamedTuple):
+    is_client: bool
+    is_trailer: bool
+    is_response_header: bool
+    is_push_promise: bool
+
+
+def validate_headers(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Iterable[Header]:
+    """
+    Validates a header sequence against a set of constraints from RFC 7540.
+
+    :param headers: The HTTP header set.
+    :param hdr_validation_flags: An instance of HeaderValidationFlags.
+    """
+    # This validation logic is built on a sequence of generators that are
+    # iterated over to provide the final header list. This reduces some of the
+    # overhead of doing this checking. However, it's worth noting that this
+    # checking remains somewhat expensive, and attempts should be made wherever
+    # possible to reduce the time spent doing them.
+    #
+    # For example, we avoid tuple unpacking in loops because it represents a
+    # fixed cost that we don't want to spend, instead indexing into the header
+    # tuples.
+    headers = _reject_empty_header_names(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_uppercase_header_fields(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_surrounding_whitespace(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_te(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_connection_header(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_pseudo_header_fields(
+        headers, hdr_validation_flags,
+    )
+    headers = _check_host_authority_header(
+        headers, hdr_validation_flags,
+    )
+    return _check_path_header(headers, hdr_validation_flags)
+
+
+
+def _reject_empty_header_names(headers: Iterable[Header],
+                               hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if any header names are empty (length 0).
+    While hpack decodes such headers without errors, they are semantically
+    forbidden in HTTP, see RFC 7230, stating that they must be at least one
+    character long.
+    """
+    for header in headers:
+        if len(header[0]) == 0:
+            msg = "Received header name with zero length."
+            raise ProtocolError(msg)
+        yield header
+
+
+def _reject_uppercase_header_fields(headers: Iterable[Header],
+                                    hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if any uppercase character is found in a header
+    block.
+    """
+    for header in headers:
+        if UPPER_RE.search(header[0]):
+            msg = f"Received uppercase header name {header[0]!r}."
+            raise ProtocolError(msg)
+        yield header
+
+
+def _reject_surrounding_whitespace(headers: Iterable[Header],
+                                   hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if any header name or value is surrounded by
+    whitespace characters.
+    """
+    # For compatibility with RFC 7230 header fields, we need to allow the field
+    # value to be an empty string. This is ludicrous, but technically allowed.
+    # The field name may not be empty, though, so we can safely assume that it
+    # must have at least one character in it and throw exceptions if it
+    # doesn't.
+    for header in headers:
+        if header[0][0] in _WHITESPACE or header[0][-1] in _WHITESPACE:
+            msg = f"Received header name surrounded by whitespace {header[0]!r}"
+            raise ProtocolError(msg)
+        if header[1] and ((header[1][0] in _WHITESPACE) or
+           (header[1][-1] in _WHITESPACE)):
+            msg = f"Received header value surrounded by whitespace {header[1]!r}"
+            raise ProtocolError(msg)
+        yield header
+
+
+def _reject_te(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if the TE header is present in a header block and
+    its value is anything other than "trailers".
+    """
+    for header in headers:
+        if header[0] == b"te" and header[1].lower() != b"trailers":
+            msg = f"Invalid value for TE header: {header[1]!r}"
+            raise ProtocolError(msg)
+
+        yield header
+
+
+def _reject_connection_header(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if the Connection header is present in a header
+    block.
+    """
+    for header in headers:
+        if header[0] in CONNECTION_HEADERS:
+            msg = f"Connection-specific header field present: {header[0]!r}."
+            raise ProtocolError(msg)
+
+        yield header
+
+
+def _assert_header_in_set(bytes_header: bytes,
+                          header_set: set[bytes | str] | set[bytes] | set[str]) -> None:
+    """
+    Given a set of header names, checks whether the string or byte version of
+    the header name is present. Raises a Protocol error with the appropriate
+    error if it's missing.
+    """
+    if bytes_header not in header_set:
+        msg = f"Header block missing mandatory {bytes_header!r} header"
+        raise ProtocolError(msg)
+
+
+def _reject_pseudo_header_fields(headers: Iterable[Header],
+                                 hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if duplicate pseudo-header fields are found in a
+    header block or if a pseudo-header field appears in a block after an
+    ordinary header field.
+
+    Raises a ProtocolError if pseudo-header fields are found in trailers.
+    """
+    seen_pseudo_header_fields = set()
+    seen_regular_header = False
+    method = None
+
+    for header in headers:
+        if header[0][0] == SIGIL:
+            if header[0] in seen_pseudo_header_fields:
+                msg = f"Received duplicate pseudo-header field {header[0]!r}"
+                raise ProtocolError(msg)
+
+            seen_pseudo_header_fields.add(header[0])
+
+            if seen_regular_header:
+                msg = f"Received pseudo-header field out of sequence: {header[0]!r}"
+                raise ProtocolError(msg)
+
+            if header[0] not in _ALLOWED_PSEUDO_HEADER_FIELDS:
+                msg = f"Received custom pseudo-header field {header[0]!r}"
+                raise ProtocolError(msg)
+
+            if header[0] in b":method":
+                method = header[1]
+
+        else:
+            seen_regular_header = True
+
+        yield header
+
+    # Check the pseudo-headers we got to confirm they're acceptable.
+    _check_pseudo_header_field_acceptability(
+        seen_pseudo_header_fields, method, hdr_validation_flags,
+    )
+
+
+def _check_pseudo_header_field_acceptability(pseudo_headers: set[bytes | str] | set[bytes] | set[str],
+                                             method: bytes | None,
+                                             hdr_validation_flags: HeaderValidationFlags) -> None:
+    """
+    Given the set of pseudo-headers present in a header block and the
+    validation flags, confirms that RFC 7540 allows them.
+    """
+    # Pseudo-header fields MUST NOT appear in trailers - RFC 7540 § 8.1.2.1
+    if hdr_validation_flags.is_trailer and pseudo_headers:
+        msg = f"Received pseudo-header in trailer {pseudo_headers}"
+        raise ProtocolError(msg)
+
+    # If ':status' pseudo-header is not there in a response header, reject it.
+    # Similarly, if ':path', ':method', or ':scheme' are not there in a request
+    # header, reject it. Additionally, if a response contains any request-only
+    # headers or vice-versa, reject it.
+    # Relevant RFC section: RFC 7540 § 8.1.2.4
+    # https://tools.ietf.org/html/rfc7540#section-8.1.2.4
+    if hdr_validation_flags.is_response_header:
+        _assert_header_in_set(b":status", pseudo_headers)
+        invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS
+        if invalid_response_headers:
+            msg = f"Encountered request-only headers {invalid_response_headers}"
+            raise ProtocolError(msg)
+    elif (not hdr_validation_flags.is_response_header and
+          not hdr_validation_flags.is_trailer):
+        # This is a request, so we need to have seen :path, :method, and
+        # :scheme.
+        _assert_header_in_set(b":path", pseudo_headers)
+        _assert_header_in_set(b":method", pseudo_headers)
+        _assert_header_in_set(b":scheme", pseudo_headers)
+        invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS
+        if invalid_request_headers:
+            msg = f"Encountered response-only headers {invalid_request_headers}"
+            raise ProtocolError(msg)
+        if method != b"CONNECT":
+            invalid_headers = pseudo_headers & _CONNECT_REQUEST_ONLY_HEADERS
+            if invalid_headers:
+                msg = f"Encountered connect-request-only headers {invalid_headers!r}"
+                raise ProtocolError(msg)
+
+
+def _validate_host_authority_header(headers: Iterable[Header]) -> Generator[Header, None, None]:
+    """
+    Given the :authority and Host headers from a request block that isn't
+    a trailer, check that:
+     1. At least one of these headers is set.
+     2. If both headers are set, they match.
+
+    :param headers: The HTTP header set.
+    :raises: ``ProtocolError``
+    """
+    # We use None as a sentinel value.  Iterate over the list of headers,
+    # and record the value of these headers (if present).  We don't need
+    # to worry about receiving duplicate :authority headers, as this is
+    # enforced by the _reject_pseudo_header_fields() pipeline.
+    #
+    # TODO: We should also guard against receiving duplicate Host headers,
+    # and against sending duplicate headers.
+    authority_header_val = None
+    host_header_val = None
+
+    for header in headers:
+        if header[0] == b":authority":
+            authority_header_val = header[1]
+        elif header[0] == b"host":
+            host_header_val = header[1]
+
+        yield header
+
+    # If we have not-None values for these variables, then we know we saw
+    # the corresponding header.
+    authority_present = (authority_header_val is not None)
+    host_present = (host_header_val is not None)
+
+    # It is an error for a request header block to contain neither
+    # an :authority header nor a Host header.
+    if not authority_present and not host_present:
+        msg = "Request header block does not have an :authority or Host header."
+        raise ProtocolError(msg)
+
+    # If we receive both headers, they should definitely match.
+    if authority_present and host_present and authority_header_val != host_header_val:
+        msg = (
+            "Request header block has mismatched :authority and "
+            f"Host headers: {authority_header_val!r} / {host_header_val!r}"
+        )
+        raise ProtocolError(msg)
+
+
+def _check_host_authority_header(headers: Iterable[Header],
+                                 hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises a ProtocolError if a header block arrives that does not contain an
+    :authority or a Host header, or if a header block contains both fields,
+    but their values do not match.
+    """
+    # We only expect to see :authority and Host headers on request header
+    # blocks that aren't trailers, so skip this validation if this is a
+    # response header or we're looking at trailer blocks.
+    skip_validation = (
+        hdr_validation_flags.is_response_header or
+        hdr_validation_flags.is_trailer
+    )
+    if skip_validation:
+        return (h for h in headers)
+
+    return _validate_host_authority_header(headers)
+
+
+def _check_path_header(headers: Iterable[Header],
+                       hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raise a ProtocolError if a header block arrives or is sent that contains an
+    empty :path header.
+    """
+    def inner() -> Generator[Header, None, None]:
+        for header in headers:
+            if header[0] == b":path" and not header[1]:
+                msg = "An empty :path header is forbidden"
+                raise ProtocolError(msg)
+
+            yield header
+
+    # We only expect to see :authority and Host headers on request header
+    # blocks that aren't trailers, so skip this validation if this is a
+    # response header or we're looking at trailer blocks.
+    skip_validation = (
+        hdr_validation_flags.is_response_header or
+        hdr_validation_flags.is_trailer
+    )
+    if skip_validation:
+        return (h for h in headers)
+    return inner()
+
+
+def _to_bytes(v: bytes | str) -> bytes:
+    """
+    Given an assumed `str` (or anything that supports `.encode()`),
+    encodes it using utf-8 into bytes. Returns the unmodified object
+    if it is already a `bytes` object.
+    """
+    return v if isinstance(v, bytes) else v.encode("utf-8")
+
+
+def utf8_encode_headers(headers: Iterable[HeaderWeaklyTyped]) -> list[Header]:
+    """
+    Given an iterable of header two-tuples, rebuilds that as a list with the
+    header names and values encoded as utf-8 bytes. This function produces
+    tuples that preserve the original type of the header tuple for tuple and
+    any ``HeaderTuple``.
+    """
+    encoded_headers: list[Header] = []
+    for header in headers:
+        h = (_to_bytes(header[0]), _to_bytes(header[1]))
+        if isinstance(header, HeaderTuple):
+            encoded_headers.append(header.__class__(h[0], h[1]))
+        else:
+            encoded_headers.append(h)
+    return encoded_headers
+
+
+def _lowercase_header_names(headers: Iterable[Header],
+                            hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]:
+    """
+    Given an iterable of header two-tuples, rebuilds that iterable with the
+    header names lowercased. This generator produces tuples that preserve the
+    original type of the header tuple for tuple and any ``HeaderTuple``.
+    """
+    for header in headers:
+        if isinstance(header, HeaderTuple):
+            yield header.__class__(header[0].lower(), header[1])
+        else:
+            yield (header[0].lower(), header[1])
+
+
+def _strip_surrounding_whitespace(headers: Iterable[Header],
+                                  hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]:
+    """
+    Given an iterable of header two-tuples, strip both leading and trailing
+    whitespace from both header names and header values. This generator
+    produces tuples that preserve the original type of the header tuple for
+    tuple and any ``HeaderTuple``.
+    """
+    for header in headers:
+        if isinstance(header, HeaderTuple):
+            yield header.__class__(header[0].strip(), header[1].strip())
+        else:
+            yield (header[0].strip(), header[1].strip())
+
+
+def _strip_connection_headers(headers: Iterable[Header],
+                              hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]:
+    """
+    Strip any connection headers as per RFC7540 § 8.1.2.2.
+    """
+    for header in headers:
+        if header[0] not in CONNECTION_HEADERS:
+            yield header
+
+
+def _check_sent_host_authority_header(headers: Iterable[Header],
+                                      hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Raises an InvalidHeaderBlockError if we try to send a header block
+    that does not contain an :authority or a Host header, or if
+    the header block contains both fields, but their values do not match.
+    """
+    # We only expect to see :authority and Host headers on request header
+    # blocks that aren't trailers, so skip this validation if this is a
+    # response header or we're looking at trailer blocks.
+    skip_validation = (
+        hdr_validation_flags.is_response_header or
+        hdr_validation_flags.is_trailer
+    )
+    if skip_validation:
+        return (h for h in headers)
+
+    return _validate_host_authority_header(headers)
+
+
+def _combine_cookie_fields(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    RFC 7540 § 8.1.2.5 allows HTTP/2 clients to split the Cookie header field,
+    which must normally appear only once, into multiple fields for better
+    compression. However, they MUST be joined back up again when received.
+    This normalization step applies that transform. The side-effect is that
+    all cookie fields now appear *last* in the header block.
+    """
+    # There is a problem here about header indexing. Specifically, it's
+    # possible that all these cookies are sent with different header indexing
+    # values. At this point it shouldn't matter too much, so we apply our own
+    # logic and make them never-indexed.
+    cookies: list[bytes] = []
+    for header in headers:
+        if header[0] == b"cookie":
+            cookies.append(header[1])
+        else:
+            yield header
+    if cookies:
+        cookie_val = b"; ".join(cookies)
+        yield NeverIndexedHeaderTuple(b"cookie", cookie_val)
+
+
+def _split_outbound_cookie_fields(headers: Iterable[Header],
+                                  hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]:
+    """
+    RFC 7540 § 8.1.2.5 allows for better compression efficiency,
+    to split the Cookie header field into separate header fields
+
+    We want to do it for outbound requests, as we are doing for
+    inbound.
+    """
+    for header in headers:
+        assert isinstance(header[0], bytes)
+        assert isinstance(header[1], bytes)
+        if header[0] == b"cookie":
+            for cookie_val in header[1].split(b"; "):
+                if isinstance(header, HeaderTuple):
+                    yield header.__class__(header[0], cookie_val)
+                else:
+                    yield header[0], cookie_val
+        else:
+            yield header
+
+
+def normalize_outbound_headers(headers: Iterable[Header],
+                               hdr_validation_flags: HeaderValidationFlags | None,
+                               should_split_outbound_cookies: bool=False) -> Generator[Header, None, None]:
+    """
+    Normalizes a header sequence that we are about to send.
+
+    :param headers: The HTTP header set.
+    :param hdr_validation_flags: An instance of HeaderValidationFlags.
+    :param should_split_outbound_cookies: boolean flag
+    """
+    headers = _lowercase_header_names(headers, hdr_validation_flags)
+    if should_split_outbound_cookies:
+        headers = _split_outbound_cookie_fields(headers, hdr_validation_flags)
+    headers = _strip_surrounding_whitespace(headers, hdr_validation_flags)
+    headers = _strip_connection_headers(headers, hdr_validation_flags)
+    return _secure_headers(headers, hdr_validation_flags)
+
+
+
+def normalize_inbound_headers(headers: Iterable[Header],
+                              hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Normalizes a header sequence that we have received.
+
+    :param headers: The HTTP header set.
+    :param hdr_validation_flags: An instance of HeaderValidationFlags
+    """
+    return _combine_cookie_fields(headers, hdr_validation_flags)
+
+
+def validate_outbound_headers(headers: Iterable[Header],
+                              hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
+    """
+    Validates and normalizes a header sequence that we are about to send.
+
+    :param headers: The HTTP header set.
+    :param hdr_validation_flags: An instance of HeaderValidationFlags.
+    """
+    headers = _reject_te(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_connection_header(
+        headers, hdr_validation_flags,
+    )
+    headers = _reject_pseudo_header_fields(
+        headers, hdr_validation_flags,
+    )
+    headers = _check_sent_host_authority_header(
+        headers, hdr_validation_flags,
+    )
+    return _check_path_header(headers, hdr_validation_flags)
+
+
+
+class SizeLimitDict(collections.OrderedDict[int, Any]):
+
+    def __init__(self, *args: dict[int, int], **kwargs: Any) -> None:
+        self._size_limit = kwargs.pop("size_limit", None)
+        super().__init__(*args, **kwargs)
+
+        self._check_size_limit()
+
+    def __setitem__(self, key: int, value: Any | int) -> None:
+        super().__setitem__(key, value)
+
+        self._check_size_limit()
+
+    def _check_size_limit(self) -> None:
+        if self._size_limit is not None:
+            while len(self) > self._size_limit:
+                self.popitem(last=False)